1use crate::{
2 integrations::{IntegrationError, IntegrationErrorKind, JobOrchestrator, JobRequest, JobState},
3 DataError, DataResult,
4};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::{
8 collections::{BTreeMap, VecDeque},
9 fmt,
10 sync::{Arc, Mutex},
11 time::{SystemTime, UNIX_EPOCH},
12};
13use tracing::{info, warn};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum AppServiceErrorKind {
18 InvalidCredentials,
19 Unauthorized,
20 NotFound,
21 Conflict,
22 InvalidInput,
23 Unavailable,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct AppServiceError {
28 pub source: String,
29 pub kind: AppServiceErrorKind,
30 pub code: String,
31 pub message: String,
32}
33
34impl AppServiceError {
35 pub fn new(
36 source: impl Into<String>,
37 kind: AppServiceErrorKind,
38 code: impl Into<String>,
39 message: impl Into<String>,
40 ) -> Self {
41 Self {
42 source: source.into(),
43 kind,
44 code: code.into(),
45 message: message.into(),
46 }
47 }
48}
49
50impl fmt::Display for AppServiceError {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 write!(f, "[{}:{}] {}", self.source, self.code, self.message)
53 }
54}
55
56impl std::error::Error for AppServiceError {}
57
58pub type AppServiceResult<T> = Result<T, AppServiceError>;
59
60pub fn map_app_service_error(source: impl Into<String>, err: AppServiceError) -> DataError {
61 DataError::Integration(format!("[{}] {}", source.into(), err))
62}
63
64pub fn map_app_service_result<T>(
65 source: impl Into<String>,
66 result: AppServiceResult<T>,
67) -> DataResult<T> {
68 result.map_err(|err| map_app_service_error(source, err))
69}
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct AuthIdentity {
73 pub user_id: String,
74 pub email: String,
75 pub display_name: Option<String>,
76 pub roles: Vec<String>,
77 pub attributes: BTreeMap<String, String>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct AuthCredentials {
82 pub identifier: String,
83 pub password: String,
84}
85
86impl AuthCredentials {
87 pub fn new(identifier: impl Into<String>, password: impl Into<String>) -> Self {
88 Self {
89 identifier: identifier.into(),
90 password: password.into(),
91 }
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub struct AuthSession {
97 pub token: String,
98 pub identity: AuthIdentity,
99 pub issued_at_unix_ms: u64,
100 pub expires_at_unix_ms: u64,
101}
102
103pub trait IdentityService: Send + Sync {
104 fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession>;
105 fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>>;
106 fn sign_out(&self, token: &str) -> AppServiceResult<()>;
107}
108
109#[derive(Debug, Clone, PartialEq, Eq)]
110struct IdentityRecord {
111 user_id: String,
112 email: String,
113 password: String,
114 display_name: Option<String>,
115 roles: Vec<String>,
116 attributes: BTreeMap<String, String>,
117}
118
119pub struct InMemoryIdentityService {
120 users: Mutex<BTreeMap<String, IdentityRecord>>,
121 sessions: Mutex<BTreeMap<String, AuthSession>>,
122 next_session_id: Mutex<u64>,
123 session_ttl_ms: u64,
124}
125
126impl InMemoryIdentityService {
127 pub fn new() -> Self {
128 let service = Self {
129 users: Mutex::new(BTreeMap::new()),
130 sessions: Mutex::new(BTreeMap::new()),
131 next_session_id: Mutex::new(0),
132 session_ttl_ms: 8 * 60 * 60 * 1000,
133 };
134 service
135 .insert_user(
136 "user-1",
137 "demo@shelly.dev",
138 "shelly-demo",
139 Some("Shelly Demo"),
140 vec!["admin".to_string()],
141 BTreeMap::new(),
142 )
143 .expect("default in-memory auth user must be inserted");
144 service
145 }
146
147 pub fn insert_user(
148 &self,
149 user_id: impl Into<String>,
150 email: impl Into<String>,
151 password: impl Into<String>,
152 display_name: Option<&str>,
153 roles: Vec<String>,
154 attributes: BTreeMap<String, String>,
155 ) -> AppServiceResult<()> {
156 let email = email.into().trim().to_ascii_lowercase();
157 if email.is_empty() {
158 return Err(AppServiceError::new(
159 "auth",
160 AppServiceErrorKind::InvalidInput,
161 "empty_email",
162 "email must not be empty",
163 ));
164 }
165 let mut users = self.users.lock().map_err(|_| {
166 AppServiceError::new(
167 "auth",
168 AppServiceErrorKind::Unavailable,
169 "users_lock_poisoned",
170 "auth users lock poisoned",
171 )
172 })?;
173 users.insert(
174 email.clone(),
175 IdentityRecord {
176 user_id: user_id.into(),
177 email: email.clone(),
178 password: password.into(),
179 display_name: display_name.map(ToString::to_string),
180 roles,
181 attributes,
182 },
183 );
184 info!(
185 target: "shelly.integration.app_service",
186 service = "auth",
187 operation = "insert_user",
188 email,
189 "Shelly app service operation executed"
190 );
191 Ok(())
192 }
193
194 pub fn with_session_ttl_ms(mut self, ttl_ms: u64) -> Self {
195 self.session_ttl_ms = ttl_ms.max(1);
196 self
197 }
198}
199
200impl Default for InMemoryIdentityService {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl IdentityService for InMemoryIdentityService {
207 fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession> {
208 let identifier = credentials.identifier.trim().to_ascii_lowercase();
209 let users = self.users.lock().map_err(|_| {
210 AppServiceError::new(
211 "auth",
212 AppServiceErrorKind::Unavailable,
213 "users_lock_poisoned",
214 "auth users lock poisoned",
215 )
216 })?;
217 let Some(record) = users.get(&identifier) else {
218 return Err(AppServiceError::new(
219 "auth",
220 AppServiceErrorKind::InvalidCredentials,
221 "invalid_credentials",
222 "invalid credentials",
223 ));
224 };
225 if record.password != credentials.password {
226 return Err(AppServiceError::new(
227 "auth",
228 AppServiceErrorKind::InvalidCredentials,
229 "invalid_credentials",
230 "invalid credentials",
231 ));
232 }
233 let identity = AuthIdentity {
234 user_id: record.user_id.clone(),
235 email: record.email.clone(),
236 display_name: record.display_name.clone(),
237 roles: record.roles.clone(),
238 attributes: record.attributes.clone(),
239 };
240 drop(users);
241
242 let mut next_session_id = self.next_session_id.lock().map_err(|_| {
243 AppServiceError::new(
244 "auth",
245 AppServiceErrorKind::Unavailable,
246 "session_id_lock_poisoned",
247 "session id lock poisoned",
248 )
249 })?;
250 *next_session_id = next_session_id.saturating_add(1);
251 let token = format!("sess-{next_session_id}");
252 let issued_at_unix_ms = now_unix_ms();
253 let session = AuthSession {
254 token: token.clone(),
255 identity,
256 issued_at_unix_ms,
257 expires_at_unix_ms: issued_at_unix_ms.saturating_add(self.session_ttl_ms),
258 };
259 self.sessions
260 .lock()
261 .map_err(|_| {
262 AppServiceError::new(
263 "auth",
264 AppServiceErrorKind::Unavailable,
265 "sessions_lock_poisoned",
266 "auth sessions lock poisoned",
267 )
268 })?
269 .insert(token.clone(), session.clone());
270 info!(
271 target: "shelly.integration.app_service",
272 service = "auth",
273 operation = "sign_in",
274 token = token.as_str(),
275 user_id = session.identity.user_id.as_str(),
276 "Shelly app service operation executed"
277 );
278 Ok(session)
279 }
280
281 fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>> {
282 let mut sessions = self.sessions.lock().map_err(|_| {
283 AppServiceError::new(
284 "auth",
285 AppServiceErrorKind::Unavailable,
286 "sessions_lock_poisoned",
287 "auth sessions lock poisoned",
288 )
289 })?;
290 let Some(session) = sessions.get(token).cloned() else {
291 return Ok(None);
292 };
293 if now_unix_ms() >= session.expires_at_unix_ms {
294 sessions.remove(token);
295 warn!(
296 target: "shelly.integration.app_service",
297 service = "auth",
298 operation = "session_identity",
299 token,
300 "Shelly app session expired and was evicted"
301 );
302 return Ok(None);
303 }
304 Ok(Some(session.identity))
305 }
306
307 fn sign_out(&self, token: &str) -> AppServiceResult<()> {
308 self.sessions
309 .lock()
310 .map_err(|_| {
311 AppServiceError::new(
312 "auth",
313 AppServiceErrorKind::Unavailable,
314 "sessions_lock_poisoned",
315 "auth sessions lock poisoned",
316 )
317 })?
318 .remove(token);
319 info!(
320 target: "shelly.integration.app_service",
321 service = "auth",
322 operation = "sign_out",
323 token,
324 "Shelly app service operation executed"
325 );
326 Ok(())
327 }
328}
329
330#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
331pub struct BackgroundJobRequest {
332 pub job: String,
333 pub payload: Value,
334 pub idempotency_key: String,
335 pub scheduled_at_unix_ms: Option<u64>,
336 pub metadata: BTreeMap<String, String>,
337}
338
339impl BackgroundJobRequest {
340 pub fn new(job: impl Into<String>, payload: Value, idempotency_key: impl Into<String>) -> Self {
341 Self {
342 job: job.into(),
343 payload,
344 idempotency_key: idempotency_key.into(),
345 scheduled_at_unix_ms: None,
346 metadata: BTreeMap::new(),
347 }
348 }
349
350 pub fn schedule_at_unix_ms(mut self, scheduled_at_unix_ms: u64) -> Self {
351 self.scheduled_at_unix_ms = Some(scheduled_at_unix_ms);
352 self
353 }
354}
355
356#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
357pub struct BackgroundJobHandle {
358 pub id: String,
359 pub job: String,
360 pub idempotency_key: String,
361}
362
363#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
364#[serde(rename_all = "snake_case")]
365pub enum BackgroundJobState {
366 Queued,
367 Running,
368 Succeeded,
369 Failed,
370 Canceled,
371}
372
373impl From<JobState> for BackgroundJobState {
374 fn from(value: JobState) -> Self {
375 match value {
376 JobState::Queued => Self::Queued,
377 JobState::Running => Self::Running,
378 JobState::Succeeded => Self::Succeeded,
379 JobState::Failed => Self::Failed,
380 JobState::Canceled => Self::Canceled,
381 }
382 }
383}
384
385#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
386pub struct BackgroundJobStatus {
387 pub id: String,
388 pub state: BackgroundJobState,
389 pub attempts: u32,
390 pub result: Option<Value>,
391 pub error: Option<String>,
392}
393
394pub trait BackgroundJobService: Send + Sync {
395 fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle>;
396 fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus>;
397}
398
399#[derive(Clone)]
400pub struct JobOrchestratorBackgroundJobs<T: JobOrchestrator> {
401 orchestrator: Arc<T>,
402}
403
404impl<T: JobOrchestrator> JobOrchestratorBackgroundJobs<T> {
405 pub fn new(orchestrator: Arc<T>) -> Self {
406 Self { orchestrator }
407 }
408}
409
410impl<T: JobOrchestrator> BackgroundJobService for JobOrchestratorBackgroundJobs<T> {
411 fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
412 let mut job_request =
413 JobRequest::new(request.job, request.payload, request.idempotency_key);
414 job_request.metadata = request.metadata;
415 if let Some(scheduled_at_unix_ms) = request.scheduled_at_unix_ms {
416 job_request.metadata.insert(
417 "scheduled_at_unix_ms".to_string(),
418 scheduled_at_unix_ms.to_string(),
419 );
420 }
421
422 let handle = self
423 .orchestrator
424 .enqueue(job_request)
425 .map_err(map_job_error)?;
426 Ok(BackgroundJobHandle {
427 id: handle.id,
428 job: handle.workflow,
429 idempotency_key: handle.idempotency_key,
430 })
431 }
432
433 fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
434 let status = self.orchestrator.status(id).map_err(map_job_error)?;
435 Ok(BackgroundJobStatus {
436 id: status.id,
437 state: BackgroundJobState::from(status.state),
438 attempts: status.attempts,
439 result: status.result,
440 error: status.error.map(|err| err.to_string()),
441 })
442 }
443}
444
445#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
446pub struct EmailMessage {
447 pub to: String,
448 pub subject: String,
449 pub text_body: String,
450 pub html_body: Option<String>,
451 pub headers: BTreeMap<String, String>,
452}
453
454impl EmailMessage {
455 pub fn new(
456 to: impl Into<String>,
457 subject: impl Into<String>,
458 text_body: impl Into<String>,
459 ) -> Self {
460 Self {
461 to: to.into(),
462 subject: subject.into(),
463 text_body: text_body.into(),
464 html_body: None,
465 headers: BTreeMap::new(),
466 }
467 }
468}
469
470#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
471pub struct EmailReceipt {
472 pub message_id: String,
473 pub provider: String,
474 pub accepted_at_unix_ms: u64,
475}
476
477pub trait TransactionalEmailService: Send + Sync {
478 fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt>;
479}
480
481#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
482pub struct SentEmail {
483 pub message: EmailMessage,
484 pub receipt: EmailReceipt,
485}
486
487pub struct InMemoryTransactionalEmailService {
488 provider: String,
489 next_id: Mutex<u64>,
490 sent: Mutex<Vec<SentEmail>>,
491}
492
493impl InMemoryTransactionalEmailService {
494 pub fn new(provider: impl Into<String>) -> Self {
495 Self {
496 provider: provider.into(),
497 next_id: Mutex::new(0),
498 sent: Mutex::new(Vec::new()),
499 }
500 }
501
502 pub fn sent(&self) -> Vec<SentEmail> {
503 self.sent
504 .lock()
505 .map(|entries| entries.clone())
506 .unwrap_or_default()
507 }
508}
509
510impl Default for InMemoryTransactionalEmailService {
511 fn default() -> Self {
512 Self::new("in-memory-mailer")
513 }
514}
515
516impl TransactionalEmailService for InMemoryTransactionalEmailService {
517 fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt> {
518 if message.to.trim().is_empty() {
519 return Err(AppServiceError::new(
520 "email",
521 AppServiceErrorKind::InvalidInput,
522 "empty_recipient",
523 "email recipient must not be empty",
524 ));
525 }
526 let mut next_id = self.next_id.lock().map_err(|_| {
527 AppServiceError::new(
528 "email",
529 AppServiceErrorKind::Unavailable,
530 "email_id_lock_poisoned",
531 "email id lock poisoned",
532 )
533 })?;
534 *next_id = next_id.saturating_add(1);
535 let message_id = format!("mail-{next_id}");
536 let receipt = EmailReceipt {
537 message_id: message_id.clone(),
538 provider: self.provider.clone(),
539 accepted_at_unix_ms: now_unix_ms(),
540 };
541 self.sent
542 .lock()
543 .map_err(|_| {
544 AppServiceError::new(
545 "email",
546 AppServiceErrorKind::Unavailable,
547 "email_log_lock_poisoned",
548 "email log lock poisoned",
549 )
550 })?
551 .push(SentEmail {
552 message: message.clone(),
553 receipt: receipt.clone(),
554 });
555 info!(
556 target: "shelly.integration.app_service",
557 service = "email",
558 operation = "send",
559 message_id = message_id.as_str(),
560 recipient = message.to.as_str(),
561 "Shelly app service operation executed"
562 );
563 Ok(receipt)
564 }
565}
566
567#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
568pub struct CacheEntry {
569 pub value: Value,
570 pub expires_at_unix_ms: Option<u64>,
571}
572
573impl CacheEntry {
574 pub fn persistent(value: Value) -> Self {
575 Self {
576 value,
577 expires_at_unix_ms: None,
578 }
579 }
580
581 pub fn with_ttl_ms(value: Value, ttl_ms: u64) -> Self {
582 Self {
583 value,
584 expires_at_unix_ms: Some(now_unix_ms().saturating_add(ttl_ms)),
585 }
586 }
587}
588
589pub trait CacheBackend: Send + Sync {
590 fn get(&self, key: &str) -> AppServiceResult<Option<Value>>;
591 fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()>;
592 fn delete(&self, key: &str) -> AppServiceResult<()>;
593}
594
595#[derive(Default)]
596pub struct InMemoryCacheBackend {
597 entries: Mutex<BTreeMap<String, CacheEntry>>,
598}
599
600impl CacheBackend for InMemoryCacheBackend {
601 fn get(&self, key: &str) -> AppServiceResult<Option<Value>> {
602 let mut entries = self.entries.lock().map_err(|_| {
603 AppServiceError::new(
604 "cache",
605 AppServiceErrorKind::Unavailable,
606 "cache_lock_poisoned",
607 "cache lock poisoned",
608 )
609 })?;
610 let Some(entry) = entries.get(key).cloned() else {
611 return Ok(None);
612 };
613 if entry
614 .expires_at_unix_ms
615 .is_some_and(|expires_at| now_unix_ms() >= expires_at)
616 {
617 entries.remove(key);
618 return Ok(None);
619 }
620 Ok(Some(entry.value))
621 }
622
623 fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()> {
624 self.entries
625 .lock()
626 .map_err(|_| {
627 AppServiceError::new(
628 "cache",
629 AppServiceErrorKind::Unavailable,
630 "cache_lock_poisoned",
631 "cache lock poisoned",
632 )
633 })?
634 .insert(key.to_string(), entry);
635 Ok(())
636 }
637
638 fn delete(&self, key: &str) -> AppServiceResult<()> {
639 self.entries
640 .lock()
641 .map_err(|_| {
642 AppServiceError::new(
643 "cache",
644 AppServiceErrorKind::Unavailable,
645 "cache_lock_poisoned",
646 "cache lock poisoned",
647 )
648 })?
649 .remove(key);
650 Ok(())
651 }
652}
653
654#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
655pub struct QueueMessage {
656 pub id: String,
657 pub topic: String,
658 pub payload: Value,
659 pub attempts: u32,
660 pub enqueued_at_unix_ms: u64,
661 pub scheduled_at_unix_ms: Option<u64>,
662}
663
664pub trait QueueBackend: Send + Sync {
665 fn enqueue(
666 &self,
667 topic: &str,
668 payload: Value,
669 scheduled_at_unix_ms: Option<u64>,
670 ) -> AppServiceResult<QueueMessage>;
671 fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>>;
672 fn len(&self, topic: &str) -> AppServiceResult<usize>;
673}
674
675#[derive(Default)]
676pub struct InMemoryQueueBackend {
677 messages: Mutex<BTreeMap<String, VecDeque<QueueMessage>>>,
678 next_id: Mutex<u64>,
679}
680
681impl QueueBackend for InMemoryQueueBackend {
682 fn enqueue(
683 &self,
684 topic: &str,
685 payload: Value,
686 scheduled_at_unix_ms: Option<u64>,
687 ) -> AppServiceResult<QueueMessage> {
688 if topic.trim().is_empty() {
689 return Err(AppServiceError::new(
690 "queue",
691 AppServiceErrorKind::InvalidInput,
692 "empty_topic",
693 "queue topic must not be empty",
694 ));
695 }
696 let mut next_id = self.next_id.lock().map_err(|_| {
697 AppServiceError::new(
698 "queue",
699 AppServiceErrorKind::Unavailable,
700 "queue_id_lock_poisoned",
701 "queue id lock poisoned",
702 )
703 })?;
704 *next_id = next_id.saturating_add(1);
705 let message = QueueMessage {
706 id: format!("queue-{next_id}"),
707 topic: topic.to_string(),
708 payload,
709 attempts: 0,
710 enqueued_at_unix_ms: now_unix_ms(),
711 scheduled_at_unix_ms,
712 };
713 self.messages
714 .lock()
715 .map_err(|_| {
716 AppServiceError::new(
717 "queue",
718 AppServiceErrorKind::Unavailable,
719 "queue_lock_poisoned",
720 "queue lock poisoned",
721 )
722 })?
723 .entry(topic.to_string())
724 .or_default()
725 .push_back(message.clone());
726 Ok(message)
727 }
728
729 fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>> {
730 let mut queues = self.messages.lock().map_err(|_| {
731 AppServiceError::new(
732 "queue",
733 AppServiceErrorKind::Unavailable,
734 "queue_lock_poisoned",
735 "queue lock poisoned",
736 )
737 })?;
738 let Some(queue) = queues.get_mut(topic) else {
739 return Ok(None);
740 };
741 Ok(queue.pop_front().map(|mut message| {
742 message.attempts = message.attempts.saturating_add(1);
743 message
744 }))
745 }
746
747 fn len(&self, topic: &str) -> AppServiceResult<usize> {
748 let queues = self.messages.lock().map_err(|_| {
749 AppServiceError::new(
750 "queue",
751 AppServiceErrorKind::Unavailable,
752 "queue_lock_poisoned",
753 "queue lock poisoned",
754 )
755 })?;
756 Ok(queues.get(topic).map(VecDeque::len).unwrap_or(0))
757 }
758}
759
760fn map_job_error(err: IntegrationError) -> AppServiceError {
761 let kind = match err.kind {
762 IntegrationErrorKind::InvalidInput => AppServiceErrorKind::InvalidInput,
763 IntegrationErrorKind::Auth => AppServiceErrorKind::Unauthorized,
764 IntegrationErrorKind::Unavailable
765 | IntegrationErrorKind::Timeout
766 | IntegrationErrorKind::RateLimited
767 | IntegrationErrorKind::Transient => AppServiceErrorKind::Unavailable,
768 IntegrationErrorKind::Permanent => AppServiceErrorKind::Conflict,
769 };
770 AppServiceError::new(
771 "jobs",
772 kind,
773 err.code.unwrap_or_else(|| "job_error".to_string()),
774 err.message,
775 )
776}
777
778fn now_unix_ms() -> u64 {
779 SystemTime::now()
780 .duration_since(UNIX_EPOCH)
781 .unwrap_or_default()
782 .as_millis() as u64
783}
784
785#[cfg(test)]
786mod tests {
787 use super::{
788 AppServiceErrorKind, AuthCredentials, BackgroundJobRequest, BackgroundJobService,
789 BackgroundJobState, CacheBackend, CacheEntry, EmailMessage, IdentityService,
790 InMemoryCacheBackend, InMemoryIdentityService, InMemoryQueueBackend,
791 InMemoryTransactionalEmailService, JobOrchestratorBackgroundJobs, QueueBackend,
792 TransactionalEmailService,
793 };
794 use crate::integrations::InMemoryJobOrchestrator;
795 use serde_json::json;
796 use std::sync::Arc;
797
798 #[test]
799 fn in_memory_identity_service_signs_in_and_signs_out() {
800 let service = InMemoryIdentityService::default();
801 let session = service
802 .sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
803 .unwrap();
804
805 let identity = service.session_identity(&session.token).unwrap().unwrap();
806 assert_eq!(identity.email, "demo@shelly.dev");
807 assert!(identity.roles.contains(&"admin".to_string()));
808
809 service.sign_out(&session.token).unwrap();
810 let missing = service.session_identity(&session.token).unwrap();
811 assert!(missing.is_none());
812 }
813
814 #[test]
815 fn in_memory_identity_service_rejects_invalid_credentials() {
816 let service = InMemoryIdentityService::default();
817 let err = service
818 .sign_in(AuthCredentials::new("demo@shelly.dev", "wrong-password"))
819 .unwrap_err();
820 assert_eq!(err.kind, AppServiceErrorKind::InvalidCredentials);
821 }
822
823 #[test]
824 fn in_memory_cache_backend_supports_set_get_delete_and_ttl() {
825 let cache = InMemoryCacheBackend::default();
826 cache
827 .set(
828 "sessions:1",
829 CacheEntry::persistent(json!({"active": true})),
830 )
831 .unwrap();
832 let value = cache.get("sessions:1").unwrap();
833 assert_eq!(value, Some(json!({"active": true})));
834
835 cache.delete("sessions:1").unwrap();
836 let cleared = cache.get("sessions:1").unwrap();
837 assert!(cleared.is_none());
838
839 cache
840 .set("temp", CacheEntry::with_ttl_ms(json!("stale"), 0))
841 .unwrap();
842 let expired = cache.get("temp").unwrap();
843 assert!(expired.is_none());
844 }
845
846 #[test]
847 fn in_memory_queue_backend_is_fifo_per_topic() {
848 let queue = InMemoryQueueBackend::default();
849 queue.enqueue("emails", json!({"id": 1}), None).unwrap();
850 queue.enqueue("emails", json!({"id": 2}), None).unwrap();
851 assert_eq!(queue.len("emails").unwrap(), 2);
852
853 let first = queue.dequeue("emails").unwrap().unwrap();
854 let second = queue.dequeue("emails").unwrap().unwrap();
855 assert_eq!(first.payload, json!({"id": 1}));
856 assert_eq!(second.payload, json!({"id": 2}));
857 assert_eq!(queue.len("emails").unwrap(), 0);
858 }
859
860 #[test]
861 fn transactional_email_service_records_sent_messages() {
862 let mailer = InMemoryTransactionalEmailService::default();
863 let message = EmailMessage::new("ada@example.test", "Welcome", "Hello Ada");
864 let receipt = mailer.send(message.clone()).unwrap();
865 assert_eq!(receipt.provider, "in-memory-mailer");
866
867 let sent = mailer.sent();
868 assert_eq!(sent.len(), 1);
869 assert_eq!(sent[0].message, message);
870 assert_eq!(sent[0].receipt.message_id, receipt.message_id);
871 }
872
873 #[test]
874 fn background_job_service_wraps_job_orchestrator_contract() {
875 let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
876 let jobs = JobOrchestratorBackgroundJobs::new(orchestrator.clone());
877 let handle = jobs
878 .enqueue(BackgroundJobRequest::new(
879 "send_digest",
880 json!({"account_id": 7}),
881 "send_digest:7",
882 ))
883 .unwrap();
884 let queued = jobs.status(&handle.id).unwrap();
885 assert_eq!(queued.state, BackgroundJobState::Queued);
886
887 orchestrator
888 .mark_succeeded(&handle.id, json!({"emails_sent": 3}))
889 .unwrap();
890 let done = jobs.status(&handle.id).unwrap();
891 assert_eq!(done.state, BackgroundJobState::Succeeded);
892 assert_eq!(done.result, Some(json!({"emails_sent": 3})));
893 }
894}