1use crate::{
2 integrations::{IntegrationError, IntegrationErrorKind, JobOrchestrator, JobRequest, JobState},
3 DataError, DataResult,
4};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::{
8 collections::{BTreeMap, VecDeque},
9 fmt,
10 sync::{Arc, Mutex},
11 time::{SystemTime, UNIX_EPOCH},
12};
13use tracing::{info, warn};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum AppServiceErrorKind {
18 InvalidCredentials,
19 Unauthorized,
20 NotFound,
21 Conflict,
22 InvalidInput,
23 Unavailable,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct AppServiceError {
28 pub source: String,
29 pub kind: AppServiceErrorKind,
30 pub code: String,
31 pub message: String,
32}
33
34impl AppServiceError {
35 pub fn new(
36 source: impl Into<String>,
37 kind: AppServiceErrorKind,
38 code: impl Into<String>,
39 message: impl Into<String>,
40 ) -> Self {
41 Self {
42 source: source.into(),
43 kind,
44 code: code.into(),
45 message: message.into(),
46 }
47 }
48}
49
50impl fmt::Display for AppServiceError {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 write!(f, "[{}:{}] {}", self.source, self.code, self.message)
53 }
54}
55
56impl std::error::Error for AppServiceError {}
57
58pub type AppServiceResult<T> = Result<T, AppServiceError>;
59
60pub fn map_app_service_error(source: impl Into<String>, err: AppServiceError) -> DataError {
61 DataError::Integration(format!("[{}] {}", source.into(), err))
62}
63
64pub fn map_app_service_result<T>(
65 source: impl Into<String>,
66 result: AppServiceResult<T>,
67) -> DataResult<T> {
68 result.map_err(|err| map_app_service_error(source, err))
69}
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct AuthIdentity {
73 pub user_id: String,
74 pub email: String,
75 pub display_name: Option<String>,
76 pub roles: Vec<String>,
77 pub attributes: BTreeMap<String, String>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct AuthCredentials {
82 pub identifier: String,
83 pub password: String,
84}
85
86impl AuthCredentials {
87 pub fn new(identifier: impl Into<String>, password: impl Into<String>) -> Self {
88 Self {
89 identifier: identifier.into(),
90 password: password.into(),
91 }
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub struct AuthSession {
97 pub token: String,
98 pub identity: AuthIdentity,
99 pub issued_at_unix_ms: u64,
100 pub expires_at_unix_ms: u64,
101}
102
103pub trait IdentityService: Send + Sync {
104 fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession>;
105 fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>>;
106 fn sign_out(&self, token: &str) -> AppServiceResult<()>;
107}
108
/// Private storage row for a user known to [`InMemoryIdentityService`].
///
/// The password is stored and compared as plain text.
#[derive(Debug, Clone, PartialEq, Eq)]
struct IdentityRecord {
    /// Identifier of the user.
    user_id: String,
    /// E-mail as stored by `insert_user` (trimmed, ASCII-lowercased).
    email: String,
    /// Password exactly as supplied at insertion time.
    password: String,
    /// Optional human-friendly name.
    display_name: Option<String>,
    /// Role names granted to the user.
    roles: Vec<String>,
    /// Free-form string attributes attached to the user.
    attributes: BTreeMap<String, String>,
}
118
119pub struct InMemoryIdentityService {
120 users: Mutex<BTreeMap<String, IdentityRecord>>,
121 sessions: Mutex<BTreeMap<String, AuthSession>>,
122 next_session_id: Mutex<u64>,
123 session_ttl_ms: u64,
124}
125
126impl InMemoryIdentityService {
127 pub fn new() -> Self {
128 let service = Self {
129 users: Mutex::new(BTreeMap::new()),
130 sessions: Mutex::new(BTreeMap::new()),
131 next_session_id: Mutex::new(0),
132 session_ttl_ms: 8 * 60 * 60 * 1000,
133 };
134 service
135 .insert_user(
136 "user-1",
137 "demo@shelly.dev",
138 "shelly-demo",
139 Some("Shelly Demo"),
140 vec!["admin".to_string()],
141 BTreeMap::new(),
142 )
143 .expect("default in-memory auth user must be inserted");
144 service
145 }
146
147 pub fn insert_user(
148 &self,
149 user_id: impl Into<String>,
150 email: impl Into<String>,
151 password: impl Into<String>,
152 display_name: Option<&str>,
153 roles: Vec<String>,
154 attributes: BTreeMap<String, String>,
155 ) -> AppServiceResult<()> {
156 let email = email.into().trim().to_ascii_lowercase();
157 if email.is_empty() {
158 return Err(AppServiceError::new(
159 "auth",
160 AppServiceErrorKind::InvalidInput,
161 "empty_email",
162 "email must not be empty",
163 ));
164 }
165 let mut users = self.users.lock().map_err(|_| {
166 AppServiceError::new(
167 "auth",
168 AppServiceErrorKind::Unavailable,
169 "users_lock_poisoned",
170 "auth users lock poisoned",
171 )
172 })?;
173 users.insert(
174 email.clone(),
175 IdentityRecord {
176 user_id: user_id.into(),
177 email: email.clone(),
178 password: password.into(),
179 display_name: display_name.map(ToString::to_string),
180 roles,
181 attributes,
182 },
183 );
184 info!(
185 target: "shelly.integration.app_service",
186 service = "auth",
187 operation = "insert_user",
188 email,
189 "Shelly app service operation executed"
190 );
191 Ok(())
192 }
193
194 pub fn with_session_ttl_ms(mut self, ttl_ms: u64) -> Self {
195 self.session_ttl_ms = ttl_ms.max(1);
196 self
197 }
198}
199
200impl Default for InMemoryIdentityService {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl IdentityService for InMemoryIdentityService {
207 fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession> {
208 let identifier = credentials.identifier.trim().to_ascii_lowercase();
209 let users = self.users.lock().map_err(|_| {
210 AppServiceError::new(
211 "auth",
212 AppServiceErrorKind::Unavailable,
213 "users_lock_poisoned",
214 "auth users lock poisoned",
215 )
216 })?;
217 let Some(record) = users.get(&identifier) else {
218 return Err(AppServiceError::new(
219 "auth",
220 AppServiceErrorKind::InvalidCredentials,
221 "invalid_credentials",
222 "invalid credentials",
223 ));
224 };
225 if record.password != credentials.password {
226 return Err(AppServiceError::new(
227 "auth",
228 AppServiceErrorKind::InvalidCredentials,
229 "invalid_credentials",
230 "invalid credentials",
231 ));
232 }
233 let identity = AuthIdentity {
234 user_id: record.user_id.clone(),
235 email: record.email.clone(),
236 display_name: record.display_name.clone(),
237 roles: record.roles.clone(),
238 attributes: record.attributes.clone(),
239 };
240 drop(users);
241
242 let mut next_session_id = self.next_session_id.lock().map_err(|_| {
243 AppServiceError::new(
244 "auth",
245 AppServiceErrorKind::Unavailable,
246 "session_id_lock_poisoned",
247 "session id lock poisoned",
248 )
249 })?;
250 *next_session_id = next_session_id.saturating_add(1);
251 let token = format!("sess-{next_session_id}");
252 let issued_at_unix_ms = now_unix_ms();
253 let session = AuthSession {
254 token: token.clone(),
255 identity,
256 issued_at_unix_ms,
257 expires_at_unix_ms: issued_at_unix_ms.saturating_add(self.session_ttl_ms),
258 };
259 self.sessions
260 .lock()
261 .map_err(|_| {
262 AppServiceError::new(
263 "auth",
264 AppServiceErrorKind::Unavailable,
265 "sessions_lock_poisoned",
266 "auth sessions lock poisoned",
267 )
268 })?
269 .insert(token.clone(), session.clone());
270 info!(
271 target: "shelly.integration.app_service",
272 service = "auth",
273 operation = "sign_in",
274 token = token.as_str(),
275 user_id = session.identity.user_id.as_str(),
276 "Shelly app service operation executed"
277 );
278 Ok(session)
279 }
280
281 fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>> {
282 let mut sessions = self.sessions.lock().map_err(|_| {
283 AppServiceError::new(
284 "auth",
285 AppServiceErrorKind::Unavailable,
286 "sessions_lock_poisoned",
287 "auth sessions lock poisoned",
288 )
289 })?;
290 let Some(session) = sessions.get(token).cloned() else {
291 return Ok(None);
292 };
293 if now_unix_ms() >= session.expires_at_unix_ms {
294 sessions.remove(token);
295 warn!(
296 target: "shelly.integration.app_service",
297 service = "auth",
298 operation = "session_identity",
299 token,
300 "Shelly app session expired and was evicted"
301 );
302 return Ok(None);
303 }
304 Ok(Some(session.identity))
305 }
306
307 fn sign_out(&self, token: &str) -> AppServiceResult<()> {
308 self.sessions
309 .lock()
310 .map_err(|_| {
311 AppServiceError::new(
312 "auth",
313 AppServiceErrorKind::Unavailable,
314 "sessions_lock_poisoned",
315 "auth sessions lock poisoned",
316 )
317 })?
318 .remove(token);
319 info!(
320 target: "shelly.integration.app_service",
321 service = "auth",
322 operation = "sign_out",
323 token,
324 "Shelly app service operation executed"
325 );
326 Ok(())
327 }
328}
329
330#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
331pub struct BackgroundJobRequest {
332 pub job: String,
333 pub payload: Value,
334 pub idempotency_key: String,
335 pub scheduled_at_unix_ms: Option<u64>,
336 pub metadata: BTreeMap<String, String>,
337}
338
339impl BackgroundJobRequest {
340 pub fn new(job: impl Into<String>, payload: Value, idempotency_key: impl Into<String>) -> Self {
341 Self {
342 job: job.into(),
343 payload,
344 idempotency_key: idempotency_key.into(),
345 scheduled_at_unix_ms: None,
346 metadata: BTreeMap::new(),
347 }
348 }
349
350 pub fn schedule_at_unix_ms(mut self, scheduled_at_unix_ms: u64) -> Self {
351 self.scheduled_at_unix_ms = Some(scheduled_at_unix_ms);
352 self
353 }
354}
355
356#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
357pub struct BackgroundJobHandle {
358 pub id: String,
359 pub job: String,
360 pub idempotency_key: String,
361}
362
363#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
364#[serde(rename_all = "snake_case")]
365pub enum BackgroundJobState {
366 Queued,
367 Running,
368 Succeeded,
369 Failed,
370 Canceled,
371}
372
373impl From<JobState> for BackgroundJobState {
374 fn from(value: JobState) -> Self {
375 match value {
376 JobState::Queued => Self::Queued,
377 JobState::Running => Self::Running,
378 JobState::Succeeded => Self::Succeeded,
379 JobState::Failed => Self::Failed,
380 JobState::Canceled => Self::Canceled,
381 }
382 }
383}
384
385#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
386pub struct BackgroundJobStatus {
387 pub id: String,
388 pub state: BackgroundJobState,
389 pub attempts: u32,
390 pub result: Option<Value>,
391 pub error: Option<String>,
392}
393
394pub trait BackgroundJobService: Send + Sync {
395 fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle>;
396 fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus>;
397}
398
399#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
400pub struct TenantBackgroundJobQuota {
401 pub max_jobs_per_window: usize,
402 pub window_ms: u64,
403 pub require_tenant_id: bool,
404}
405
406impl Default for TenantBackgroundJobQuota {
407 fn default() -> Self {
408 Self {
409 max_jobs_per_window: 10_000,
410 window_ms: 60_000,
411 require_tenant_id: true,
412 }
413 }
414}
415
/// Counting window tracking how many jobs one tenant enqueued since the window began.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TenantJobWindow {
    /// Start of the current window, in milliseconds since the Unix epoch.
    started_at_unix_ms: u64,
    /// Number of jobs counted in the current window.
    enqueued: usize,
}
421
422impl TenantJobWindow {
423 fn new(now_unix_ms: u64) -> Self {
424 Self {
425 started_at_unix_ms: now_unix_ms,
426 enqueued: 0,
427 }
428 }
429
430 fn roll_if_needed(&mut self, now_unix_ms: u64, window_ms: u64) {
431 if now_unix_ms.saturating_sub(self.started_at_unix_ms) >= window_ms {
432 *self = Self::new(now_unix_ms);
433 }
434 }
435}
436
437pub struct TenantQuotaBackgroundJobs<T: BackgroundJobService> {
438 inner: Arc<T>,
439 quota: TenantBackgroundJobQuota,
440 windows: Mutex<BTreeMap<String, TenantJobWindow>>,
441}
442
443impl<T: BackgroundJobService> TenantQuotaBackgroundJobs<T> {
444 pub fn new(inner: Arc<T>) -> Self {
445 Self {
446 inner,
447 quota: TenantBackgroundJobQuota::default(),
448 windows: Mutex::new(BTreeMap::new()),
449 }
450 }
451
452 pub fn with_quota(mut self, quota: TenantBackgroundJobQuota) -> Self {
453 self.quota = quota;
454 self
455 }
456
457 fn reserve_enqueue_slot(&self, tenant_id: &str) -> AppServiceResult<()> {
458 let now = now_unix_ms();
459 let mut windows = self.windows.lock().map_err(|_| {
460 AppServiceError::new(
461 "jobs",
462 AppServiceErrorKind::Unavailable,
463 "tenant_quota_lock_poisoned",
464 "tenant quota lock poisoned",
465 )
466 })?;
467 let window = windows
468 .entry(tenant_id.to_string())
469 .or_insert_with(|| TenantJobWindow::new(now));
470 window.roll_if_needed(now, self.quota.window_ms.max(1));
471 let next = window.enqueued.saturating_add(1);
472 if next > self.quota.max_jobs_per_window.max(1) {
473 return Err(AppServiceError::new(
474 "jobs",
475 AppServiceErrorKind::Conflict,
476 "tenant_job_quota_exceeded",
477 "tenant background job quota exceeded",
478 ));
479 }
480 window.enqueued = next;
481 Ok(())
482 }
483
484 fn rollback_enqueue_slot(&self, tenant_id: &str) {
485 if let Ok(mut windows) = self.windows.lock() {
486 if let Some(window) = windows.get_mut(tenant_id) {
487 window.enqueued = window.enqueued.saturating_sub(1);
488 }
489 }
490 }
491}
492
493impl<T: BackgroundJobService> BackgroundJobService for TenantQuotaBackgroundJobs<T> {
494 fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
495 let tenant_id = request
496 .metadata
497 .get("tenant_id")
498 .map(String::as_str)
499 .map(str::trim)
500 .filter(|value| !value.is_empty())
501 .map(ToString::to_string);
502 if self.quota.require_tenant_id && tenant_id.is_none() {
503 return Err(AppServiceError::new(
504 "jobs",
505 AppServiceErrorKind::InvalidInput,
506 "tenant_context_required",
507 "tenant_id metadata is required for background jobs",
508 ));
509 }
510
511 if let Some(tenant_id) = tenant_id.as_deref() {
512 self.reserve_enqueue_slot(tenant_id)?;
513 }
514 let result = self.inner.enqueue(request);
515 if result.is_err() {
516 if let Some(tenant_id) = tenant_id.as_deref() {
517 self.rollback_enqueue_slot(tenant_id);
518 }
519 }
520 result
521 }
522
523 fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
524 self.inner.status(id)
525 }
526}
527
528#[derive(Clone)]
529pub struct JobOrchestratorBackgroundJobs<T: JobOrchestrator> {
530 orchestrator: Arc<T>,
531}
532
533impl<T: JobOrchestrator> JobOrchestratorBackgroundJobs<T> {
534 pub fn new(orchestrator: Arc<T>) -> Self {
535 Self { orchestrator }
536 }
537}
538
539impl<T: JobOrchestrator> BackgroundJobService for JobOrchestratorBackgroundJobs<T> {
540 fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
541 let mut job_request =
542 JobRequest::new(request.job, request.payload, request.idempotency_key);
543 job_request.metadata = request.metadata;
544 if let Some(scheduled_at_unix_ms) = request.scheduled_at_unix_ms {
545 job_request.metadata.insert(
546 "scheduled_at_unix_ms".to_string(),
547 scheduled_at_unix_ms.to_string(),
548 );
549 }
550
551 let handle = self
552 .orchestrator
553 .enqueue(job_request)
554 .map_err(map_job_error)?;
555 Ok(BackgroundJobHandle {
556 id: handle.id,
557 job: handle.workflow,
558 idempotency_key: handle.idempotency_key,
559 })
560 }
561
562 fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
563 let status = self.orchestrator.status(id).map_err(map_job_error)?;
564 Ok(BackgroundJobStatus {
565 id: status.id,
566 state: BackgroundJobState::from(status.state),
567 attempts: status.attempts,
568 result: status.result,
569 error: status.error.map(|err| err.to_string()),
570 })
571 }
572}
573
574#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
575pub struct EmailMessage {
576 pub to: String,
577 pub subject: String,
578 pub text_body: String,
579 pub html_body: Option<String>,
580 pub headers: BTreeMap<String, String>,
581}
582
583impl EmailMessage {
584 pub fn new(
585 to: impl Into<String>,
586 subject: impl Into<String>,
587 text_body: impl Into<String>,
588 ) -> Self {
589 Self {
590 to: to.into(),
591 subject: subject.into(),
592 text_body: text_body.into(),
593 html_body: None,
594 headers: BTreeMap::new(),
595 }
596 }
597}
598
599#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
600pub struct EmailReceipt {
601 pub message_id: String,
602 pub provider: String,
603 pub accepted_at_unix_ms: u64,
604}
605
606pub trait TransactionalEmailService: Send + Sync {
607 fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt>;
608}
609
610#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
611pub struct SentEmail {
612 pub message: EmailMessage,
613 pub receipt: EmailReceipt,
614}
615
616pub struct InMemoryTransactionalEmailService {
617 provider: String,
618 next_id: Mutex<u64>,
619 sent: Mutex<Vec<SentEmail>>,
620}
621
622impl InMemoryTransactionalEmailService {
623 pub fn new(provider: impl Into<String>) -> Self {
624 Self {
625 provider: provider.into(),
626 next_id: Mutex::new(0),
627 sent: Mutex::new(Vec::new()),
628 }
629 }
630
631 pub fn sent(&self) -> Vec<SentEmail> {
632 self.sent
633 .lock()
634 .map(|entries| entries.clone())
635 .unwrap_or_default()
636 }
637}
638
639impl Default for InMemoryTransactionalEmailService {
640 fn default() -> Self {
641 Self::new("in-memory-mailer")
642 }
643}
644
645impl TransactionalEmailService for InMemoryTransactionalEmailService {
646 fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt> {
647 if message.to.trim().is_empty() {
648 return Err(AppServiceError::new(
649 "email",
650 AppServiceErrorKind::InvalidInput,
651 "empty_recipient",
652 "email recipient must not be empty",
653 ));
654 }
655 let mut next_id = self.next_id.lock().map_err(|_| {
656 AppServiceError::new(
657 "email",
658 AppServiceErrorKind::Unavailable,
659 "email_id_lock_poisoned",
660 "email id lock poisoned",
661 )
662 })?;
663 *next_id = next_id.saturating_add(1);
664 let message_id = format!("mail-{next_id}");
665 let receipt = EmailReceipt {
666 message_id: message_id.clone(),
667 provider: self.provider.clone(),
668 accepted_at_unix_ms: now_unix_ms(),
669 };
670 self.sent
671 .lock()
672 .map_err(|_| {
673 AppServiceError::new(
674 "email",
675 AppServiceErrorKind::Unavailable,
676 "email_log_lock_poisoned",
677 "email log lock poisoned",
678 )
679 })?
680 .push(SentEmail {
681 message: message.clone(),
682 receipt: receipt.clone(),
683 });
684 info!(
685 target: "shelly.integration.app_service",
686 service = "email",
687 operation = "send",
688 message_id = message_id.as_str(),
689 recipient = message.to.as_str(),
690 "Shelly app service operation executed"
691 );
692 Ok(receipt)
693 }
694}
695
696#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
697pub struct CacheEntry {
698 pub value: Value,
699 pub expires_at_unix_ms: Option<u64>,
700}
701
702impl CacheEntry {
703 pub fn persistent(value: Value) -> Self {
704 Self {
705 value,
706 expires_at_unix_ms: None,
707 }
708 }
709
710 pub fn with_ttl_ms(value: Value, ttl_ms: u64) -> Self {
711 Self {
712 value,
713 expires_at_unix_ms: Some(now_unix_ms().saturating_add(ttl_ms)),
714 }
715 }
716}
717
718pub trait CacheBackend: Send + Sync {
719 fn get(&self, key: &str) -> AppServiceResult<Option<Value>>;
720 fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()>;
721 fn delete(&self, key: &str) -> AppServiceResult<()>;
722}
723
724#[derive(Default)]
725pub struct InMemoryCacheBackend {
726 entries: Mutex<BTreeMap<String, CacheEntry>>,
727}
728
729impl CacheBackend for InMemoryCacheBackend {
730 fn get(&self, key: &str) -> AppServiceResult<Option<Value>> {
731 let mut entries = self.entries.lock().map_err(|_| {
732 AppServiceError::new(
733 "cache",
734 AppServiceErrorKind::Unavailable,
735 "cache_lock_poisoned",
736 "cache lock poisoned",
737 )
738 })?;
739 let Some(entry) = entries.get(key).cloned() else {
740 return Ok(None);
741 };
742 if entry
743 .expires_at_unix_ms
744 .is_some_and(|expires_at| now_unix_ms() >= expires_at)
745 {
746 entries.remove(key);
747 return Ok(None);
748 }
749 Ok(Some(entry.value))
750 }
751
752 fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()> {
753 self.entries
754 .lock()
755 .map_err(|_| {
756 AppServiceError::new(
757 "cache",
758 AppServiceErrorKind::Unavailable,
759 "cache_lock_poisoned",
760 "cache lock poisoned",
761 )
762 })?
763 .insert(key.to_string(), entry);
764 Ok(())
765 }
766
767 fn delete(&self, key: &str) -> AppServiceResult<()> {
768 self.entries
769 .lock()
770 .map_err(|_| {
771 AppServiceError::new(
772 "cache",
773 AppServiceErrorKind::Unavailable,
774 "cache_lock_poisoned",
775 "cache lock poisoned",
776 )
777 })?
778 .remove(key);
779 Ok(())
780 }
781}
782
783#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
784pub struct QueueMessage {
785 pub id: String,
786 pub topic: String,
787 pub payload: Value,
788 pub attempts: u32,
789 pub enqueued_at_unix_ms: u64,
790 pub scheduled_at_unix_ms: Option<u64>,
791}
792
793pub trait QueueBackend: Send + Sync {
794 fn enqueue(
795 &self,
796 topic: &str,
797 payload: Value,
798 scheduled_at_unix_ms: Option<u64>,
799 ) -> AppServiceResult<QueueMessage>;
800 fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>>;
801 fn len(&self, topic: &str) -> AppServiceResult<usize>;
802}
803
804#[derive(Default)]
805pub struct InMemoryQueueBackend {
806 messages: Mutex<BTreeMap<String, VecDeque<QueueMessage>>>,
807 next_id: Mutex<u64>,
808}
809
810impl QueueBackend for InMemoryQueueBackend {
811 fn enqueue(
812 &self,
813 topic: &str,
814 payload: Value,
815 scheduled_at_unix_ms: Option<u64>,
816 ) -> AppServiceResult<QueueMessage> {
817 if topic.trim().is_empty() {
818 return Err(AppServiceError::new(
819 "queue",
820 AppServiceErrorKind::InvalidInput,
821 "empty_topic",
822 "queue topic must not be empty",
823 ));
824 }
825 let mut next_id = self.next_id.lock().map_err(|_| {
826 AppServiceError::new(
827 "queue",
828 AppServiceErrorKind::Unavailable,
829 "queue_id_lock_poisoned",
830 "queue id lock poisoned",
831 )
832 })?;
833 *next_id = next_id.saturating_add(1);
834 let message = QueueMessage {
835 id: format!("queue-{next_id}"),
836 topic: topic.to_string(),
837 payload,
838 attempts: 0,
839 enqueued_at_unix_ms: now_unix_ms(),
840 scheduled_at_unix_ms,
841 };
842 self.messages
843 .lock()
844 .map_err(|_| {
845 AppServiceError::new(
846 "queue",
847 AppServiceErrorKind::Unavailable,
848 "queue_lock_poisoned",
849 "queue lock poisoned",
850 )
851 })?
852 .entry(topic.to_string())
853 .or_default()
854 .push_back(message.clone());
855 Ok(message)
856 }
857
858 fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>> {
859 let mut queues = self.messages.lock().map_err(|_| {
860 AppServiceError::new(
861 "queue",
862 AppServiceErrorKind::Unavailable,
863 "queue_lock_poisoned",
864 "queue lock poisoned",
865 )
866 })?;
867 let Some(queue) = queues.get_mut(topic) else {
868 return Ok(None);
869 };
870 Ok(queue.pop_front().map(|mut message| {
871 message.attempts = message.attempts.saturating_add(1);
872 message
873 }))
874 }
875
876 fn len(&self, topic: &str) -> AppServiceResult<usize> {
877 let queues = self.messages.lock().map_err(|_| {
878 AppServiceError::new(
879 "queue",
880 AppServiceErrorKind::Unavailable,
881 "queue_lock_poisoned",
882 "queue lock poisoned",
883 )
884 })?;
885 Ok(queues.get(topic).map(VecDeque::len).unwrap_or(0))
886 }
887}
888
889fn map_job_error(err: IntegrationError) -> AppServiceError {
890 let kind = match err.kind {
891 IntegrationErrorKind::InvalidInput => AppServiceErrorKind::InvalidInput,
892 IntegrationErrorKind::Auth => AppServiceErrorKind::Unauthorized,
893 IntegrationErrorKind::Unavailable
894 | IntegrationErrorKind::Timeout
895 | IntegrationErrorKind::RateLimited
896 | IntegrationErrorKind::Transient => AppServiceErrorKind::Unavailable,
897 IntegrationErrorKind::Permanent => AppServiceErrorKind::Conflict,
898 };
899 AppServiceError::new(
900 "jobs",
901 kind,
902 err.code.unwrap_or_else(|| "job_error".to_string()),
903 err.message,
904 )
905}
906
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating should the
/// millisecond count ever exceed 64 bits.
fn now_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` yields a `u128`; convert with a checked conversion rather
    // than an `as` cast so an out-of-range value cannot wrap around.
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
913
914#[cfg(test)]
915mod tests {
916 use super::{
917 map_app_service_result, map_job_error, AppServiceError, AppServiceErrorKind,
918 AppServiceResult, AuthCredentials, BackgroundJobRequest, BackgroundJobService,
919 BackgroundJobState, CacheBackend, CacheEntry, EmailMessage, IdentityService,
920 InMemoryCacheBackend, InMemoryIdentityService, InMemoryQueueBackend,
921 InMemoryTransactionalEmailService, JobOrchestratorBackgroundJobs, QueueBackend,
922 TenantBackgroundJobQuota, TenantQuotaBackgroundJobs, TransactionalEmailService,
923 };
924 use crate::integrations::{
925 InMemoryJobOrchestrator, IntegrationError, IntegrationErrorKind, JobState,
926 };
927 use serde_json::json;
928 use std::{
929 panic::{catch_unwind, AssertUnwindSafe},
930 sync::Arc,
931 thread,
932 time::Duration,
933 };
934
935 #[test]
936 fn in_memory_identity_service_signs_in_and_signs_out() {
937 let service = InMemoryIdentityService::default();
938 let session = service
939 .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
940 .unwrap();
941
942 let identity = service.session_identity(&session.token).unwrap().unwrap();
943 assert_eq!(identity.email, "demo@shelly.dev");
944 assert!(identity.roles.contains(&"admin".to_string()));
945
946 service.sign_out(&session.token).unwrap();
947 let missing = service.session_identity(&session.token).unwrap();
948 assert!(missing.is_none());
949 }
950
951 #[test]
952 fn in_memory_identity_service_rejects_invalid_credentials() {
953 let service = InMemoryIdentityService::default();
954 let err = service
955 .sign_in(AuthCredentials::new("demo@shelly.dev", "wrong-password"))
956 .unwrap_err();
957 assert_eq!(err.kind, AppServiceErrorKind::InvalidCredentials);
958 }
959
960 #[test]
961 fn in_memory_cache_backend_supports_set_get_delete_and_ttl() {
962 let cache = InMemoryCacheBackend::default();
963 cache
964 .set(
965 "sessions:1",
966 CacheEntry::persistent(json!({"active": true})),
967 )
968 .unwrap();
969 let value = cache.get("sessions:1").unwrap();
970 assert_eq!(value, Some(json!({"active": true})));
971
972 cache.delete("sessions:1").unwrap();
973 let cleared = cache.get("sessions:1").unwrap();
974 assert!(cleared.is_none());
975
976 cache
977 .set("temp", CacheEntry::with_ttl_ms(json!("stale"), 0))
978 .unwrap();
979 let expired = cache.get("temp").unwrap();
980 assert!(expired.is_none());
981 }
982
983 #[test]
984 fn in_memory_queue_backend_is_fifo_per_topic() {
985 let queue = InMemoryQueueBackend::default();
986 queue.enqueue("emails", json!({"id": 1}), None).unwrap();
987 queue.enqueue("emails", json!({"id": 2}), None).unwrap();
988 assert_eq!(queue.len("emails").unwrap(), 2);
989
990 let first = queue.dequeue("emails").unwrap().unwrap();
991 let second = queue.dequeue("emails").unwrap().unwrap();
992 assert_eq!(first.payload, json!({"id": 1}));
993 assert_eq!(second.payload, json!({"id": 2}));
994 assert_eq!(queue.len("emails").unwrap(), 0);
995 }
996
997 #[test]
998 fn transactional_email_service_records_sent_messages() {
999 let mailer = InMemoryTransactionalEmailService::default();
1000 let message = EmailMessage::new("ada@example.test", "Welcome", "Hello Ada");
1001 let receipt = mailer.send(message.clone()).unwrap();
1002 assert_eq!(receipt.provider, "in-memory-mailer");
1003
1004 let sent = mailer.sent();
1005 assert_eq!(sent.len(), 1);
1006 assert_eq!(sent[0].message, message);
1007 assert_eq!(sent[0].receipt.message_id, receipt.message_id);
1008 }
1009
1010 #[test]
1011 fn background_job_service_wraps_job_orchestrator_contract() {
1012 let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
1013 let jobs = JobOrchestratorBackgroundJobs::new(orchestrator.clone());
1014 let handle = jobs
1015 .enqueue(BackgroundJobRequest::new(
1016 "send_digest",
1017 json!({"account_id": 7}),
1018 "send_digest:7",
1019 ))
1020 .unwrap();
1021 let queued = jobs.status(&handle.id).unwrap();
1022 assert_eq!(queued.state, BackgroundJobState::Queued);
1023
1024 orchestrator
1025 .mark_succeeded(&handle.id, json!({"emails_sent": 3}))
1026 .unwrap();
1027 let done = jobs.status(&handle.id).unwrap();
1028 assert_eq!(done.state, BackgroundJobState::Succeeded);
1029 assert_eq!(done.result, Some(json!({"emails_sent": 3})));
1030 }
1031
1032 #[test]
1033 fn tenant_quota_background_jobs_enforce_per_tenant_limits() {
1034 let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
1035 let base = Arc::new(JobOrchestratorBackgroundJobs::new(orchestrator));
1036 let jobs = TenantQuotaBackgroundJobs::new(base).with_quota(TenantBackgroundJobQuota {
1037 max_jobs_per_window: 1,
1038 window_ms: u64::MAX,
1039 require_tenant_id: true,
1040 });
1041
1042 let mut first = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-a-1");
1043 first
1044 .metadata
1045 .insert("tenant_id".to_string(), "tenant-a".to_string());
1046 let mut second = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-a-2");
1047 second
1048 .metadata
1049 .insert("tenant_id".to_string(), "tenant-a".to_string());
1050 let mut other_tenant =
1051 BackgroundJobRequest::new("sync", json!({"tenant": "b"}), "sync-b-1");
1052 other_tenant
1053 .metadata
1054 .insert("tenant_id".to_string(), "tenant-b".to_string());
1055
1056 jobs.enqueue(first).expect("first tenant-a job should pass");
1057 let denied = jobs
1058 .enqueue(second)
1059 .expect_err("second tenant-a job should be denied by quota");
1060 assert_eq!(denied.code, "tenant_job_quota_exceeded");
1061
1062 jobs.enqueue(other_tenant)
1063 .expect("tenant-b should keep its own quota window");
1064
1065 let missing_tenant = BackgroundJobRequest::new("sync", json!({"tenant": "?"}), "sync-?");
1066 let missing_tenant_error = jobs
1067 .enqueue(missing_tenant)
1068 .expect_err("missing tenant_id metadata should be denied");
1069 assert_eq!(missing_tenant_error.code, "tenant_context_required");
1070 }
1071
1072 #[test]
1073 fn map_app_service_result_wraps_errors_with_source_context() {
1074 let mapped = map_app_service_result::<()>(
1075 "email",
1076 Err(AppServiceError::new(
1077 "mailer",
1078 AppServiceErrorKind::InvalidInput,
1079 "empty_recipient",
1080 "recipient is required",
1081 )),
1082 )
1083 .unwrap_err()
1084 .to_string();
1085 assert!(mapped.contains("[email]"));
1086 assert!(mapped.contains("empty_recipient"));
1087 }
1088
1089 #[test]
1090 fn identity_service_covers_insert_validation_ttl_floor_and_expiry() {
1091 let service = InMemoryIdentityService::default();
1092 let err = service
1093 .insert_user("u-1", " ", "pw", None, Vec::new(), Default::default())
1094 .unwrap_err();
1095 assert_eq!(err.code, "empty_email");
1096
1097 let ttl_floor = InMemoryIdentityService::default().with_session_ttl_ms(0);
1098 let session = ttl_floor
1099 .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1100 .unwrap();
1101 assert!(session.expires_at_unix_ms >= session.issued_at_unix_ms.saturating_add(1));
1102
1103 let short_lived = InMemoryIdentityService::default().with_session_ttl_ms(1);
1104 let expiring = short_lived
1105 .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1106 .unwrap();
1107 thread::sleep(Duration::from_millis(3));
1108 let expired = short_lived.session_identity(&expiring.token).unwrap();
1109 assert!(expired.is_none());
1110 }
1111
1112 #[test]
1113 fn queue_and_email_services_cover_invalid_input_paths() {
1114 let queue = InMemoryQueueBackend::default();
1115 let err = queue.enqueue(" ", json!({"id": 1}), None).unwrap_err();
1116 assert_eq!(err.code, "empty_topic");
1117 assert!(queue.dequeue("missing").unwrap().is_none());
1118 assert_eq!(queue.len("missing").unwrap(), 0);
1119
1120 let mailer = InMemoryTransactionalEmailService::default();
1121 let err = mailer
1122 .send(EmailMessage::new(" ", "Subject", "Body"))
1123 .unwrap_err();
1124 assert_eq!(err.code, "empty_recipient");
1125 }
1126
1127 #[derive(Default)]
1128 struct FailingJobs;
1129
1130 impl BackgroundJobService for FailingJobs {
1131 fn enqueue(
1132 &self,
1133 _request: BackgroundJobRequest,
1134 ) -> AppServiceResult<super::BackgroundJobHandle> {
1135 Err(AppServiceError::new(
1136 "jobs",
1137 AppServiceErrorKind::Unavailable,
1138 "inner_unavailable",
1139 "inner jobs service unavailable",
1140 ))
1141 }
1142
1143 fn status(&self, id: &str) -> AppServiceResult<super::BackgroundJobStatus> {
1144 Err(AppServiceError::new(
1145 "jobs",
1146 AppServiceErrorKind::NotFound,
1147 "missing_job",
1148 format!("missing job {id}"),
1149 ))
1150 }
1151 }
1152
1153 #[test]
1154 fn tenant_quota_rolls_back_slot_when_inner_enqueue_fails() {
1155 let jobs = TenantQuotaBackgroundJobs::new(Arc::new(FailingJobs)).with_quota(
1156 TenantBackgroundJobQuota {
1157 max_jobs_per_window: 1,
1158 window_ms: u64::MAX,
1159 require_tenant_id: true,
1160 },
1161 );
1162
1163 let mut request_one = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-1");
1164 request_one
1165 .metadata
1166 .insert("tenant_id".to_string(), "tenant-a".to_string());
1167 let mut request_two = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-2");
1168 request_two
1169 .metadata
1170 .insert("tenant_id".to_string(), "tenant-a".to_string());
1171
1172 let first = jobs.enqueue(request_one).unwrap_err();
1173 assert_eq!(first.code, "inner_unavailable");
1174
1175 let second = jobs.enqueue(request_two).unwrap_err();
1176 assert_eq!(second.code, "inner_unavailable");
1177
1178 let status = jobs.status("missing").unwrap_err();
1179 assert_eq!(status.code, "missing_job");
1180 }
1181
1182 #[test]
1183 fn app_service_lock_poisoning_paths_are_reported_with_unavailable_codes() {
1184 let users_poisoned = InMemoryIdentityService::default();
1185 let _ = catch_unwind(AssertUnwindSafe(|| {
1186 let _guard = users_poisoned.users.lock().expect("users lock");
1187 panic!("poison users");
1188 }));
1189 let err = users_poisoned
1190 .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1191 .unwrap_err();
1192 assert_eq!(err.code, "users_lock_poisoned");
1193
1194 let session_id_poisoned = InMemoryIdentityService::default();
1195 let _ = catch_unwind(AssertUnwindSafe(|| {
1196 let _guard = session_id_poisoned
1197 .next_session_id
1198 .lock()
1199 .expect("next_session_id lock");
1200 panic!("poison next session id");
1201 }));
1202 let err = session_id_poisoned
1203 .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1204 .unwrap_err();
1205 assert_eq!(err.code, "session_id_lock_poisoned");
1206
1207 let sessions_poisoned = InMemoryIdentityService::default();
1208 let _ = catch_unwind(AssertUnwindSafe(|| {
1209 let _guard = sessions_poisoned.sessions.lock().expect("sessions lock");
1210 panic!("poison sessions");
1211 }));
1212 let err = sessions_poisoned
1213 .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
1214 .unwrap_err();
1215 assert_eq!(err.code, "sessions_lock_poisoned");
1216 let err = sessions_poisoned.session_identity("sess-1").unwrap_err();
1217 assert_eq!(err.code, "sessions_lock_poisoned");
1218 let err = sessions_poisoned.sign_out("sess-1").unwrap_err();
1219 assert_eq!(err.code, "sessions_lock_poisoned");
1220
1221 let mailer_id_poisoned = InMemoryTransactionalEmailService::default();
1222 let _ = catch_unwind(AssertUnwindSafe(|| {
1223 let _guard = mailer_id_poisoned.next_id.lock().expect("email id lock");
1224 panic!("poison email id");
1225 }));
1226 let err = mailer_id_poisoned
1227 .send(EmailMessage::new("ada@example.test", "Subject", "Body"))
1228 .unwrap_err();
1229 assert_eq!(err.code, "email_id_lock_poisoned");
1230
1231 let mailer_log_poisoned = InMemoryTransactionalEmailService::default();
1232 let _ = catch_unwind(AssertUnwindSafe(|| {
1233 let _guard = mailer_log_poisoned.sent.lock().expect("email log lock");
1234 panic!("poison email log");
1235 }));
1236 let err = mailer_log_poisoned
1237 .send(EmailMessage::new("ada@example.test", "Subject", "Body"))
1238 .unwrap_err();
1239 assert_eq!(err.code, "email_log_lock_poisoned");
1240
1241 let queue_id_poisoned = InMemoryQueueBackend::default();
1242 let _ = catch_unwind(AssertUnwindSafe(|| {
1243 let _guard = queue_id_poisoned.next_id.lock().expect("queue id lock");
1244 panic!("poison queue id");
1245 }));
1246 let err = queue_id_poisoned
1247 .enqueue("jobs", json!({"id": 1}), None)
1248 .unwrap_err();
1249 assert_eq!(err.code, "queue_id_lock_poisoned");
1250
1251 let queue_store_poisoned = InMemoryQueueBackend::default();
1252 let _ = catch_unwind(AssertUnwindSafe(|| {
1253 let _guard = queue_store_poisoned
1254 .messages
1255 .lock()
1256 .expect("queue store lock");
1257 panic!("poison queue store");
1258 }));
1259 let err = queue_store_poisoned.len("jobs").unwrap_err();
1260 assert_eq!(err.code, "queue_lock_poisoned");
1261 let err = queue_store_poisoned.dequeue("jobs").unwrap_err();
1262 assert_eq!(err.code, "queue_lock_poisoned");
1263
1264 let cache_poisoned = InMemoryCacheBackend::default();
1265 let _ = catch_unwind(AssertUnwindSafe(|| {
1266 let _guard = cache_poisoned.entries.lock().expect("cache lock");
1267 panic!("poison cache");
1268 }));
1269 let err = cache_poisoned.get("k").unwrap_err();
1270 assert_eq!(err.code, "cache_lock_poisoned");
1271 let err = cache_poisoned
1272 .set("k", CacheEntry::persistent(json!({"v": 1})))
1273 .unwrap_err();
1274 assert_eq!(err.code, "cache_lock_poisoned");
1275 let err = cache_poisoned.delete("k").unwrap_err();
1276 assert_eq!(err.code, "cache_lock_poisoned");
1277 }
1278
1279 #[test]
1280 fn background_job_helper_builders_and_error_mapping_cover_state_matrix() {
1281 let scheduled = BackgroundJobRequest::new("job", json!({"id": 1}), "idempotent")
1282 .schedule_at_unix_ms(1234);
1283 assert_eq!(scheduled.scheduled_at_unix_ms, Some(1234));
1284
1285 assert_eq!(
1286 BackgroundJobState::from(JobState::Queued),
1287 BackgroundJobState::Queued
1288 );
1289 assert_eq!(
1290 BackgroundJobState::from(JobState::Running),
1291 BackgroundJobState::Running
1292 );
1293 assert_eq!(
1294 BackgroundJobState::from(JobState::Succeeded),
1295 BackgroundJobState::Succeeded
1296 );
1297 assert_eq!(
1298 BackgroundJobState::from(JobState::Failed),
1299 BackgroundJobState::Failed
1300 );
1301 assert_eq!(
1302 BackgroundJobState::from(JobState::Canceled),
1303 BackgroundJobState::Canceled
1304 );
1305
1306 let mapped = map_job_error(
1307 IntegrationError::new("trigger", IntegrationErrorKind::InvalidInput, "bad request")
1308 .with_code("invalid_payload"),
1309 );
1310 assert_eq!(mapped.kind, AppServiceErrorKind::InvalidInput);
1311 assert_eq!(mapped.code, "invalid_payload");
1312
1313 let mapped = map_job_error(IntegrationError::new(
1314 "trigger",
1315 IntegrationErrorKind::Auth,
1316 "unauthorized",
1317 ));
1318 assert_eq!(mapped.kind, AppServiceErrorKind::Unauthorized);
1319
1320 for transient_kind in [
1321 IntegrationErrorKind::Unavailable,
1322 IntegrationErrorKind::Timeout,
1323 IntegrationErrorKind::RateLimited,
1324 IntegrationErrorKind::Transient,
1325 ] {
1326 let mapped = map_job_error(IntegrationError::new("trigger", transient_kind, "retry"));
1327 assert_eq!(mapped.kind, AppServiceErrorKind::Unavailable);
1328 }
1329
1330 let mapped = map_job_error(IntegrationError::new(
1331 "trigger",
1332 IntegrationErrorKind::Permanent,
1333 "conflict",
1334 ));
1335 assert_eq!(mapped.kind, AppServiceErrorKind::Conflict);
1336 }
1337}