Skip to main content

shelly_data/
app_services.rs

1use crate::{
2    integrations::{IntegrationError, IntegrationErrorKind, JobOrchestrator, JobRequest, JobState},
3    DataError, DataResult,
4};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::{
8    collections::{BTreeMap, VecDeque},
9    fmt,
10    sync::{Arc, Mutex},
11    time::{SystemTime, UNIX_EPOCH},
12};
13use tracing::{info, warn};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum AppServiceErrorKind {
18    InvalidCredentials,
19    Unauthorized,
20    NotFound,
21    Conflict,
22    InvalidInput,
23    Unavailable,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct AppServiceError {
28    pub source: String,
29    pub kind: AppServiceErrorKind,
30    pub code: String,
31    pub message: String,
32}
33
34impl AppServiceError {
35    pub fn new(
36        source: impl Into<String>,
37        kind: AppServiceErrorKind,
38        code: impl Into<String>,
39        message: impl Into<String>,
40    ) -> Self {
41        Self {
42            source: source.into(),
43            kind,
44            code: code.into(),
45            message: message.into(),
46        }
47    }
48}
49
50impl fmt::Display for AppServiceError {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        write!(f, "[{}:{}] {}", self.source, self.code, self.message)
53    }
54}
55
56impl std::error::Error for AppServiceError {}
57
58pub type AppServiceResult<T> = Result<T, AppServiceError>;
59
60pub fn map_app_service_error(source: impl Into<String>, err: AppServiceError) -> DataError {
61    DataError::Integration(format!("[{}] {}", source.into(), err))
62}
63
64pub fn map_app_service_result<T>(
65    source: impl Into<String>,
66    result: AppServiceResult<T>,
67) -> DataResult<T> {
68    result.map_err(|err| map_app_service_error(source, err))
69}
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct AuthIdentity {
73    pub user_id: String,
74    pub email: String,
75    pub display_name: Option<String>,
76    pub roles: Vec<String>,
77    pub attributes: BTreeMap<String, String>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct AuthCredentials {
82    pub identifier: String,
83    pub password: String,
84}
85
86impl AuthCredentials {
87    pub fn new(identifier: impl Into<String>, password: impl Into<String>) -> Self {
88        Self {
89            identifier: identifier.into(),
90            password: password.into(),
91        }
92    }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub struct AuthSession {
97    pub token: String,
98    pub identity: AuthIdentity,
99    pub issued_at_unix_ms: u64,
100    pub expires_at_unix_ms: u64,
101}
102
103pub trait IdentityService: Send + Sync {
104    fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession>;
105    fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>>;
106    fn sign_out(&self, token: &str) -> AppServiceResult<()>;
107}
108
109#[derive(Debug, Clone, PartialEq, Eq)]
110struct IdentityRecord {
111    user_id: String,
112    email: String,
113    password: String,
114    display_name: Option<String>,
115    roles: Vec<String>,
116    attributes: BTreeMap<String, String>,
117}
118
119pub struct InMemoryIdentityService {
120    users: Mutex<BTreeMap<String, IdentityRecord>>,
121    sessions: Mutex<BTreeMap<String, AuthSession>>,
122    next_session_id: Mutex<u64>,
123    session_ttl_ms: u64,
124}
125
126impl InMemoryIdentityService {
127    pub fn new() -> Self {
128        let service = Self {
129            users: Mutex::new(BTreeMap::new()),
130            sessions: Mutex::new(BTreeMap::new()),
131            next_session_id: Mutex::new(0),
132            session_ttl_ms: 8 * 60 * 60 * 1000,
133        };
134        service
135            .insert_user(
136                "user-1",
137                "demo@shelly.dev",
138                "shelly-demo",
139                Some("Shelly Demo"),
140                vec!["admin".to_string()],
141                BTreeMap::new(),
142            )
143            .expect("default in-memory auth user must be inserted");
144        service
145    }
146
147    pub fn insert_user(
148        &self,
149        user_id: impl Into<String>,
150        email: impl Into<String>,
151        password: impl Into<String>,
152        display_name: Option<&str>,
153        roles: Vec<String>,
154        attributes: BTreeMap<String, String>,
155    ) -> AppServiceResult<()> {
156        let email = email.into().trim().to_ascii_lowercase();
157        if email.is_empty() {
158            return Err(AppServiceError::new(
159                "auth",
160                AppServiceErrorKind::InvalidInput,
161                "empty_email",
162                "email must not be empty",
163            ));
164        }
165        let mut users = self.users.lock().map_err(|_| {
166            AppServiceError::new(
167                "auth",
168                AppServiceErrorKind::Unavailable,
169                "users_lock_poisoned",
170                "auth users lock poisoned",
171            )
172        })?;
173        users.insert(
174            email.clone(),
175            IdentityRecord {
176                user_id: user_id.into(),
177                email: email.clone(),
178                password: password.into(),
179                display_name: display_name.map(ToString::to_string),
180                roles,
181                attributes,
182            },
183        );
184        info!(
185            target: "shelly.integration.app_service",
186            service = "auth",
187            operation = "insert_user",
188            email,
189            "Shelly app service operation executed"
190        );
191        Ok(())
192    }
193
194    pub fn with_session_ttl_ms(mut self, ttl_ms: u64) -> Self {
195        self.session_ttl_ms = ttl_ms.max(1);
196        self
197    }
198}
199
200impl Default for InMemoryIdentityService {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl IdentityService for InMemoryIdentityService {
207    fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession> {
208        let identifier = credentials.identifier.trim().to_ascii_lowercase();
209        let users = self.users.lock().map_err(|_| {
210            AppServiceError::new(
211                "auth",
212                AppServiceErrorKind::Unavailable,
213                "users_lock_poisoned",
214                "auth users lock poisoned",
215            )
216        })?;
217        let Some(record) = users.get(&identifier) else {
218            return Err(AppServiceError::new(
219                "auth",
220                AppServiceErrorKind::InvalidCredentials,
221                "invalid_credentials",
222                "invalid credentials",
223            ));
224        };
225        if record.password != credentials.password {
226            return Err(AppServiceError::new(
227                "auth",
228                AppServiceErrorKind::InvalidCredentials,
229                "invalid_credentials",
230                "invalid credentials",
231            ));
232        }
233        let identity = AuthIdentity {
234            user_id: record.user_id.clone(),
235            email: record.email.clone(),
236            display_name: record.display_name.clone(),
237            roles: record.roles.clone(),
238            attributes: record.attributes.clone(),
239        };
240        drop(users);
241
242        let mut next_session_id = self.next_session_id.lock().map_err(|_| {
243            AppServiceError::new(
244                "auth",
245                AppServiceErrorKind::Unavailable,
246                "session_id_lock_poisoned",
247                "session id lock poisoned",
248            )
249        })?;
250        *next_session_id = next_session_id.saturating_add(1);
251        let token = format!("sess-{next_session_id}");
252        let issued_at_unix_ms = now_unix_ms();
253        let session = AuthSession {
254            token: token.clone(),
255            identity,
256            issued_at_unix_ms,
257            expires_at_unix_ms: issued_at_unix_ms.saturating_add(self.session_ttl_ms),
258        };
259        self.sessions
260            .lock()
261            .map_err(|_| {
262                AppServiceError::new(
263                    "auth",
264                    AppServiceErrorKind::Unavailable,
265                    "sessions_lock_poisoned",
266                    "auth sessions lock poisoned",
267                )
268            })?
269            .insert(token.clone(), session.clone());
270        info!(
271            target: "shelly.integration.app_service",
272            service = "auth",
273            operation = "sign_in",
274            token = token.as_str(),
275            user_id = session.identity.user_id.as_str(),
276            "Shelly app service operation executed"
277        );
278        Ok(session)
279    }
280
281    fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>> {
282        let mut sessions = self.sessions.lock().map_err(|_| {
283            AppServiceError::new(
284                "auth",
285                AppServiceErrorKind::Unavailable,
286                "sessions_lock_poisoned",
287                "auth sessions lock poisoned",
288            )
289        })?;
290        let Some(session) = sessions.get(token).cloned() else {
291            return Ok(None);
292        };
293        if now_unix_ms() >= session.expires_at_unix_ms {
294            sessions.remove(token);
295            warn!(
296                target: "shelly.integration.app_service",
297                service = "auth",
298                operation = "session_identity",
299                token,
300                "Shelly app session expired and was evicted"
301            );
302            return Ok(None);
303        }
304        Ok(Some(session.identity))
305    }
306
307    fn sign_out(&self, token: &str) -> AppServiceResult<()> {
308        self.sessions
309            .lock()
310            .map_err(|_| {
311                AppServiceError::new(
312                    "auth",
313                    AppServiceErrorKind::Unavailable,
314                    "sessions_lock_poisoned",
315                    "auth sessions lock poisoned",
316                )
317            })?
318            .remove(token);
319        info!(
320            target: "shelly.integration.app_service",
321            service = "auth",
322            operation = "sign_out",
323            token,
324            "Shelly app service operation executed"
325        );
326        Ok(())
327    }
328}
329
330#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
331pub struct BackgroundJobRequest {
332    pub job: String,
333    pub payload: Value,
334    pub idempotency_key: String,
335    pub scheduled_at_unix_ms: Option<u64>,
336    pub metadata: BTreeMap<String, String>,
337}
338
339impl BackgroundJobRequest {
340    pub fn new(job: impl Into<String>, payload: Value, idempotency_key: impl Into<String>) -> Self {
341        Self {
342            job: job.into(),
343            payload,
344            idempotency_key: idempotency_key.into(),
345            scheduled_at_unix_ms: None,
346            metadata: BTreeMap::new(),
347        }
348    }
349
350    pub fn schedule_at_unix_ms(mut self, scheduled_at_unix_ms: u64) -> Self {
351        self.scheduled_at_unix_ms = Some(scheduled_at_unix_ms);
352        self
353    }
354}
355
356#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
357pub struct BackgroundJobHandle {
358    pub id: String,
359    pub job: String,
360    pub idempotency_key: String,
361}
362
363#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
364#[serde(rename_all = "snake_case")]
365pub enum BackgroundJobState {
366    Queued,
367    Running,
368    Succeeded,
369    Failed,
370    Canceled,
371}
372
373impl From<JobState> for BackgroundJobState {
374    fn from(value: JobState) -> Self {
375        match value {
376            JobState::Queued => Self::Queued,
377            JobState::Running => Self::Running,
378            JobState::Succeeded => Self::Succeeded,
379            JobState::Failed => Self::Failed,
380            JobState::Canceled => Self::Canceled,
381        }
382    }
383}
384
385#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
386pub struct BackgroundJobStatus {
387    pub id: String,
388    pub state: BackgroundJobState,
389    pub attempts: u32,
390    pub result: Option<Value>,
391    pub error: Option<String>,
392}
393
394pub trait BackgroundJobService: Send + Sync {
395    fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle>;
396    fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus>;
397}
398
399#[derive(Clone)]
400pub struct JobOrchestratorBackgroundJobs<T: JobOrchestrator> {
401    orchestrator: Arc<T>,
402}
403
404impl<T: JobOrchestrator> JobOrchestratorBackgroundJobs<T> {
405    pub fn new(orchestrator: Arc<T>) -> Self {
406        Self { orchestrator }
407    }
408}
409
410impl<T: JobOrchestrator> BackgroundJobService for JobOrchestratorBackgroundJobs<T> {
411    fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
412        let mut job_request =
413            JobRequest::new(request.job, request.payload, request.idempotency_key);
414        job_request.metadata = request.metadata;
415        if let Some(scheduled_at_unix_ms) = request.scheduled_at_unix_ms {
416            job_request.metadata.insert(
417                "scheduled_at_unix_ms".to_string(),
418                scheduled_at_unix_ms.to_string(),
419            );
420        }
421
422        let handle = self
423            .orchestrator
424            .enqueue(job_request)
425            .map_err(map_job_error)?;
426        Ok(BackgroundJobHandle {
427            id: handle.id,
428            job: handle.workflow,
429            idempotency_key: handle.idempotency_key,
430        })
431    }
432
433    fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
434        let status = self.orchestrator.status(id).map_err(map_job_error)?;
435        Ok(BackgroundJobStatus {
436            id: status.id,
437            state: BackgroundJobState::from(status.state),
438            attempts: status.attempts,
439            result: status.result,
440            error: status.error.map(|err| err.to_string()),
441        })
442    }
443}
444
445#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
446pub struct EmailMessage {
447    pub to: String,
448    pub subject: String,
449    pub text_body: String,
450    pub html_body: Option<String>,
451    pub headers: BTreeMap<String, String>,
452}
453
454impl EmailMessage {
455    pub fn new(
456        to: impl Into<String>,
457        subject: impl Into<String>,
458        text_body: impl Into<String>,
459    ) -> Self {
460        Self {
461            to: to.into(),
462            subject: subject.into(),
463            text_body: text_body.into(),
464            html_body: None,
465            headers: BTreeMap::new(),
466        }
467    }
468}
469
470#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
471pub struct EmailReceipt {
472    pub message_id: String,
473    pub provider: String,
474    pub accepted_at_unix_ms: u64,
475}
476
477pub trait TransactionalEmailService: Send + Sync {
478    fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt>;
479}
480
481#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
482pub struct SentEmail {
483    pub message: EmailMessage,
484    pub receipt: EmailReceipt,
485}
486
487pub struct InMemoryTransactionalEmailService {
488    provider: String,
489    next_id: Mutex<u64>,
490    sent: Mutex<Vec<SentEmail>>,
491}
492
493impl InMemoryTransactionalEmailService {
494    pub fn new(provider: impl Into<String>) -> Self {
495        Self {
496            provider: provider.into(),
497            next_id: Mutex::new(0),
498            sent: Mutex::new(Vec::new()),
499        }
500    }
501
502    pub fn sent(&self) -> Vec<SentEmail> {
503        self.sent
504            .lock()
505            .map(|entries| entries.clone())
506            .unwrap_or_default()
507    }
508}
509
510impl Default for InMemoryTransactionalEmailService {
511    fn default() -> Self {
512        Self::new("in-memory-mailer")
513    }
514}
515
516impl TransactionalEmailService for InMemoryTransactionalEmailService {
517    fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt> {
518        if message.to.trim().is_empty() {
519            return Err(AppServiceError::new(
520                "email",
521                AppServiceErrorKind::InvalidInput,
522                "empty_recipient",
523                "email recipient must not be empty",
524            ));
525        }
526        let mut next_id = self.next_id.lock().map_err(|_| {
527            AppServiceError::new(
528                "email",
529                AppServiceErrorKind::Unavailable,
530                "email_id_lock_poisoned",
531                "email id lock poisoned",
532            )
533        })?;
534        *next_id = next_id.saturating_add(1);
535        let message_id = format!("mail-{next_id}");
536        let receipt = EmailReceipt {
537            message_id: message_id.clone(),
538            provider: self.provider.clone(),
539            accepted_at_unix_ms: now_unix_ms(),
540        };
541        self.sent
542            .lock()
543            .map_err(|_| {
544                AppServiceError::new(
545                    "email",
546                    AppServiceErrorKind::Unavailable,
547                    "email_log_lock_poisoned",
548                    "email log lock poisoned",
549                )
550            })?
551            .push(SentEmail {
552                message: message.clone(),
553                receipt: receipt.clone(),
554            });
555        info!(
556            target: "shelly.integration.app_service",
557            service = "email",
558            operation = "send",
559            message_id = message_id.as_str(),
560            recipient = message.to.as_str(),
561            "Shelly app service operation executed"
562        );
563        Ok(receipt)
564    }
565}
566
567#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
568pub struct CacheEntry {
569    pub value: Value,
570    pub expires_at_unix_ms: Option<u64>,
571}
572
573impl CacheEntry {
574    pub fn persistent(value: Value) -> Self {
575        Self {
576            value,
577            expires_at_unix_ms: None,
578        }
579    }
580
581    pub fn with_ttl_ms(value: Value, ttl_ms: u64) -> Self {
582        Self {
583            value,
584            expires_at_unix_ms: Some(now_unix_ms().saturating_add(ttl_ms)),
585        }
586    }
587}
588
589pub trait CacheBackend: Send + Sync {
590    fn get(&self, key: &str) -> AppServiceResult<Option<Value>>;
591    fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()>;
592    fn delete(&self, key: &str) -> AppServiceResult<()>;
593}
594
595#[derive(Default)]
596pub struct InMemoryCacheBackend {
597    entries: Mutex<BTreeMap<String, CacheEntry>>,
598}
599
600impl CacheBackend for InMemoryCacheBackend {
601    fn get(&self, key: &str) -> AppServiceResult<Option<Value>> {
602        let mut entries = self.entries.lock().map_err(|_| {
603            AppServiceError::new(
604                "cache",
605                AppServiceErrorKind::Unavailable,
606                "cache_lock_poisoned",
607                "cache lock poisoned",
608            )
609        })?;
610        let Some(entry) = entries.get(key).cloned() else {
611            return Ok(None);
612        };
613        if entry
614            .expires_at_unix_ms
615            .is_some_and(|expires_at| now_unix_ms() >= expires_at)
616        {
617            entries.remove(key);
618            return Ok(None);
619        }
620        Ok(Some(entry.value))
621    }
622
623    fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()> {
624        self.entries
625            .lock()
626            .map_err(|_| {
627                AppServiceError::new(
628                    "cache",
629                    AppServiceErrorKind::Unavailable,
630                    "cache_lock_poisoned",
631                    "cache lock poisoned",
632                )
633            })?
634            .insert(key.to_string(), entry);
635        Ok(())
636    }
637
638    fn delete(&self, key: &str) -> AppServiceResult<()> {
639        self.entries
640            .lock()
641            .map_err(|_| {
642                AppServiceError::new(
643                    "cache",
644                    AppServiceErrorKind::Unavailable,
645                    "cache_lock_poisoned",
646                    "cache lock poisoned",
647                )
648            })?
649            .remove(key);
650        Ok(())
651    }
652}
653
654#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
655pub struct QueueMessage {
656    pub id: String,
657    pub topic: String,
658    pub payload: Value,
659    pub attempts: u32,
660    pub enqueued_at_unix_ms: u64,
661    pub scheduled_at_unix_ms: Option<u64>,
662}
663
664pub trait QueueBackend: Send + Sync {
665    fn enqueue(
666        &self,
667        topic: &str,
668        payload: Value,
669        scheduled_at_unix_ms: Option<u64>,
670    ) -> AppServiceResult<QueueMessage>;
671    fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>>;
672    fn len(&self, topic: &str) -> AppServiceResult<usize>;
673}
674
675#[derive(Default)]
676pub struct InMemoryQueueBackend {
677    messages: Mutex<BTreeMap<String, VecDeque<QueueMessage>>>,
678    next_id: Mutex<u64>,
679}
680
681impl QueueBackend for InMemoryQueueBackend {
682    fn enqueue(
683        &self,
684        topic: &str,
685        payload: Value,
686        scheduled_at_unix_ms: Option<u64>,
687    ) -> AppServiceResult<QueueMessage> {
688        if topic.trim().is_empty() {
689            return Err(AppServiceError::new(
690                "queue",
691                AppServiceErrorKind::InvalidInput,
692                "empty_topic",
693                "queue topic must not be empty",
694            ));
695        }
696        let mut next_id = self.next_id.lock().map_err(|_| {
697            AppServiceError::new(
698                "queue",
699                AppServiceErrorKind::Unavailable,
700                "queue_id_lock_poisoned",
701                "queue id lock poisoned",
702            )
703        })?;
704        *next_id = next_id.saturating_add(1);
705        let message = QueueMessage {
706            id: format!("queue-{next_id}"),
707            topic: topic.to_string(),
708            payload,
709            attempts: 0,
710            enqueued_at_unix_ms: now_unix_ms(),
711            scheduled_at_unix_ms,
712        };
713        self.messages
714            .lock()
715            .map_err(|_| {
716                AppServiceError::new(
717                    "queue",
718                    AppServiceErrorKind::Unavailable,
719                    "queue_lock_poisoned",
720                    "queue lock poisoned",
721                )
722            })?
723            .entry(topic.to_string())
724            .or_default()
725            .push_back(message.clone());
726        Ok(message)
727    }
728
729    fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>> {
730        let mut queues = self.messages.lock().map_err(|_| {
731            AppServiceError::new(
732                "queue",
733                AppServiceErrorKind::Unavailable,
734                "queue_lock_poisoned",
735                "queue lock poisoned",
736            )
737        })?;
738        let Some(queue) = queues.get_mut(topic) else {
739            return Ok(None);
740        };
741        Ok(queue.pop_front().map(|mut message| {
742            message.attempts = message.attempts.saturating_add(1);
743            message
744        }))
745    }
746
747    fn len(&self, topic: &str) -> AppServiceResult<usize> {
748        let queues = self.messages.lock().map_err(|_| {
749            AppServiceError::new(
750                "queue",
751                AppServiceErrorKind::Unavailable,
752                "queue_lock_poisoned",
753                "queue lock poisoned",
754            )
755        })?;
756        Ok(queues.get(topic).map(VecDeque::len).unwrap_or(0))
757    }
758}
759
760fn map_job_error(err: IntegrationError) -> AppServiceError {
761    let kind = match err.kind {
762        IntegrationErrorKind::InvalidInput => AppServiceErrorKind::InvalidInput,
763        IntegrationErrorKind::Auth => AppServiceErrorKind::Unauthorized,
764        IntegrationErrorKind::Unavailable
765        | IntegrationErrorKind::Timeout
766        | IntegrationErrorKind::RateLimited
767        | IntegrationErrorKind::Transient => AppServiceErrorKind::Unavailable,
768        IntegrationErrorKind::Permanent => AppServiceErrorKind::Conflict,
769    };
770    AppServiceError::new(
771        "jobs",
772        kind,
773        err.code.unwrap_or_else(|| "job_error".to_string()),
774        err.message,
775    )
776}
777
778fn now_unix_ms() -> u64 {
779    SystemTime::now()
780        .duration_since(UNIX_EPOCH)
781        .unwrap_or_default()
782        .as_millis() as u64
783}
784
785#[cfg(test)]
786mod tests {
787    use super::{
788        AppServiceErrorKind, AuthCredentials, BackgroundJobRequest, BackgroundJobService,
789        BackgroundJobState, CacheBackend, CacheEntry, EmailMessage, IdentityService,
790        InMemoryCacheBackend, InMemoryIdentityService, InMemoryQueueBackend,
791        InMemoryTransactionalEmailService, JobOrchestratorBackgroundJobs, QueueBackend,
792        TransactionalEmailService,
793    };
794    use crate::integrations::InMemoryJobOrchestrator;
795    use serde_json::json;
796    use std::sync::Arc;
797
798    #[test]
799    fn in_memory_identity_service_signs_in_and_signs_out() {
800        let service = InMemoryIdentityService::default();
801        let session = service
802            .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
803            .unwrap();
804
805        let identity = service.session_identity(&session.token).unwrap().unwrap();
806        assert_eq!(identity.email, "demo@shelly.dev");
807        assert!(identity.roles.contains(&"admin".to_string()));
808
809        service.sign_out(&session.token).unwrap();
810        let missing = service.session_identity(&session.token).unwrap();
811        assert!(missing.is_none());
812    }
813
814    #[test]
815    fn in_memory_identity_service_rejects_invalid_credentials() {
816        let service = InMemoryIdentityService::default();
817        let err = service
818            .sign_in(AuthCredentials::new("demo@shelly.dev", "wrong-password"))
819            .unwrap_err();
820        assert_eq!(err.kind, AppServiceErrorKind::InvalidCredentials);
821    }
822
823    #[test]
824    fn in_memory_cache_backend_supports_set_get_delete_and_ttl() {
825        let cache = InMemoryCacheBackend::default();
826        cache
827            .set(
828                "sessions:1",
829                CacheEntry::persistent(json!({"active": true})),
830            )
831            .unwrap();
832        let value = cache.get("sessions:1").unwrap();
833        assert_eq!(value, Some(json!({"active": true})));
834
835        cache.delete("sessions:1").unwrap();
836        let cleared = cache.get("sessions:1").unwrap();
837        assert!(cleared.is_none());
838
839        cache
840            .set("temp", CacheEntry::with_ttl_ms(json!("stale"), 0))
841            .unwrap();
842        let expired = cache.get("temp").unwrap();
843        assert!(expired.is_none());
844    }
845
846    #[test]
847    fn in_memory_queue_backend_is_fifo_per_topic() {
848        let queue = InMemoryQueueBackend::default();
849        queue.enqueue("emails", json!({"id": 1}), None).unwrap();
850        queue.enqueue("emails", json!({"id": 2}), None).unwrap();
851        assert_eq!(queue.len("emails").unwrap(), 2);
852
853        let first = queue.dequeue("emails").unwrap().unwrap();
854        let second = queue.dequeue("emails").unwrap().unwrap();
855        assert_eq!(first.payload, json!({"id": 1}));
856        assert_eq!(second.payload, json!({"id": 2}));
857        assert_eq!(queue.len("emails").unwrap(), 0);
858    }
859
860    #[test]
861    fn transactional_email_service_records_sent_messages() {
862        let mailer = InMemoryTransactionalEmailService::default();
863        let message = EmailMessage::new("ada@example.test", "Welcome", "Hello Ada");
864        let receipt = mailer.send(message.clone()).unwrap();
865        assert_eq!(receipt.provider, "in-memory-mailer");
866
867        let sent = mailer.sent();
868        assert_eq!(sent.len(), 1);
869        assert_eq!(sent[0].message, message);
870        assert_eq!(sent[0].receipt.message_id, receipt.message_id);
871    }
872
873    #[test]
874    fn background_job_service_wraps_job_orchestrator_contract() {
875        let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
876        let jobs = JobOrchestratorBackgroundJobs::new(orchestrator.clone());
877        let handle = jobs
878            .enqueue(BackgroundJobRequest::new(
879                "send_digest",
880                json!({"account_id": 7}),
881                "send_digest:7",
882            ))
883            .unwrap();
884        let queued = jobs.status(&handle.id).unwrap();
885        assert_eq!(queued.state, BackgroundJobState::Queued);
886
887        orchestrator
888            .mark_succeeded(&handle.id, json!({"emails_sent": 3}))
889            .unwrap();
890        let done = jobs.status(&handle.id).unwrap();
891        assert_eq!(done.state, BackgroundJobState::Succeeded);
892        assert_eq!(done.result, Some(json!({"emails_sent": 3})));
893    }
894}