Skip to main content

shelly_data/
app_services.rs

1use crate::{
2    integrations::{IntegrationError, IntegrationErrorKind, JobOrchestrator, JobRequest, JobState},
3    DataError, DataResult,
4};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::{
8    collections::{BTreeMap, VecDeque},
9    fmt,
10    sync::{Arc, Mutex},
11    time::{SystemTime, UNIX_EPOCH},
12};
13use tracing::{info, warn};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum AppServiceErrorKind {
18    InvalidCredentials,
19    Unauthorized,
20    NotFound,
21    Conflict,
22    InvalidInput,
23    Unavailable,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct AppServiceError {
28    pub source: String,
29    pub kind: AppServiceErrorKind,
30    pub code: String,
31    pub message: String,
32}
33
34impl AppServiceError {
35    pub fn new(
36        source: impl Into<String>,
37        kind: AppServiceErrorKind,
38        code: impl Into<String>,
39        message: impl Into<String>,
40    ) -> Self {
41        Self {
42            source: source.into(),
43            kind,
44            code: code.into(),
45            message: message.into(),
46        }
47    }
48}
49
50impl fmt::Display for AppServiceError {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        write!(f, "[{}:{}] {}", self.source, self.code, self.message)
53    }
54}
55
56impl std::error::Error for AppServiceError {}
57
58pub type AppServiceResult<T> = Result<T, AppServiceError>;
59
60pub fn map_app_service_error(source: impl Into<String>, err: AppServiceError) -> DataError {
61    DataError::Integration(format!("[{}] {}", source.into(), err))
62}
63
64pub fn map_app_service_result<T>(
65    source: impl Into<String>,
66    result: AppServiceResult<T>,
67) -> DataResult<T> {
68    result.map_err(|err| map_app_service_error(source, err))
69}
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct AuthIdentity {
73    pub user_id: String,
74    pub email: String,
75    pub display_name: Option<String>,
76    pub roles: Vec<String>,
77    pub attributes: BTreeMap<String, String>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct AuthCredentials {
82    pub identifier: String,
83    pub password: String,
84}
85
86impl AuthCredentials {
87    pub fn new(identifier: impl Into<String>, password: impl Into<String>) -> Self {
88        Self {
89            identifier: identifier.into(),
90            password: password.into(),
91        }
92    }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub struct AuthSession {
97    pub token: String,
98    pub identity: AuthIdentity,
99    pub issued_at_unix_ms: u64,
100    pub expires_at_unix_ms: u64,
101}
102
103pub trait IdentityService: Send + Sync {
104    fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession>;
105    fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>>;
106    fn sign_out(&self, token: &str) -> AppServiceResult<()>;
107}
108
109#[derive(Debug, Clone, PartialEq, Eq)]
110struct IdentityRecord {
111    user_id: String,
112    email: String,
113    password: String,
114    display_name: Option<String>,
115    roles: Vec<String>,
116    attributes: BTreeMap<String, String>,
117}
118
119pub struct InMemoryIdentityService {
120    users: Mutex<BTreeMap<String, IdentityRecord>>,
121    sessions: Mutex<BTreeMap<String, AuthSession>>,
122    next_session_id: Mutex<u64>,
123    session_ttl_ms: u64,
124}
125
126impl InMemoryIdentityService {
127    pub fn new() -> Self {
128        let service = Self {
129            users: Mutex::new(BTreeMap::new()),
130            sessions: Mutex::new(BTreeMap::new()),
131            next_session_id: Mutex::new(0),
132            session_ttl_ms: 8 * 60 * 60 * 1000,
133        };
134        service
135            .insert_user(
136                "user-1",
137                "demo@shelly.dev",
138                "shelly-demo",
139                Some("Shelly Demo"),
140                vec!["admin".to_string()],
141                BTreeMap::new(),
142            )
143            .expect("default in-memory auth user must be inserted");
144        service
145    }
146
147    pub fn insert_user(
148        &self,
149        user_id: impl Into<String>,
150        email: impl Into<String>,
151        password: impl Into<String>,
152        display_name: Option<&str>,
153        roles: Vec<String>,
154        attributes: BTreeMap<String, String>,
155    ) -> AppServiceResult<()> {
156        let email = email.into().trim().to_ascii_lowercase();
157        if email.is_empty() {
158            return Err(AppServiceError::new(
159                "auth",
160                AppServiceErrorKind::InvalidInput,
161                "empty_email",
162                "email must not be empty",
163            ));
164        }
165        let mut users = self.users.lock().map_err(|_| {
166            AppServiceError::new(
167                "auth",
168                AppServiceErrorKind::Unavailable,
169                "users_lock_poisoned",
170                "auth users lock poisoned",
171            )
172        })?;
173        users.insert(
174            email.clone(),
175            IdentityRecord {
176                user_id: user_id.into(),
177                email: email.clone(),
178                password: password.into(),
179                display_name: display_name.map(ToString::to_string),
180                roles,
181                attributes,
182            },
183        );
184        info!(
185            target: "shelly.integration.app_service",
186            service = "auth",
187            operation = "insert_user",
188            email,
189            "Shelly app service operation executed"
190        );
191        Ok(())
192    }
193
194    pub fn with_session_ttl_ms(mut self, ttl_ms: u64) -> Self {
195        self.session_ttl_ms = ttl_ms.max(1);
196        self
197    }
198}
199
200impl Default for InMemoryIdentityService {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl IdentityService for InMemoryIdentityService {
207    fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession> {
208        let identifier = credentials.identifier.trim().to_ascii_lowercase();
209        let users = self.users.lock().map_err(|_| {
210            AppServiceError::new(
211                "auth",
212                AppServiceErrorKind::Unavailable,
213                "users_lock_poisoned",
214                "auth users lock poisoned",
215            )
216        })?;
217        let Some(record) = users.get(&identifier) else {
218            return Err(AppServiceError::new(
219                "auth",
220                AppServiceErrorKind::InvalidCredentials,
221                "invalid_credentials",
222                "invalid credentials",
223            ));
224        };
225        if record.password != credentials.password {
226            return Err(AppServiceError::new(
227                "auth",
228                AppServiceErrorKind::InvalidCredentials,
229                "invalid_credentials",
230                "invalid credentials",
231            ));
232        }
233        let identity = AuthIdentity {
234            user_id: record.user_id.clone(),
235            email: record.email.clone(),
236            display_name: record.display_name.clone(),
237            roles: record.roles.clone(),
238            attributes: record.attributes.clone(),
239        };
240        drop(users);
241
242        let mut next_session_id = self.next_session_id.lock().map_err(|_| {
243            AppServiceError::new(
244                "auth",
245                AppServiceErrorKind::Unavailable,
246                "session_id_lock_poisoned",
247                "session id lock poisoned",
248            )
249        })?;
250        *next_session_id = next_session_id.saturating_add(1);
251        let token = format!("sess-{next_session_id}");
252        let issued_at_unix_ms = now_unix_ms();
253        let session = AuthSession {
254            token: token.clone(),
255            identity,
256            issued_at_unix_ms,
257            expires_at_unix_ms: issued_at_unix_ms.saturating_add(self.session_ttl_ms),
258        };
259        self.sessions
260            .lock()
261            .map_err(|_| {
262                AppServiceError::new(
263                    "auth",
264                    AppServiceErrorKind::Unavailable,
265                    "sessions_lock_poisoned",
266                    "auth sessions lock poisoned",
267                )
268            })?
269            .insert(token.clone(), session.clone());
270        info!(
271            target: "shelly.integration.app_service",
272            service = "auth",
273            operation = "sign_in",
274            token = token.as_str(),
275            user_id = session.identity.user_id.as_str(),
276            "Shelly app service operation executed"
277        );
278        Ok(session)
279    }
280
281    fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>> {
282        let mut sessions = self.sessions.lock().map_err(|_| {
283            AppServiceError::new(
284                "auth",
285                AppServiceErrorKind::Unavailable,
286                "sessions_lock_poisoned",
287                "auth sessions lock poisoned",
288            )
289        })?;
290        let Some(session) = sessions.get(token).cloned() else {
291            return Ok(None);
292        };
293        if now_unix_ms() >= session.expires_at_unix_ms {
294            sessions.remove(token);
295            warn!(
296                target: "shelly.integration.app_service",
297                service = "auth",
298                operation = "session_identity",
299                token,
300                "Shelly app session expired and was evicted"
301            );
302            return Ok(None);
303        }
304        Ok(Some(session.identity))
305    }
306
307    fn sign_out(&self, token: &str) -> AppServiceResult<()> {
308        self.sessions
309            .lock()
310            .map_err(|_| {
311                AppServiceError::new(
312                    "auth",
313                    AppServiceErrorKind::Unavailable,
314                    "sessions_lock_poisoned",
315                    "auth sessions lock poisoned",
316                )
317            })?
318            .remove(token);
319        info!(
320            target: "shelly.integration.app_service",
321            service = "auth",
322            operation = "sign_out",
323            token,
324            "Shelly app service operation executed"
325        );
326        Ok(())
327    }
328}
329
330#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
331pub struct BackgroundJobRequest {
332    pub job: String,
333    pub payload: Value,
334    pub idempotency_key: String,
335    pub scheduled_at_unix_ms: Option<u64>,
336    pub metadata: BTreeMap<String, String>,
337}
338
339impl BackgroundJobRequest {
340    pub fn new(job: impl Into<String>, payload: Value, idempotency_key: impl Into<String>) -> Self {
341        Self {
342            job: job.into(),
343            payload,
344            idempotency_key: idempotency_key.into(),
345            scheduled_at_unix_ms: None,
346            metadata: BTreeMap::new(),
347        }
348    }
349
350    pub fn schedule_at_unix_ms(mut self, scheduled_at_unix_ms: u64) -> Self {
351        self.scheduled_at_unix_ms = Some(scheduled_at_unix_ms);
352        self
353    }
354}
355
356#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
357pub struct BackgroundJobHandle {
358    pub id: String,
359    pub job: String,
360    pub idempotency_key: String,
361}
362
363#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
364#[serde(rename_all = "snake_case")]
365pub enum BackgroundJobState {
366    Queued,
367    Running,
368    Succeeded,
369    Failed,
370    Canceled,
371}
372
373impl From<JobState> for BackgroundJobState {
374    fn from(value: JobState) -> Self {
375        match value {
376            JobState::Queued => Self::Queued,
377            JobState::Running => Self::Running,
378            JobState::Succeeded => Self::Succeeded,
379            JobState::Failed => Self::Failed,
380            JobState::Canceled => Self::Canceled,
381        }
382    }
383}
384
385#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
386pub struct BackgroundJobStatus {
387    pub id: String,
388    pub state: BackgroundJobState,
389    pub attempts: u32,
390    pub result: Option<Value>,
391    pub error: Option<String>,
392}
393
394pub trait BackgroundJobService: Send + Sync {
395    fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle>;
396    fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus>;
397}
398
399#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
400pub struct TenantBackgroundJobQuota {
401    pub max_jobs_per_window: usize,
402    pub window_ms: u64,
403    pub require_tenant_id: bool,
404}
405
406impl Default for TenantBackgroundJobQuota {
407    fn default() -> Self {
408        Self {
409            max_jobs_per_window: 10_000,
410            window_ms: 60_000,
411            require_tenant_id: true,
412        }
413    }
414}
415
416#[derive(Debug, Clone, Copy, PartialEq, Eq)]
417struct TenantJobWindow {
418    started_at_unix_ms: u64,
419    enqueued: usize,
420}
421
422impl TenantJobWindow {
423    fn new(now_unix_ms: u64) -> Self {
424        Self {
425            started_at_unix_ms: now_unix_ms,
426            enqueued: 0,
427        }
428    }
429
430    fn roll_if_needed(&mut self, now_unix_ms: u64, window_ms: u64) {
431        if now_unix_ms.saturating_sub(self.started_at_unix_ms) >= window_ms {
432            *self = Self::new(now_unix_ms);
433        }
434    }
435}
436
437pub struct TenantQuotaBackgroundJobs<T: BackgroundJobService> {
438    inner: Arc<T>,
439    quota: TenantBackgroundJobQuota,
440    windows: Mutex<BTreeMap<String, TenantJobWindow>>,
441}
442
443impl<T: BackgroundJobService> TenantQuotaBackgroundJobs<T> {
444    pub fn new(inner: Arc<T>) -> Self {
445        Self {
446            inner,
447            quota: TenantBackgroundJobQuota::default(),
448            windows: Mutex::new(BTreeMap::new()),
449        }
450    }
451
452    pub fn with_quota(mut self, quota: TenantBackgroundJobQuota) -> Self {
453        self.quota = quota;
454        self
455    }
456
457    fn reserve_enqueue_slot(&self, tenant_id: &str) -> AppServiceResult<()> {
458        let now = now_unix_ms();
459        let mut windows = self.windows.lock().map_err(|_| {
460            AppServiceError::new(
461                "jobs",
462                AppServiceErrorKind::Unavailable,
463                "tenant_quota_lock_poisoned",
464                "tenant quota lock poisoned",
465            )
466        })?;
467        let window = windows
468            .entry(tenant_id.to_string())
469            .or_insert_with(|| TenantJobWindow::new(now));
470        window.roll_if_needed(now, self.quota.window_ms.max(1));
471        let next = window.enqueued.saturating_add(1);
472        if next > self.quota.max_jobs_per_window.max(1) {
473            return Err(AppServiceError::new(
474                "jobs",
475                AppServiceErrorKind::Conflict,
476                "tenant_job_quota_exceeded",
477                "tenant background job quota exceeded",
478            ));
479        }
480        window.enqueued = next;
481        Ok(())
482    }
483
484    fn rollback_enqueue_slot(&self, tenant_id: &str) {
485        if let Ok(mut windows) = self.windows.lock() {
486            if let Some(window) = windows.get_mut(tenant_id) {
487                window.enqueued = window.enqueued.saturating_sub(1);
488            }
489        }
490    }
491}
492
493impl<T: BackgroundJobService> BackgroundJobService for TenantQuotaBackgroundJobs<T> {
494    fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
495        let tenant_id = request
496            .metadata
497            .get("tenant_id")
498            .map(String::as_str)
499            .map(str::trim)
500            .filter(|value| !value.is_empty())
501            .map(ToString::to_string);
502        if self.quota.require_tenant_id && tenant_id.is_none() {
503            return Err(AppServiceError::new(
504                "jobs",
505                AppServiceErrorKind::InvalidInput,
506                "tenant_context_required",
507                "tenant_id metadata is required for background jobs",
508            ));
509        }
510
511        if let Some(tenant_id) = tenant_id.as_deref() {
512            self.reserve_enqueue_slot(tenant_id)?;
513        }
514        let result = self.inner.enqueue(request);
515        if result.is_err() {
516            if let Some(tenant_id) = tenant_id.as_deref() {
517                self.rollback_enqueue_slot(tenant_id);
518            }
519        }
520        result
521    }
522
523    fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
524        self.inner.status(id)
525    }
526}
527
528#[derive(Clone)]
529pub struct JobOrchestratorBackgroundJobs<T: JobOrchestrator> {
530    orchestrator: Arc<T>,
531}
532
533impl<T: JobOrchestrator> JobOrchestratorBackgroundJobs<T> {
534    pub fn new(orchestrator: Arc<T>) -> Self {
535        Self { orchestrator }
536    }
537}
538
539impl<T: JobOrchestrator> BackgroundJobService for JobOrchestratorBackgroundJobs<T> {
540    fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
541        let mut job_request =
542            JobRequest::new(request.job, request.payload, request.idempotency_key);
543        job_request.metadata = request.metadata;
544        if let Some(scheduled_at_unix_ms) = request.scheduled_at_unix_ms {
545            job_request.metadata.insert(
546                "scheduled_at_unix_ms".to_string(),
547                scheduled_at_unix_ms.to_string(),
548            );
549        }
550
551        let handle = self
552            .orchestrator
553            .enqueue(job_request)
554            .map_err(map_job_error)?;
555        Ok(BackgroundJobHandle {
556            id: handle.id,
557            job: handle.workflow,
558            idempotency_key: handle.idempotency_key,
559        })
560    }
561
562    fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
563        let status = self.orchestrator.status(id).map_err(map_job_error)?;
564        Ok(BackgroundJobStatus {
565            id: status.id,
566            state: BackgroundJobState::from(status.state),
567            attempts: status.attempts,
568            result: status.result,
569            error: status.error.map(|err| err.to_string()),
570        })
571    }
572}
573
574#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
575pub struct EmailMessage {
576    pub to: String,
577    pub subject: String,
578    pub text_body: String,
579    pub html_body: Option<String>,
580    pub headers: BTreeMap<String, String>,
581}
582
583impl EmailMessage {
584    pub fn new(
585        to: impl Into<String>,
586        subject: impl Into<String>,
587        text_body: impl Into<String>,
588    ) -> Self {
589        Self {
590            to: to.into(),
591            subject: subject.into(),
592            text_body: text_body.into(),
593            html_body: None,
594            headers: BTreeMap::new(),
595        }
596    }
597}
598
599#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
600pub struct EmailReceipt {
601    pub message_id: String,
602    pub provider: String,
603    pub accepted_at_unix_ms: u64,
604}
605
606pub trait TransactionalEmailService: Send + Sync {
607    fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt>;
608}
609
610#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
611pub struct SentEmail {
612    pub message: EmailMessage,
613    pub receipt: EmailReceipt,
614}
615
616pub struct InMemoryTransactionalEmailService {
617    provider: String,
618    next_id: Mutex<u64>,
619    sent: Mutex<Vec<SentEmail>>,
620}
621
622impl InMemoryTransactionalEmailService {
623    pub fn new(provider: impl Into<String>) -> Self {
624        Self {
625            provider: provider.into(),
626            next_id: Mutex::new(0),
627            sent: Mutex::new(Vec::new()),
628        }
629    }
630
631    pub fn sent(&self) -> Vec<SentEmail> {
632        self.sent
633            .lock()
634            .map(|entries| entries.clone())
635            .unwrap_or_default()
636    }
637}
638
639impl Default for InMemoryTransactionalEmailService {
640    fn default() -> Self {
641        Self::new("in-memory-mailer")
642    }
643}
644
645impl TransactionalEmailService for InMemoryTransactionalEmailService {
646    fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt> {
647        if message.to.trim().is_empty() {
648            return Err(AppServiceError::new(
649                "email",
650                AppServiceErrorKind::InvalidInput,
651                "empty_recipient",
652                "email recipient must not be empty",
653            ));
654        }
655        let mut next_id = self.next_id.lock().map_err(|_| {
656            AppServiceError::new(
657                "email",
658                AppServiceErrorKind::Unavailable,
659                "email_id_lock_poisoned",
660                "email id lock poisoned",
661            )
662        })?;
663        *next_id = next_id.saturating_add(1);
664        let message_id = format!("mail-{next_id}");
665        let receipt = EmailReceipt {
666            message_id: message_id.clone(),
667            provider: self.provider.clone(),
668            accepted_at_unix_ms: now_unix_ms(),
669        };
670        self.sent
671            .lock()
672            .map_err(|_| {
673                AppServiceError::new(
674                    "email",
675                    AppServiceErrorKind::Unavailable,
676                    "email_log_lock_poisoned",
677                    "email log lock poisoned",
678                )
679            })?
680            .push(SentEmail {
681                message: message.clone(),
682                receipt: receipt.clone(),
683            });
684        info!(
685            target: "shelly.integration.app_service",
686            service = "email",
687            operation = "send",
688            message_id = message_id.as_str(),
689            recipient = message.to.as_str(),
690            "Shelly app service operation executed"
691        );
692        Ok(receipt)
693    }
694}
695
696#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
697pub struct CacheEntry {
698    pub value: Value,
699    pub expires_at_unix_ms: Option<u64>,
700}
701
702impl CacheEntry {
703    pub fn persistent(value: Value) -> Self {
704        Self {
705            value,
706            expires_at_unix_ms: None,
707        }
708    }
709
710    pub fn with_ttl_ms(value: Value, ttl_ms: u64) -> Self {
711        Self {
712            value,
713            expires_at_unix_ms: Some(now_unix_ms().saturating_add(ttl_ms)),
714        }
715    }
716}
717
718pub trait CacheBackend: Send + Sync {
719    fn get(&self, key: &str) -> AppServiceResult<Option<Value>>;
720    fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()>;
721    fn delete(&self, key: &str) -> AppServiceResult<()>;
722}
723
724#[derive(Default)]
725pub struct InMemoryCacheBackend {
726    entries: Mutex<BTreeMap<String, CacheEntry>>,
727}
728
729impl CacheBackend for InMemoryCacheBackend {
730    fn get(&self, key: &str) -> AppServiceResult<Option<Value>> {
731        let mut entries = self.entries.lock().map_err(|_| {
732            AppServiceError::new(
733                "cache",
734                AppServiceErrorKind::Unavailable,
735                "cache_lock_poisoned",
736                "cache lock poisoned",
737            )
738        })?;
739        let Some(entry) = entries.get(key).cloned() else {
740            return Ok(None);
741        };
742        if entry
743            .expires_at_unix_ms
744            .is_some_and(|expires_at| now_unix_ms() >= expires_at)
745        {
746            entries.remove(key);
747            return Ok(None);
748        }
749        Ok(Some(entry.value))
750    }
751
752    fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()> {
753        self.entries
754            .lock()
755            .map_err(|_| {
756                AppServiceError::new(
757                    "cache",
758                    AppServiceErrorKind::Unavailable,
759                    "cache_lock_poisoned",
760                    "cache lock poisoned",
761                )
762            })?
763            .insert(key.to_string(), entry);
764        Ok(())
765    }
766
767    fn delete(&self, key: &str) -> AppServiceResult<()> {
768        self.entries
769            .lock()
770            .map_err(|_| {
771                AppServiceError::new(
772                    "cache",
773                    AppServiceErrorKind::Unavailable,
774                    "cache_lock_poisoned",
775                    "cache lock poisoned",
776                )
777            })?
778            .remove(key);
779        Ok(())
780    }
781}
782
783#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
784pub struct QueueMessage {
785    pub id: String,
786    pub topic: String,
787    pub payload: Value,
788    pub attempts: u32,
789    pub enqueued_at_unix_ms: u64,
790    pub scheduled_at_unix_ms: Option<u64>,
791}
792
793pub trait QueueBackend: Send + Sync {
794    fn enqueue(
795        &self,
796        topic: &str,
797        payload: Value,
798        scheduled_at_unix_ms: Option<u64>,
799    ) -> AppServiceResult<QueueMessage>;
800    fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>>;
801    fn len(&self, topic: &str) -> AppServiceResult<usize>;
802}
803
804#[derive(Default)]
805pub struct InMemoryQueueBackend {
806    messages: Mutex<BTreeMap<String, VecDeque<QueueMessage>>>,
807    next_id: Mutex<u64>,
808}
809
810impl QueueBackend for InMemoryQueueBackend {
811    fn enqueue(
812        &self,
813        topic: &str,
814        payload: Value,
815        scheduled_at_unix_ms: Option<u64>,
816    ) -> AppServiceResult<QueueMessage> {
817        if topic.trim().is_empty() {
818            return Err(AppServiceError::new(
819                "queue",
820                AppServiceErrorKind::InvalidInput,
821                "empty_topic",
822                "queue topic must not be empty",
823            ));
824        }
825        let mut next_id = self.next_id.lock().map_err(|_| {
826            AppServiceError::new(
827                "queue",
828                AppServiceErrorKind::Unavailable,
829                "queue_id_lock_poisoned",
830                "queue id lock poisoned",
831            )
832        })?;
833        *next_id = next_id.saturating_add(1);
834        let message = QueueMessage {
835            id: format!("queue-{next_id}"),
836            topic: topic.to_string(),
837            payload,
838            attempts: 0,
839            enqueued_at_unix_ms: now_unix_ms(),
840            scheduled_at_unix_ms,
841        };
842        self.messages
843            .lock()
844            .map_err(|_| {
845                AppServiceError::new(
846                    "queue",
847                    AppServiceErrorKind::Unavailable,
848                    "queue_lock_poisoned",
849                    "queue lock poisoned",
850                )
851            })?
852            .entry(topic.to_string())
853            .or_default()
854            .push_back(message.clone());
855        Ok(message)
856    }
857
858    fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>> {
859        let mut queues = self.messages.lock().map_err(|_| {
860            AppServiceError::new(
861                "queue",
862                AppServiceErrorKind::Unavailable,
863                "queue_lock_poisoned",
864                "queue lock poisoned",
865            )
866        })?;
867        let Some(queue) = queues.get_mut(topic) else {
868            return Ok(None);
869        };
870        Ok(queue.pop_front().map(|mut message| {
871            message.attempts = message.attempts.saturating_add(1);
872            message
873        }))
874    }
875
876    fn len(&self, topic: &str) -> AppServiceResult<usize> {
877        let queues = self.messages.lock().map_err(|_| {
878            AppServiceError::new(
879                "queue",
880                AppServiceErrorKind::Unavailable,
881                "queue_lock_poisoned",
882                "queue lock poisoned",
883            )
884        })?;
885        Ok(queues.get(topic).map(VecDeque::len).unwrap_or(0))
886    }
887}
888
889fn map_job_error(err: IntegrationError) -> AppServiceError {
890    let kind = match err.kind {
891        IntegrationErrorKind::InvalidInput => AppServiceErrorKind::InvalidInput,
892        IntegrationErrorKind::Auth => AppServiceErrorKind::Unauthorized,
893        IntegrationErrorKind::Unavailable
894        | IntegrationErrorKind::Timeout
895        | IntegrationErrorKind::RateLimited
896        | IntegrationErrorKind::Transient => AppServiceErrorKind::Unavailable,
897        IntegrationErrorKind::Permanent => AppServiceErrorKind::Conflict,
898    };
899    AppServiceError::new(
900        "jobs",
901        kind,
902        err.code.unwrap_or_else(|| "job_error".to_string()),
903        err.message,
904    )
905}
906
907fn now_unix_ms() -> u64 {
908    SystemTime::now()
909        .duration_since(UNIX_EPOCH)
910        .unwrap_or_default()
911        .as_millis() as u64
912}
913
914#[cfg(test)]
915mod tests {
916    use super::{
917        map_app_service_result, map_job_error, AppServiceError, AppServiceErrorKind,
918        AppServiceResult, AuthCredentials, BackgroundJobRequest, BackgroundJobService,
919        BackgroundJobState, CacheBackend, CacheEntry, EmailMessage, IdentityService,
920        InMemoryCacheBackend, InMemoryIdentityService, InMemoryQueueBackend,
921        InMemoryTransactionalEmailService, JobOrchestratorBackgroundJobs, QueueBackend,
922        TenantBackgroundJobQuota, TenantQuotaBackgroundJobs, TransactionalEmailService,
923    };
924    use crate::integrations::{
925        InMemoryJobOrchestrator, IntegrationError, IntegrationErrorKind, JobState,
926    };
927    use serde_json::json;
928    use std::{
929        panic::{catch_unwind, AssertUnwindSafe},
930        sync::Arc,
931        thread,
932        time::Duration,
933    };
934
935    #[test]
936    fn in_memory_identity_service_signs_in_and_signs_out() {
937        let service = InMemoryIdentityService::default();
938        let session = service
939            .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
940            .unwrap();
941
942        let identity = service.session_identity(&session.token).unwrap().unwrap();
943        assert_eq!(identity.email, "demo@shelly.dev");
944        assert!(identity.roles.contains(&"admin".to_string()));
945
946        service.sign_out(&session.token).unwrap();
947        let missing = service.session_identity(&session.token).unwrap();
948        assert!(missing.is_none());
949    }
950
951    #[test]
952    fn in_memory_identity_service_rejects_invalid_credentials() {
953        let service = InMemoryIdentityService::default();
954        let err = service
955            .sign_in(AuthCredentials::new("demo@shelly.dev", "wrong-password"))
956            .unwrap_err();
957        assert_eq!(err.kind, AppServiceErrorKind::InvalidCredentials);
958    }
959
960    #[test]
961    fn in_memory_cache_backend_supports_set_get_delete_and_ttl() {
962        let cache = InMemoryCacheBackend::default();
963        cache
964            .set(
965                "sessions:1",
966                CacheEntry::persistent(json!({"active": true})),
967            )
968            .unwrap();
969        let value = cache.get("sessions:1").unwrap();
970        assert_eq!(value, Some(json!({"active": true})));
971
972        cache.delete("sessions:1").unwrap();
973        let cleared = cache.get("sessions:1").unwrap();
974        assert!(cleared.is_none());
975
976        cache
977            .set("temp", CacheEntry::with_ttl_ms(json!("stale"), 0))
978            .unwrap();
979        let expired = cache.get("temp").unwrap();
980        assert!(expired.is_none());
981    }
982
983    #[test]
984    fn in_memory_queue_backend_is_fifo_per_topic() {
985        let queue = InMemoryQueueBackend::default();
986        queue.enqueue("emails", json!({"id": 1}), None).unwrap();
987        queue.enqueue("emails", json!({"id": 2}), None).unwrap();
988        assert_eq!(queue.len("emails").unwrap(), 2);
989
990        let first = queue.dequeue("emails").unwrap().unwrap();
991        let second = queue.dequeue("emails").unwrap().unwrap();
992        assert_eq!(first.payload, json!({"id": 1}));
993        assert_eq!(second.payload, json!({"id": 2}));
994        assert_eq!(queue.len("emails").unwrap(), 0);
995    }
996
997    #[test]
998    fn transactional_email_service_records_sent_messages() {
999        let mailer = InMemoryTransactionalEmailService::default();
1000        let message = EmailMessage::new("ada@example.test", "Welcome", "Hello Ada");
1001        let receipt = mailer.send(message.clone()).unwrap();
1002        assert_eq!(receipt.provider, "in-memory-mailer");
1003
1004        let sent = mailer.sent();
1005        assert_eq!(sent.len(), 1);
1006        assert_eq!(sent[0].message, message);
1007        assert_eq!(sent[0].receipt.message_id, receipt.message_id);
1008    }
1009
1010    #[test]
1011    fn background_job_service_wraps_job_orchestrator_contract() {
1012        let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
1013        let jobs = JobOrchestratorBackgroundJobs::new(orchestrator.clone());
1014        let handle = jobs
1015            .enqueue(BackgroundJobRequest::new(
1016                "send_digest",
1017                json!({"account_id": 7}),
1018                "send_digest:7",
1019            ))
1020            .unwrap();
1021        let queued = jobs.status(&handle.id).unwrap();
1022        assert_eq!(queued.state, BackgroundJobState::Queued);
1023
1024        orchestrator
1025            .mark_succeeded(&handle.id, json!({"emails_sent": 3}))
1026            .unwrap();
1027        let done = jobs.status(&handle.id).unwrap();
1028        assert_eq!(done.state, BackgroundJobState::Succeeded);
1029        assert_eq!(done.result, Some(json!({"emails_sent": 3})));
1030    }
1031
1032    #[test]
1033    fn tenant_quota_background_jobs_enforce_per_tenant_limits() {
1034        let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
1035        let base = Arc::new(JobOrchestratorBackgroundJobs::new(orchestrator));
1036        let jobs = TenantQuotaBackgroundJobs::new(base).with_quota(TenantBackgroundJobQuota {
1037            max_jobs_per_window: 1,
1038            window_ms: u64::MAX,
1039            require_tenant_id: true,
1040        });
1041
1042        let mut first = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-a-1");
1043        first
1044            .metadata
1045            .insert("tenant_id".to_string(), "tenant-a".to_string());
1046        let mut second = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-a-2");
1047        second
1048            .metadata
1049            .insert("tenant_id".to_string(), "tenant-a".to_string());
1050        let mut other_tenant =
1051            BackgroundJobRequest::new("sync", json!({"tenant": "b"}), "sync-b-1");
1052        other_tenant
1053            .metadata
1054            .insert("tenant_id".to_string(), "tenant-b".to_string());
1055
1056        jobs.enqueue(first).expect("first tenant-a job should pass");
1057        let denied = jobs
1058            .enqueue(second)
1059            .expect_err("second tenant-a job should be denied by quota");
1060        assert_eq!(denied.code, "tenant_job_quota_exceeded");
1061
1062        jobs.enqueue(other_tenant)
1063            .expect("tenant-b should keep its own quota window");
1064
1065        let missing_tenant = BackgroundJobRequest::new("sync", json!({"tenant": "?"}), "sync-?");
1066        let missing_tenant_error = jobs
1067            .enqueue(missing_tenant)
1068            .expect_err("missing tenant_id metadata should be denied");
1069        assert_eq!(missing_tenant_error.code, "tenant_context_required");
1070    }
1071
1072    #[test]
1073    fn map_app_service_result_wraps_errors_with_source_context() {
1074        let mapped = map_app_service_result::<()>(
1075            "email",
1076            Err(AppServiceError::new(
1077                "mailer",
1078                AppServiceErrorKind::InvalidInput,
1079                "empty_recipient",
1080                "recipient is required",
1081            )),
1082        )
1083        .unwrap_err()
1084        .to_string();
1085        assert!(mapped.contains("[email]"));
1086        assert!(mapped.contains("empty_recipient"));
1087    }
1088
1089    #[test]
1090    fn identity_service_covers_insert_validation_ttl_floor_and_expiry() {
1091        let service = InMemoryIdentityService::default();
1092        let err = service
1093            .insert_user("u-1", "   ", "pw", None, Vec::new(), Default::default())
1094            .unwrap_err();
1095        assert_eq!(err.code, "empty_email");
1096
1097        let ttl_floor = InMemoryIdentityService::default().with_session_ttl_ms(0);
1098        let session = ttl_floor
1099            .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1100            .unwrap();
1101        assert!(session.expires_at_unix_ms >= session.issued_at_unix_ms.saturating_add(1));
1102
1103        let short_lived = InMemoryIdentityService::default().with_session_ttl_ms(1);
1104        let expiring = short_lived
1105            .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1106            .unwrap();
1107        thread::sleep(Duration::from_millis(3));
1108        let expired = short_lived.session_identity(&expiring.token).unwrap();
1109        assert!(expired.is_none());
1110    }
1111
1112    #[test]
1113    fn queue_and_email_services_cover_invalid_input_paths() {
1114        let queue = InMemoryQueueBackend::default();
1115        let err = queue.enqueue(" ", json!({"id": 1}), None).unwrap_err();
1116        assert_eq!(err.code, "empty_topic");
1117        assert!(queue.dequeue("missing").unwrap().is_none());
1118        assert_eq!(queue.len("missing").unwrap(), 0);
1119
1120        let mailer = InMemoryTransactionalEmailService::default();
1121        let err = mailer
1122            .send(EmailMessage::new(" ", "Subject", "Body"))
1123            .unwrap_err();
1124        assert_eq!(err.code, "empty_recipient");
1125    }
1126
1127    #[derive(Default)]
1128    struct FailingJobs;
1129
1130    impl BackgroundJobService for FailingJobs {
1131        fn enqueue(
1132            &self,
1133            _request: BackgroundJobRequest,
1134        ) -> AppServiceResult<super::BackgroundJobHandle> {
1135            Err(AppServiceError::new(
1136                "jobs",
1137                AppServiceErrorKind::Unavailable,
1138                "inner_unavailable",
1139                "inner jobs service unavailable",
1140            ))
1141        }
1142
1143        fn status(&self, id: &str) -> AppServiceResult<super::BackgroundJobStatus> {
1144            Err(AppServiceError::new(
1145                "jobs",
1146                AppServiceErrorKind::NotFound,
1147                "missing_job",
1148                format!("missing job {id}"),
1149            ))
1150        }
1151    }
1152
1153    #[test]
1154    fn tenant_quota_rolls_back_slot_when_inner_enqueue_fails() {
1155        let jobs = TenantQuotaBackgroundJobs::new(Arc::new(FailingJobs)).with_quota(
1156            TenantBackgroundJobQuota {
1157                max_jobs_per_window: 1,
1158                window_ms: u64::MAX,
1159                require_tenant_id: true,
1160            },
1161        );
1162
1163        let mut request_one = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-1");
1164        request_one
1165            .metadata
1166            .insert("tenant_id".to_string(), "tenant-a".to_string());
1167        let mut request_two = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-2");
1168        request_two
1169            .metadata
1170            .insert("tenant_id".to_string(), "tenant-a".to_string());
1171
1172        let first = jobs.enqueue(request_one).unwrap_err();
1173        assert_eq!(first.code, "inner_unavailable");
1174
1175        let second = jobs.enqueue(request_two).unwrap_err();
1176        assert_eq!(second.code, "inner_unavailable");
1177
1178        let status = jobs.status("missing").unwrap_err();
1179        assert_eq!(status.code, "missing_job");
1180    }
1181
1182    #[test]
1183    fn app_service_lock_poisoning_paths_are_reported_with_unavailable_codes() {
1184        let users_poisoned = InMemoryIdentityService::default();
1185        let _ = catch_unwind(AssertUnwindSafe(|| {
1186            let _guard = users_poisoned.users.lock().expect("users lock");
1187            panic!("poison users");
1188        }));
1189        let err = users_poisoned
1190            .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1191            .unwrap_err();
1192        assert_eq!(err.code, "users_lock_poisoned");
1193
1194        let session_id_poisoned = InMemoryIdentityService::default();
1195        let _ = catch_unwind(AssertUnwindSafe(|| {
1196            let _guard = session_id_poisoned
1197                .next_session_id
1198                .lock()
1199                .expect("next_session_id lock");
1200            panic!("poison next session id");
1201        }));
1202        let err = session_id_poisoned
1203            .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1204            .unwrap_err();
1205        assert_eq!(err.code, "session_id_lock_poisoned");
1206
1207        let sessions_poisoned = InMemoryIdentityService::default();
1208        let _ = catch_unwind(AssertUnwindSafe(|| {
1209            let _guard = sessions_poisoned.sessions.lock().expect("sessions lock");
1210            panic!("poison sessions");
1211        }));
1212        let err = sessions_poisoned
1213            .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1214            .unwrap_err();
1215        assert_eq!(err.code, "sessions_lock_poisoned");
1216        let err = sessions_poisoned.session_identity("sess-1").unwrap_err();
1217        assert_eq!(err.code, "sessions_lock_poisoned");
1218        let err = sessions_poisoned.sign_out("sess-1").unwrap_err();
1219        assert_eq!(err.code, "sessions_lock_poisoned");
1220
1221        let mailer_id_poisoned = InMemoryTransactionalEmailService::default();
1222        let _ = catch_unwind(AssertUnwindSafe(|| {
1223            let _guard = mailer_id_poisoned.next_id.lock().expect("email id lock");
1224            panic!("poison email id");
1225        }));
1226        let err = mailer_id_poisoned
1227            .send(EmailMessage::new("ada@example.test", "Subject", "Body"))
1228            .unwrap_err();
1229        assert_eq!(err.code, "email_id_lock_poisoned");
1230
1231        let mailer_log_poisoned = InMemoryTransactionalEmailService::default();
1232        let _ = catch_unwind(AssertUnwindSafe(|| {
1233            let _guard = mailer_log_poisoned.sent.lock().expect("email log lock");
1234            panic!("poison email log");
1235        }));
1236        let err = mailer_log_poisoned
1237            .send(EmailMessage::new("ada@example.test", "Subject", "Body"))
1238            .unwrap_err();
1239        assert_eq!(err.code, "email_log_lock_poisoned");
1240
1241        let queue_id_poisoned = InMemoryQueueBackend::default();
1242        let _ = catch_unwind(AssertUnwindSafe(|| {
1243            let _guard = queue_id_poisoned.next_id.lock().expect("queue id lock");
1244            panic!("poison queue id");
1245        }));
1246        let err = queue_id_poisoned
1247            .enqueue("jobs", json!({"id": 1}), None)
1248            .unwrap_err();
1249        assert_eq!(err.code, "queue_id_lock_poisoned");
1250
1251        let queue_store_poisoned = InMemoryQueueBackend::default();
1252        let _ = catch_unwind(AssertUnwindSafe(|| {
1253            let _guard = queue_store_poisoned
1254                .messages
1255                .lock()
1256                .expect("queue store lock");
1257            panic!("poison queue store");
1258        }));
1259        let err = queue_store_poisoned.len("jobs").unwrap_err();
1260        assert_eq!(err.code, "queue_lock_poisoned");
1261        let err = queue_store_poisoned.dequeue("jobs").unwrap_err();
1262        assert_eq!(err.code, "queue_lock_poisoned");
1263
1264        let cache_poisoned = InMemoryCacheBackend::default();
1265        let _ = catch_unwind(AssertUnwindSafe(|| {
1266            let _guard = cache_poisoned.entries.lock().expect("cache lock");
1267            panic!("poison cache");
1268        }));
1269        let err = cache_poisoned.get("k").unwrap_err();
1270        assert_eq!(err.code, "cache_lock_poisoned");
1271        let err = cache_poisoned
1272            .set("k", CacheEntry::persistent(json!({"v": 1})))
1273            .unwrap_err();
1274        assert_eq!(err.code, "cache_lock_poisoned");
1275        let err = cache_poisoned.delete("k").unwrap_err();
1276        assert_eq!(err.code, "cache_lock_poisoned");
1277    }
1278
1279    #[test]
1280    fn background_job_helper_builders_and_error_mapping_cover_state_matrix() {
1281        let scheduled = BackgroundJobRequest::new("job", json!({"id": 1}), "idempotent")
1282            .schedule_at_unix_ms(1234);
1283        assert_eq!(scheduled.scheduled_at_unix_ms, Some(1234));
1284
1285        assert_eq!(
1286            BackgroundJobState::from(JobState::Queued),
1287            BackgroundJobState::Queued
1288        );
1289        assert_eq!(
1290            BackgroundJobState::from(JobState::Running),
1291            BackgroundJobState::Running
1292        );
1293        assert_eq!(
1294            BackgroundJobState::from(JobState::Succeeded),
1295            BackgroundJobState::Succeeded
1296        );
1297        assert_eq!(
1298            BackgroundJobState::from(JobState::Failed),
1299            BackgroundJobState::Failed
1300        );
1301        assert_eq!(
1302            BackgroundJobState::from(JobState::Canceled),
1303            BackgroundJobState::Canceled
1304        );
1305
1306        let mapped = map_job_error(
1307            IntegrationError::new("trigger", IntegrationErrorKind::InvalidInput, "bad request")
1308                .with_code("invalid_payload"),
1309        );
1310        assert_eq!(mapped.kind, AppServiceErrorKind::InvalidInput);
1311        assert_eq!(mapped.code, "invalid_payload");
1312
1313        let mapped = map_job_error(IntegrationError::new(
1314            "trigger",
1315            IntegrationErrorKind::Auth,
1316            "unauthorized",
1317        ));
1318        assert_eq!(mapped.kind, AppServiceErrorKind::Unauthorized);
1319
1320        for transient_kind in [
1321            IntegrationErrorKind::Unavailable,
1322            IntegrationErrorKind::Timeout,
1323            IntegrationErrorKind::RateLimited,
1324            IntegrationErrorKind::Transient,
1325        ] {
1326            let mapped = map_job_error(IntegrationError::new("trigger", transient_kind, "retry"));
1327            assert_eq!(mapped.kind, AppServiceErrorKind::Unavailable);
1328        }
1329
1330        let mapped = map_job_error(IntegrationError::new(
1331            "trigger",
1332            IntegrationErrorKind::Permanent,
1333            "conflict",
1334        ));
1335        assert_eq!(mapped.kind, AppServiceErrorKind::Conflict);
1336    }
1337}