use crate::{
integrations::{IntegrationError, IntegrationErrorKind, JobOrchestrator, JobRequest, JobState},
DataError, DataResult,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{BTreeMap, VecDeque},
fmt,
sync::{Arc, Mutex},
time::{SystemTime, UNIX_EPOCH},
};
use tracing::{info, warn};
/// Coarse classification of an [`AppServiceError`].
///
/// Serialized with snake_case variant names (see the `serde` attribute below).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AppServiceErrorKind {
/// Sign-in was rejected (unknown identifier or wrong password in this file).
InvalidCredentials,
/// In this file: the mapping target for `IntegrationErrorKind::Auth`.
Unauthorized,
/// A requested entity does not exist.
NotFound,
/// In this file: tenant quota exceeded, or a permanent job-orchestrator error.
Conflict,
/// The caller supplied invalid data (empty email/recipient/topic, missing tenant id).
InvalidInput,
/// In this file: a poisoned lock, or an unavailable/timeout/rate-limited/transient job error.
Unavailable,
}
/// Error value returned by every app-service facade in this module.
///
/// Its `Display` output is `[source:code] message`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppServiceError {
/// Label of the originating service; this file uses `"auth"`, `"jobs"`, `"email"`, `"cache"` and `"queue"`.
pub source: String,
/// Coarse classification of the failure.
pub kind: AppServiceErrorKind,
/// Machine-readable error code, e.g. `"cache_lock_poisoned"`.
pub code: String,
/// Human-readable description.
pub message: String,
}
impl AppServiceError {
    /// Assembles an error from its four components.
    ///
    /// Every string-like argument is converted into an owned `String`.
    ///
    /// * `source` - label of the service that produced the error.
    /// * `kind` - coarse classification of the failure.
    /// * `code` - machine-readable error code.
    /// * `message` - human-readable description.
    pub fn new(
        source: impl Into<String>,
        kind: AppServiceErrorKind,
        code: impl Into<String>,
        message: impl Into<String>,
    ) -> Self {
        // Convert in declaration order, then use field-init shorthand.
        let source: String = source.into();
        let code: String = code.into();
        let message: String = message.into();
        Self {
            source,
            kind,
            code,
            message,
        }
    }
}
impl fmt::Display for AppServiceError {
    /// Writes the error as `[source:code] message`; `kind` is not part of the text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            source,
            code,
            message,
            ..
        } = self;
        write!(f, "[{source}:{code}] {message}")
    }
}
/// Marks [`AppServiceError`] as a standard error type; no trait methods are overridden.
impl std::error::Error for AppServiceError {}
/// Result alias whose error type is [`AppServiceError`].
pub type AppServiceResult<T> = Result<T, AppServiceError>;
/// Converts an [`AppServiceError`] into `DataError::Integration`.
///
/// The resulting text is `[<source>] <err>`, where `<err>` is the error's
/// `Display` output.
pub fn map_app_service_error(source: impl Into<String>, err: AppServiceError) -> DataError {
    let source: String = source.into();
    let text = format!("[{source}] {err}");
    DataError::Integration(text)
}
/// Lifts an [`AppServiceResult`] into a `DataResult`.
///
/// `Ok` values pass through untouched; `Err` values are converted with
/// [`map_app_service_error`] using `source` as the context label.
pub fn map_app_service_result<T>(
    source: impl Into<String>,
    result: AppServiceResult<T>,
) -> DataResult<T> {
    match result {
        Ok(value) => Ok(value),
        Err(err) => Err(map_app_service_error(source, err)),
    }
}
/// Public view of an authenticated user, as carried inside an [`AuthSession`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthIdentity {
/// Identifier of the user.
pub user_id: String,
/// Email address of the user.
pub email: String,
/// Optional display name.
pub display_name: Option<String>,
/// Role names attached to the user.
pub roles: Vec<String>,
/// Free-form string attributes attached to the user.
pub attributes: BTreeMap<String, String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthCredentials {
pub identifier: String,
pub password: String,
}
impl AuthCredentials {
    /// Creates credentials from any string-like identifier and password.
    pub fn new(identifier: impl Into<String>, password: impl Into<String>) -> Self {
        let identifier: String = identifier.into();
        let password: String = password.into();
        Self {
            identifier,
            password,
        }
    }
}
/// A signed-in session returned by [`IdentityService::sign_in`].
// NOTE(review): the derived `Debug` prints `token`; confirm that is acceptable wherever sessions are logged.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthSession {
/// Session token; callers pass it back to look up or end the session.
pub token: String,
/// Identity the session was issued for.
pub identity: AuthIdentity,
/// Issue time in milliseconds since the Unix epoch.
pub issued_at_unix_ms: u64,
/// Expiry time in milliseconds since the Unix epoch.
pub expires_at_unix_ms: u64,
}
/// Session-based authentication facade.
///
/// Implementations must be `Send + Sync`.
pub trait IdentityService: Send + Sync {
/// Validates `credentials` and, on success, returns a new session.
fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession>;
/// Looks up the identity behind `token`.
///
/// The in-memory implementation in this file returns `Ok(None)` for unknown
/// or expired tokens.
fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>>;
/// Ends the session identified by `token`.
fn sign_out(&self, token: &str) -> AppServiceResult<()>;
}
/// Internal user record stored by [`InMemoryIdentityService`].
// NOTE(review): the password is kept in plaintext and is included in the derived `Debug` output.
#[derive(Debug, Clone, PartialEq, Eq)]
struct IdentityRecord {
user_id: String,
// Stored trimmed and ASCII-lowercased by `insert_user`; also used as the map key.
email: String,
password: String,
display_name: Option<String>,
roles: Vec<String>,
attributes: BTreeMap<String, String>,
}
/// [`IdentityService`] implementation that keeps users and sessions in process memory.
pub struct InMemoryIdentityService {
// Users keyed by normalized (trimmed, ASCII-lowercased) email.
users: Mutex<BTreeMap<String, IdentityRecord>>,
// Active sessions keyed by session token.
sessions: Mutex<BTreeMap<String, AuthSession>>,
// Counter used to build `sess-<n>` tokens.
next_session_id: Mutex<u64>,
// Session lifetime in milliseconds.
session_ttl_ms: u64,
}
impl InMemoryIdentityService {
    /// Session lifetime used by [`InMemoryIdentityService::new`]: eight hours in milliseconds.
    const DEFAULT_SESSION_TTL_MS: u64 = 8 * 60 * 60 * 1000;

    /// Creates a service pre-seeded with one demo administrator account
    /// (`demo@shelly.dev`) and the default eight-hour session lifetime.
    ///
    /// # Panics
    ///
    /// Panics if the demo account cannot be inserted.
    pub fn new() -> Self {
        let service = Self {
            users: Mutex::new(BTreeMap::new()),
            sessions: Mutex::new(BTreeMap::new()),
            next_session_id: Mutex::new(0),
            session_ttl_ms: Self::DEFAULT_SESSION_TTL_MS,
        };
        let seeded = service.insert_user(
            "user-1",
            "demo@shelly.dev",
            "shelly-demo",
            Some("Shelly Demo"),
            vec![String::from("admin")],
            BTreeMap::new(),
        );
        seeded.expect("default in-memory auth user must be inserted");
        service
    }

    /// Stores a user keyed by the trimmed, ASCII-lowercased `email`.
    ///
    /// A record already stored under the same normalized email is replaced.
    ///
    /// # Errors
    ///
    /// * `empty_email` (`InvalidInput`) when the email is blank after trimming.
    /// * `users_lock_poisoned` (`Unavailable`) when the user store lock is poisoned.
    pub fn insert_user(
        &self,
        user_id: impl Into<String>,
        email: impl Into<String>,
        password: impl Into<String>,
        display_name: Option<&str>,
        roles: Vec<String>,
        attributes: BTreeMap<String, String>,
    ) -> AppServiceResult<()> {
        let email = email.into().trim().to_ascii_lowercase();
        if email.is_empty() {
            return Err(AppServiceError::new(
                "auth",
                AppServiceErrorKind::InvalidInput,
                "empty_email",
                "email must not be empty",
            ));
        }

        let mut users = match self.users.lock() {
            Ok(guard) => guard,
            Err(_) => {
                return Err(AppServiceError::new(
                    "auth",
                    AppServiceErrorKind::Unavailable,
                    "users_lock_poisoned",
                    "auth users lock poisoned",
                ));
            }
        };

        let record = IdentityRecord {
            user_id: user_id.into(),
            email: email.clone(),
            password: password.into(),
            display_name: display_name.map(String::from),
            roles,
            attributes,
        };
        users.insert(email.clone(), record);

        info!(
            target: "shelly.integration.app_service",
            service = "auth",
            operation = "insert_user",
            email,
            "Shelly app service operation executed"
        );
        Ok(())
    }

    /// Overrides the session lifetime; a value of `0` is raised to `1` millisecond.
    pub fn with_session_ttl_ms(self, ttl_ms: u64) -> Self {
        Self {
            session_ttl_ms: u64::max(ttl_ms, 1),
            ..self
        }
    }
}
impl Default for InMemoryIdentityService {
fn default() -> Self {
Self::new()
}
}
impl IdentityService for InMemoryIdentityService {
    /// Checks `credentials` against the stored users and opens a new session.
    ///
    /// The identifier is trimmed and ASCII-lowercased before lookup. Tokens
    /// have the form `sess-<n>`, where `<n>` comes from an internal counter.
    ///
    /// # Errors
    ///
    /// * `invalid_credentials` when the user is unknown or the password differs.
    /// * `users_lock_poisoned`, `session_id_lock_poisoned` or
    ///   `sessions_lock_poisoned` (`Unavailable`) when the matching lock is poisoned.
    // NOTE(review): the session token is written to the log below; confirm this is intended.
    fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession> {
        let unavailable = |code: &str, message: &str| {
            AppServiceError::new("auth", AppServiceErrorKind::Unavailable, code, message)
        };
        let rejected = || {
            AppServiceError::new(
                "auth",
                AppServiceErrorKind::InvalidCredentials,
                "invalid_credentials",
                "invalid credentials",
            )
        };

        let identifier = credentials.identifier.trim().to_ascii_lowercase();

        // The users guard lives only inside this block, so it is released
        // before the session counter is locked.
        let identity = {
            let users = self
                .users
                .lock()
                .map_err(|_| unavailable("users_lock_poisoned", "auth users lock poisoned"))?;
            let record = users
                .get(&identifier)
                .filter(|candidate| candidate.password == credentials.password)
                .ok_or_else(rejected)?;
            AuthIdentity {
                user_id: record.user_id.clone(),
                email: record.email.clone(),
                display_name: record.display_name.clone(),
                roles: record.roles.clone(),
                attributes: record.attributes.clone(),
            }
        };

        let mut session_counter = self.next_session_id.lock().map_err(|_| {
            unavailable("session_id_lock_poisoned", "session id lock poisoned")
        })?;
        *session_counter = session_counter.saturating_add(1);
        let token = format!("sess-{}", *session_counter);

        let issued_at_unix_ms = now_unix_ms();
        let expires_at_unix_ms = issued_at_unix_ms.saturating_add(self.session_ttl_ms);
        let session = AuthSession {
            token: token.clone(),
            identity,
            issued_at_unix_ms,
            expires_at_unix_ms,
        };

        {
            let mut sessions = self.sessions.lock().map_err(|_| {
                unavailable("sessions_lock_poisoned", "auth sessions lock poisoned")
            })?;
            sessions.insert(token.clone(), session.clone());
        }

        info!(
            target: "shelly.integration.app_service",
            service = "auth",
            operation = "sign_in",
            token = token.as_str(),
            user_id = session.identity.user_id.as_str(),
            "Shelly app service operation executed"
        );
        Ok(session)
    }

    /// Returns the identity bound to `token`, or `Ok(None)` when the token is
    /// unknown or its session has expired.
    ///
    /// An expired session is removed from the store and a warning is logged.
    ///
    /// # Errors
    ///
    /// `sessions_lock_poisoned` (`Unavailable`) when the session store lock is poisoned.
    fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>> {
        let mut sessions = self.sessions.lock().map_err(|_| {
            AppServiceError::new(
                "auth",
                AppServiceErrorKind::Unavailable,
                "sessions_lock_poisoned",
                "auth sessions lock poisoned",
            )
        })?;

        let (identity, expires_at_unix_ms) = match sessions.get(token) {
            Some(session) => (session.identity.clone(), session.expires_at_unix_ms),
            None => return Ok(None),
        };

        if now_unix_ms() < expires_at_unix_ms {
            return Ok(Some(identity));
        }

        sessions.remove(token);
        warn!(
            target: "shelly.integration.app_service",
            service = "auth",
            operation = "session_identity",
            token,
            "Shelly app session expired and was evicted"
        );
        Ok(None)
    }

    /// Removes the session for `token`; an unknown token is not an error.
    ///
    /// # Errors
    ///
    /// `sessions_lock_poisoned` (`Unavailable`) when the session store lock is poisoned.
    fn sign_out(&self, token: &str) -> AppServiceResult<()> {
        {
            let mut sessions = self.sessions.lock().map_err(|_| {
                AppServiceError::new(
                    "auth",
                    AppServiceErrorKind::Unavailable,
                    "sessions_lock_poisoned",
                    "auth sessions lock poisoned",
                )
            })?;
            sessions.remove(token);
        }
        info!(
            target: "shelly.integration.app_service",
            service = "auth",
            operation = "sign_out",
            token,
            "Shelly app service operation executed"
        );
        Ok(())
    }
}
/// Request to enqueue a background job through a [`BackgroundJobService`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BackgroundJobRequest {
/// Name of the job to run.
pub job: String,
/// JSON payload handed to the job.
pub payload: Value,
/// Caller-supplied idempotency key.
pub idempotency_key: String,
/// Optional scheduled time in milliseconds since the Unix epoch.
pub scheduled_at_unix_ms: Option<u64>,
/// String metadata; the tenant quota wrapper in this file reads the `tenant_id` key.
pub metadata: BTreeMap<String, String>,
}
impl BackgroundJobRequest {
    /// Creates an unscheduled request with empty metadata.
    pub fn new(job: impl Into<String>, payload: Value, idempotency_key: impl Into<String>) -> Self {
        let job: String = job.into();
        let idempotency_key: String = idempotency_key.into();
        Self {
            job,
            payload,
            idempotency_key,
            scheduled_at_unix_ms: None,
            metadata: BTreeMap::new(),
        }
    }

    /// Returns the request with its scheduled time set to `scheduled_at_unix_ms`.
    pub fn schedule_at_unix_ms(self, scheduled_at_unix_ms: u64) -> Self {
        Self {
            scheduled_at_unix_ms: Some(scheduled_at_unix_ms),
            ..self
        }
    }
}
/// Handle returned after a background job has been accepted.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BackgroundJobHandle {
/// Identifier to pass to [`BackgroundJobService::status`].
pub id: String,
/// Name of the job.
pub job: String,
/// Idempotency key associated with the job.
pub idempotency_key: String,
}
/// Lifecycle state of a background job; mirrors `JobState` variant for variant.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BackgroundJobState {
Queued,
Running,
Succeeded,
Failed,
Canceled,
}
impl From<JobState> for BackgroundJobState {
fn from(value: JobState) -> Self {
match value {
JobState::Queued => Self::Queued,
JobState::Running => Self::Running,
JobState::Succeeded => Self::Succeeded,
JobState::Failed => Self::Failed,
JobState::Canceled => Self::Canceled,
}
}
}
/// Snapshot of a background job as reported by [`BackgroundJobService::status`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BackgroundJobStatus {
/// Identifier of the job.
pub id: String,
/// Current lifecycle state.
pub state: BackgroundJobState,
/// Attempt counter reported by the backend.
pub attempts: u32,
/// Result payload, if any.
pub result: Option<Value>,
/// Error text, if any.
pub error: Option<String>,
}
/// Facade for enqueuing background jobs and querying their status.
///
/// Implementations must be `Send + Sync`.
pub trait BackgroundJobService: Send + Sync {
/// Submits `request` and returns a handle to the accepted job.
fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle>;
/// Returns the current status of the job identified by `id`.
fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus>;
}
/// Per-tenant enqueue quota applied by [`TenantQuotaBackgroundJobs`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantBackgroundJobQuota {
/// Maximum jobs a tenant may enqueue per window; the wrapper treats `0` as `1`.
pub max_jobs_per_window: usize,
/// Window length in milliseconds; the wrapper treats `0` as `1`.
pub window_ms: u64,
/// When `true`, requests without a non-blank `tenant_id` metadata entry are rejected.
pub require_tenant_id: bool,
}
impl Default for TenantBackgroundJobQuota {
fn default() -> Self {
Self {
max_jobs_per_window: 10_000,
window_ms: 60_000,
require_tenant_id: true,
}
}
}
/// Counting window used to enforce a per-tenant enqueue quota.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TenantJobWindow {
    /// Start of the current window, in milliseconds since the Unix epoch.
    started_at_unix_ms: u64,
    /// Number of jobs counted in the current window.
    enqueued: usize,
}

impl TenantJobWindow {
    /// Opens an empty window starting at `now_unix_ms`.
    fn new(now_unix_ms: u64) -> Self {
        TenantJobWindow {
            started_at_unix_ms: now_unix_ms,
            enqueued: 0,
        }
    }

    /// Restarts the window at `now_unix_ms` once at least `window_ms`
    /// milliseconds have elapsed since it started.
    ///
    /// The elapsed time is computed with a saturating subtraction, so a clock
    /// that moved backwards counts as zero elapsed time.
    fn roll_if_needed(&mut self, now_unix_ms: u64, window_ms: u64) {
        let elapsed_ms = now_unix_ms.saturating_sub(self.started_at_unix_ms);
        if elapsed_ms < window_ms {
            return;
        }
        self.started_at_unix_ms = now_unix_ms;
        self.enqueued = 0;
    }
}
/// [`BackgroundJobService`] wrapper that enforces a per-tenant enqueue quota
/// before delegating to `inner`.
pub struct TenantQuotaBackgroundJobs<T: BackgroundJobService> {
// Wrapped service that actually enqueues jobs.
inner: Arc<T>,
// Quota applied to every tenant.
quota: TenantBackgroundJobQuota,
// Counting window per tenant id; entries are never removed by the code in this file.
windows: Mutex<BTreeMap<String, TenantJobWindow>>,
}
impl<T: BackgroundJobService> TenantQuotaBackgroundJobs<T> {
    /// Wraps `inner` with the default [`TenantBackgroundJobQuota`].
    pub fn new(inner: Arc<T>) -> Self {
        let quota = TenantBackgroundJobQuota::default();
        let windows = Mutex::new(BTreeMap::new());
        Self {
            inner,
            quota,
            windows,
        }
    }

    /// Replaces the quota applied to every tenant.
    pub fn with_quota(self, quota: TenantBackgroundJobQuota) -> Self {
        Self { quota, ..self }
    }

    /// Counts one more job for `tenant_id` in its current window.
    ///
    /// The window is created on first use and restarted once it has expired.
    ///
    /// # Errors
    ///
    /// * `tenant_quota_lock_poisoned` (`Unavailable`) when the window lock is poisoned.
    /// * `tenant_job_quota_exceeded` (`Conflict`) when the window is already full.
    fn reserve_enqueue_slot(&self, tenant_id: &str) -> AppServiceResult<()> {
        let now = now_unix_ms();
        let Ok(mut windows) = self.windows.lock() else {
            return Err(AppServiceError::new(
                "jobs",
                AppServiceErrorKind::Unavailable,
                "tenant_quota_lock_poisoned",
                "tenant quota lock poisoned",
            ));
        };

        // Both limits are floored at 1 so a zeroed quota still admits one job.
        let window_ms = self.quota.window_ms.max(1);
        let limit = self.quota.max_jobs_per_window.max(1);

        let window = windows
            .entry(tenant_id.to_string())
            .or_insert_with(|| TenantJobWindow::new(now));
        window.roll_if_needed(now, window_ms);

        let reserved = window.enqueued.saturating_add(1);
        if reserved <= limit {
            window.enqueued = reserved;
            Ok(())
        } else {
            Err(AppServiceError::new(
                "jobs",
                AppServiceErrorKind::Conflict,
                "tenant_job_quota_exceeded",
                "tenant background job quota exceeded",
            ))
        }
    }

    /// Gives back one reserved slot for `tenant_id`.
    ///
    /// Best effort: a poisoned lock or an unknown tenant is silently ignored.
    fn rollback_enqueue_slot(&self, tenant_id: &str) {
        let Ok(mut windows) = self.windows.lock() else {
            return;
        };
        if let Some(window) = windows.get_mut(tenant_id) {
            window.enqueued = window.enqueued.saturating_sub(1);
        }
    }
}
impl<T: BackgroundJobService> BackgroundJobService for TenantQuotaBackgroundJobs<T> {
    /// Enqueues `request` through the wrapped service after reserving a quota
    /// slot for the tenant named by the `tenant_id` metadata entry.
    ///
    /// The tenant id is trimmed; a blank value counts as missing. Requests
    /// without a tenant bypass the quota unless `require_tenant_id` is set.
    /// When the wrapped service fails, the reserved slot is given back.
    ///
    /// # Errors
    ///
    /// * `tenant_context_required` (`InvalidInput`) when a tenant id is required but missing.
    /// * Any error from slot reservation or from the wrapped service.
    fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
        let tenant_id: Option<String> = match request.metadata.get("tenant_id") {
            Some(raw) if !raw.trim().is_empty() => Some(raw.trim().to_string()),
            _ => None,
        };

        match tenant_id.as_deref() {
            Some(tenant) => self.reserve_enqueue_slot(tenant)?,
            None if self.quota.require_tenant_id => {
                return Err(AppServiceError::new(
                    "jobs",
                    AppServiceErrorKind::InvalidInput,
                    "tenant_context_required",
                    "tenant_id metadata is required for background jobs",
                ));
            }
            None => {}
        }

        let outcome = self.inner.enqueue(request);
        if let (Err(_), Some(tenant)) = (&outcome, tenant_id.as_deref()) {
            self.rollback_enqueue_slot(tenant);
        }
        outcome
    }

    /// Forwards the status lookup to the wrapped service unchanged.
    fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
        self.inner.status(id)
    }
}
#[derive(Clone)]
pub struct JobOrchestratorBackgroundJobs<T: JobOrchestrator> {
orchestrator: Arc<T>,
}
impl<T: JobOrchestrator> JobOrchestratorBackgroundJobs<T> {
pub fn new(orchestrator: Arc<T>) -> Self {
Self { orchestrator }
}
}
impl<T: JobOrchestrator> BackgroundJobService for JobOrchestratorBackgroundJobs<T> {
fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
let mut job_request =
JobRequest::new(request.job, request.payload, request.idempotency_key);
job_request.metadata = request.metadata;
if let Some(scheduled_at_unix_ms) = request.scheduled_at_unix_ms {
job_request.metadata.insert(
"scheduled_at_unix_ms".to_string(),
scheduled_at_unix_ms.to_string(),
);
}
let handle = self
.orchestrator
.enqueue(job_request)
.map_err(map_job_error)?;
Ok(BackgroundJobHandle {
id: handle.id,
job: handle.workflow,
idempotency_key: handle.idempotency_key,
})
}
fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
let status = self.orchestrator.status(id).map_err(map_job_error)?;
Ok(BackgroundJobStatus {
id: status.id,
state: BackgroundJobState::from(status.state),
attempts: status.attempts,
result: status.result,
error: status.error.map(|err| err.to_string()),
})
}
}
/// Outgoing transactional email.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EmailMessage {
/// Recipient; the in-memory mailer rejects a value that is blank after trimming.
pub to: String,
/// Subject line.
pub subject: String,
/// Plain-text body.
pub text_body: String,
/// Optional HTML body.
pub html_body: Option<String>,
/// Additional headers as name/value strings.
pub headers: BTreeMap<String, String>,
}
impl EmailMessage {
    /// Creates a plain-text message with no HTML body and no extra headers.
    pub fn new(
        to: impl Into<String>,
        subject: impl Into<String>,
        text_body: impl Into<String>,
    ) -> Self {
        let to: String = to.into();
        let subject: String = subject.into();
        let text_body: String = text_body.into();
        Self {
            to,
            subject,
            text_body,
            html_body: None,
            headers: BTreeMap::new(),
        }
    }
}
/// Acknowledgement returned by [`TransactionalEmailService::send`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmailReceipt {
/// Identifier assigned to the message.
pub message_id: String,
/// Name of the provider that accepted the message.
pub provider: String,
/// Acceptance time in milliseconds since the Unix epoch.
pub accepted_at_unix_ms: u64,
}
/// Facade for sending transactional email.
///
/// Implementations must be `Send + Sync`.
pub trait TransactionalEmailService: Send + Sync {
/// Sends `message` and returns the provider's receipt.
fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt>;
}
/// A message together with its receipt, as recorded by the in-memory mailer.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SentEmail {
/// The message that was sent.
pub message: EmailMessage,
/// The receipt issued for it.
pub receipt: EmailReceipt,
}
/// [`TransactionalEmailService`] that records messages in memory instead of delivering them.
pub struct InMemoryTransactionalEmailService {
// Provider name copied into every receipt.
provider: String,
// Counter used to build `mail-<n>` message ids.
next_id: Mutex<u64>,
// Log of every accepted message, in send order.
sent: Mutex<Vec<SentEmail>>,
}
impl InMemoryTransactionalEmailService {
    /// Creates an empty mailer that reports `provider` in its receipts.
    pub fn new(provider: impl Into<String>) -> Self {
        let provider: String = provider.into();
        Self {
            provider,
            next_id: Mutex::new(0),
            sent: Mutex::new(Vec::new()),
        }
    }

    /// Returns a copy of every message recorded so far.
    ///
    /// If the log lock is poisoned, an empty list is returned instead of an error.
    pub fn sent(&self) -> Vec<SentEmail> {
        match self.sent.lock() {
            Ok(entries) => entries.clone(),
            Err(_) => Vec::new(),
        }
    }
}
impl Default for InMemoryTransactionalEmailService {
fn default() -> Self {
Self::new("in-memory-mailer")
}
}
impl TransactionalEmailService for InMemoryTransactionalEmailService {
fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt> {
if message.to.trim().is_empty() {
return Err(AppServiceError::new(
"email",
AppServiceErrorKind::InvalidInput,
"empty_recipient",
"email recipient must not be empty",
));
}
let mut next_id = self.next_id.lock().map_err(|_| {
AppServiceError::new(
"email",
AppServiceErrorKind::Unavailable,
"email_id_lock_poisoned",
"email id lock poisoned",
)
})?;
*next_id = next_id.saturating_add(1);
let message_id = format!("mail-{next_id}");
let receipt = EmailReceipt {
message_id: message_id.clone(),
provider: self.provider.clone(),
accepted_at_unix_ms: now_unix_ms(),
};
self.sent
.lock()
.map_err(|_| {
AppServiceError::new(
"email",
AppServiceErrorKind::Unavailable,
"email_log_lock_poisoned",
"email log lock poisoned",
)
})?
.push(SentEmail {
message: message.clone(),
receipt: receipt.clone(),
});
info!(
target: "shelly.integration.app_service",
service = "email",
operation = "send",
message_id = message_id.as_str(),
recipient = message.to.as_str(),
"Shelly app service operation executed"
);
Ok(receipt)
}
}
/// Value stored in a [`CacheBackend`], with an optional expiry.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CacheEntry {
/// Cached JSON value.
pub value: Value,
/// Expiry in milliseconds since the Unix epoch; `None` means the entry never expires.
pub expires_at_unix_ms: Option<u64>,
}
impl CacheEntry {
    /// Creates an entry that never expires.
    pub fn persistent(value: Value) -> Self {
        let expires_at_unix_ms = None;
        Self {
            value,
            expires_at_unix_ms,
        }
    }

    /// Creates an entry that expires `ttl_ms` milliseconds from now.
    ///
    /// The expiry is computed with a saturating addition.
    pub fn with_ttl_ms(value: Value, ttl_ms: u64) -> Self {
        let deadline = now_unix_ms().saturating_add(ttl_ms);
        Self {
            value,
            expires_at_unix_ms: Some(deadline),
        }
    }
}
/// Key/value cache facade.
///
/// Implementations must be `Send + Sync`.
pub trait CacheBackend: Send + Sync {
/// Returns the value stored under `key`, if any.
fn get(&self, key: &str) -> AppServiceResult<Option<Value>>;
/// Stores `entry` under `key`.
fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()>;
/// Removes the entry stored under `key`.
fn delete(&self, key: &str) -> AppServiceResult<()>;
}
/// [`CacheBackend`] that keeps entries in a mutex-guarded map; starts empty via `Default`.
#[derive(Default)]
pub struct InMemoryCacheBackend {
// Entries keyed by cache key.
entries: Mutex<BTreeMap<String, CacheEntry>>,
}
impl CacheBackend for InMemoryCacheBackend {
    /// Returns a copy of the value stored under `key`.
    ///
    /// An entry whose expiry has been reached is removed and reported as absent.
    ///
    /// # Errors
    ///
    /// `cache_lock_poisoned` (`Unavailable`) when the cache lock is poisoned.
    fn get(&self, key: &str) -> AppServiceResult<Option<Value>> {
        let poisoned = || {
            AppServiceError::new(
                "cache",
                AppServiceErrorKind::Unavailable,
                "cache_lock_poisoned",
                "cache lock poisoned",
            )
        };
        let mut entries = self.entries.lock().map_err(|_| poisoned())?;

        let is_expired = match entries.get(key) {
            None => return Ok(None),
            Some(entry) => matches!(
                entry.expires_at_unix_ms,
                Some(deadline) if now_unix_ms() >= deadline
            ),
        };
        if is_expired {
            entries.remove(key);
            return Ok(None);
        }
        Ok(entries.get(key).map(|entry| entry.value.clone()))
    }

    /// Stores `entry` under `key`, replacing any previous entry.
    ///
    /// # Errors
    ///
    /// `cache_lock_poisoned` (`Unavailable`) when the cache lock is poisoned.
    fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()> {
        let poisoned = || {
            AppServiceError::new(
                "cache",
                AppServiceErrorKind::Unavailable,
                "cache_lock_poisoned",
                "cache lock poisoned",
            )
        };
        let mut entries = self.entries.lock().map_err(|_| poisoned())?;
        entries.insert(key.to_string(), entry);
        Ok(())
    }

    /// Removes the entry under `key`; a missing key is not an error.
    ///
    /// # Errors
    ///
    /// `cache_lock_poisoned` (`Unavailable`) when the cache lock is poisoned.
    fn delete(&self, key: &str) -> AppServiceResult<()> {
        let poisoned = || {
            AppServiceError::new(
                "cache",
                AppServiceErrorKind::Unavailable,
                "cache_lock_poisoned",
                "cache lock poisoned",
            )
        };
        let mut entries = self.entries.lock().map_err(|_| poisoned())?;
        entries.remove(key);
        Ok(())
    }
}
/// Message stored in a [`QueueBackend`] topic.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct QueueMessage {
/// Identifier assigned at enqueue time.
pub id: String,
/// Topic the message was enqueued on.
pub topic: String,
/// JSON payload.
pub payload: Value,
/// Attempt counter; the in-memory queue stores `0` and returns the message with it incremented on dequeue.
pub attempts: u32,
/// Enqueue time in milliseconds since the Unix epoch.
pub enqueued_at_unix_ms: u64,
/// Optional scheduled time in milliseconds since the Unix epoch.
// NOTE(review): the in-memory queue stores this value but `dequeue` does not consult it — confirm intended.
pub scheduled_at_unix_ms: Option<u64>,
}
/// Topic-based message queue facade.
///
/// Implementations must be `Send + Sync`.
pub trait QueueBackend: Send + Sync {
/// Appends a message with `payload` to `topic` and returns the stored message.
fn enqueue(
&self,
topic: &str,
payload: Value,
scheduled_at_unix_ms: Option<u64>,
) -> AppServiceResult<QueueMessage>;
/// Removes and returns the next message of `topic`, if any.
fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>>;
/// Returns the number of messages currently queued on `topic`.
fn len(&self, topic: &str) -> AppServiceResult<usize>;
}
/// [`QueueBackend`] that keeps one FIFO queue per topic in memory; starts empty via `Default`.
#[derive(Default)]
pub struct InMemoryQueueBackend {
// One queue per topic name.
messages: Mutex<BTreeMap<String, VecDeque<QueueMessage>>>,
// Counter used to build `queue-<n>` message ids.
next_id: Mutex<u64>,
}
impl QueueBackend for InMemoryQueueBackend {
fn enqueue(
&self,
topic: &str,
payload: Value,
scheduled_at_unix_ms: Option<u64>,
) -> AppServiceResult<QueueMessage> {
if topic.trim().is_empty() {
return Err(AppServiceError::new(
"queue",
AppServiceErrorKind::InvalidInput,
"empty_topic",
"queue topic must not be empty",
));
}
let mut next_id = self.next_id.lock().map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_id_lock_poisoned",
"queue id lock poisoned",
)
})?;
*next_id = next_id.saturating_add(1);
let message = QueueMessage {
id: format!("queue-{next_id}"),
topic: topic.to_string(),
payload,
attempts: 0,
enqueued_at_unix_ms: now_unix_ms(),
scheduled_at_unix_ms,
};
self.messages
.lock()
.map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_lock_poisoned",
"queue lock poisoned",
)
})?
.entry(topic.to_string())
.or_default()
.push_back(message.clone());
Ok(message)
}
fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>> {
let mut queues = self.messages.lock().map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_lock_poisoned",
"queue lock poisoned",
)
})?;
let Some(queue) = queues.get_mut(topic) else {
return Ok(None);
};
Ok(queue.pop_front().map(|mut message| {
message.attempts = message.attempts.saturating_add(1);
message
}))
}
fn len(&self, topic: &str) -> AppServiceResult<usize> {
let queues = self.messages.lock().map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_lock_poisoned",
"queue lock poisoned",
)
})?;
Ok(queues.get(topic).map(VecDeque::len).unwrap_or(0))
}
}
/// Converts a job-orchestrator `IntegrationError` into an [`AppServiceError`]
/// whose source is `"jobs"`.
///
/// The error code is carried over, falling back to `job_error` when the
/// orchestrator supplied none; the message is carried over unchanged.
fn map_job_error(err: IntegrationError) -> AppServiceError {
    // Exhaustive on purpose: a new integration error kind must be mapped explicitly.
    let kind = match err.kind {
        IntegrationErrorKind::Unavailable
        | IntegrationErrorKind::Timeout
        | IntegrationErrorKind::RateLimited
        | IntegrationErrorKind::Transient => AppServiceErrorKind::Unavailable,
        IntegrationErrorKind::InvalidInput => AppServiceErrorKind::InvalidInput,
        IntegrationErrorKind::Auth => AppServiceErrorKind::Unauthorized,
        IntegrationErrorKind::Permanent => AppServiceErrorKind::Conflict,
    };
    let code = match err.code {
        Some(code) => code,
        None => String::from("job_error"),
    };
    AppServiceError::new("jobs", kind, code, err.message)
}
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. A millisecond count that
/// does not fit in `u64` saturates at `u64::MAX` instead of being silently
/// truncated by an `as` cast.
fn now_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
#[cfg(test)]
mod tests {
use super::{
map_app_service_result, map_job_error, AppServiceError, AppServiceErrorKind,
AppServiceResult, AuthCredentials, BackgroundJobRequest, BackgroundJobService,
BackgroundJobState, CacheBackend, CacheEntry, EmailMessage, IdentityService,
InMemoryCacheBackend, InMemoryIdentityService, InMemoryQueueBackend,
InMemoryTransactionalEmailService, JobOrchestratorBackgroundJobs, QueueBackend,
TenantBackgroundJobQuota, TenantQuotaBackgroundJobs, TransactionalEmailService,
};
use crate::integrations::{
InMemoryJobOrchestrator, IntegrationError, IntegrationErrorKind, JobState,
};
use serde_json::json;
use std::{
panic::{catch_unwind, AssertUnwindSafe},
sync::Arc,
thread,
time::Duration,
};
// Signs in with the seeded demo account, resolves the session to its identity,
// then signs out and checks the session can no longer be resolved.
#[test]
fn in_memory_identity_service_signs_in_and_signs_out() {
let service = InMemoryIdentityService::default();
let session = service
.sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
.unwrap();
let identity = service.session_identity(&session.token).unwrap().unwrap();
assert_eq!(identity.email, "demo@shelly.dev");
assert!(identity.roles.contains(&"admin".to_string()));
service.sign_out(&session.token).unwrap();
let missing = service.session_identity(&session.token).unwrap();
assert!(missing.is_none());
}
// A wrong password for a known user is rejected with `InvalidCredentials`.
#[test]
fn in_memory_identity_service_rejects_invalid_credentials() {
let service = InMemoryIdentityService::default();
let err = service
.sign_in(AuthCredentials::new("demo@shelly.dev", "wrong-password"))
.unwrap_err();
assert_eq!(err.kind, AppServiceErrorKind::InvalidCredentials);
}
// Covers the cache round trip (set, get, delete) and expiry of an entry stored with a zero TTL.
#[test]
fn in_memory_cache_backend_supports_set_get_delete_and_ttl() {
let cache = InMemoryCacheBackend::default();
cache
.set(
"sessions:1",
CacheEntry::persistent(json!({"active": true})),
)
.unwrap();
let value = cache.get("sessions:1").unwrap();
assert_eq!(value, Some(json!({"active": true})));
cache.delete("sessions:1").unwrap();
let cleared = cache.get("sessions:1").unwrap();
assert!(cleared.is_none());
// A TTL of 0 makes the expiry equal to the creation time, so the entry is already expired on read.
cache
.set("temp", CacheEntry::with_ttl_ms(json!("stale"), 0))
.unwrap();
let expired = cache.get("temp").unwrap();
assert!(expired.is_none());
}
// Messages on one topic are dequeued in the order they were enqueued, and `len` tracks the queue size.
#[test]
fn in_memory_queue_backend_is_fifo_per_topic() {
let queue = InMemoryQueueBackend::default();
queue.enqueue("emails", json!({"id": 1}), None).unwrap();
queue.enqueue("emails", json!({"id": 2}), None).unwrap();
assert_eq!(queue.len("emails").unwrap(), 2);
let first = queue.dequeue("emails").unwrap().unwrap();
let second = queue.dequeue("emails").unwrap().unwrap();
assert_eq!(first.payload, json!({"id": 1}));
assert_eq!(second.payload, json!({"id": 2}));
assert_eq!(queue.len("emails").unwrap(), 0);
}
// A sent message shows up in the in-memory log together with the receipt that was returned.
#[test]
fn transactional_email_service_records_sent_messages() {
let mailer = InMemoryTransactionalEmailService::default();
let message = EmailMessage::new("ada@example.test", "Welcome", "Hello Ada");
let receipt = mailer.send(message.clone()).unwrap();
assert_eq!(receipt.provider, "in-memory-mailer");
let sent = mailer.sent();
assert_eq!(sent.len(), 1);
assert_eq!(sent[0].message, message);
assert_eq!(sent[0].receipt.message_id, receipt.message_id);
}
// Enqueues through the orchestrator adapter, then checks that status reflects
// the queued state and, after the orchestrator marks success, the result payload.
#[test]
fn background_job_service_wraps_job_orchestrator_contract() {
let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
let jobs = JobOrchestratorBackgroundJobs::new(orchestrator.clone());
let handle = jobs
.enqueue(BackgroundJobRequest::new(
"send_digest",
json!({"account_id": 7}),
"send_digest:7",
))
.unwrap();
let queued = jobs.status(&handle.id).unwrap();
assert_eq!(queued.state, BackgroundJobState::Queued);
orchestrator
.mark_succeeded(&handle.id, json!({"emails_sent": 3}))
.unwrap();
let done = jobs.status(&handle.id).unwrap();
assert_eq!(done.state, BackgroundJobState::Succeeded);
assert_eq!(done.result, Some(json!({"emails_sent": 3})));
}
// With a quota of one job per (effectively unbounded) window: the second job of
// a tenant is denied, another tenant is unaffected, and a request without
// `tenant_id` metadata is rejected.
#[test]
fn tenant_quota_background_jobs_enforce_per_tenant_limits() {
let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
let base = Arc::new(JobOrchestratorBackgroundJobs::new(orchestrator));
let jobs = TenantQuotaBackgroundJobs::new(base).with_quota(TenantBackgroundJobQuota {
max_jobs_per_window: 1,
window_ms: u64::MAX,
require_tenant_id: true,
});
let mut first = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-a-1");
first
.metadata
.insert("tenant_id".to_string(), "tenant-a".to_string());
let mut second = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-a-2");
second
.metadata
.insert("tenant_id".to_string(), "tenant-a".to_string());
let mut other_tenant =
BackgroundJobRequest::new("sync", json!({"tenant": "b"}), "sync-b-1");
other_tenant
.metadata
.insert("tenant_id".to_string(), "tenant-b".to_string());
jobs.enqueue(first).expect("first tenant-a job should pass");
let denied = jobs
.enqueue(second)
.expect_err("second tenant-a job should be denied by quota");
assert_eq!(denied.code, "tenant_job_quota_exceeded");
jobs.enqueue(other_tenant)
.expect("tenant-b should keep its own quota window");
let missing_tenant = BackgroundJobRequest::new("sync", json!({"tenant": "?"}), "sync-?");
let missing_tenant_error = jobs
.enqueue(missing_tenant)
.expect_err("missing tenant_id metadata should be denied");
assert_eq!(missing_tenant_error.code, "tenant_context_required");
}
// The mapped error text carries both the outer source label and the original error code.
#[test]
fn map_app_service_result_wraps_errors_with_source_context() {
let mapped = map_app_service_result::<()>(
"email",
Err(AppServiceError::new(
"mailer",
AppServiceErrorKind::InvalidInput,
"empty_recipient",
"recipient is required",
)),
)
.unwrap_err()
.to_string();
assert!(mapped.contains("[email]"));
assert!(mapped.contains("empty_recipient"));
}
// Covers three identity-service edge cases: blank email rejection, the 1 ms
// floor applied to a zero TTL, and eviction of an expired session.
#[test]
fn identity_service_covers_insert_validation_ttl_floor_and_expiry() {
let service = InMemoryIdentityService::default();
let err = service
.insert_user("u-1", " ", "pw", None, Vec::new(), Default::default())
.unwrap_err();
assert_eq!(err.code, "empty_email");
let ttl_floor = InMemoryIdentityService::default().with_session_ttl_ms(0);
let session = ttl_floor
.sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
.unwrap();
assert!(session.expires_at_unix_ms >= session.issued_at_unix_ms.saturating_add(1));
let short_lived = InMemoryIdentityService::default().with_session_ttl_ms(1);
let expiring = short_lived
.sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
.unwrap();
// Sleep past the 1 ms TTL so the session is expired when it is looked up.
thread::sleep(Duration::from_millis(3));
let expired = short_lived.session_identity(&expiring.token).unwrap();
assert!(expired.is_none());
}
// Blank topic and blank recipient are rejected; unknown topics dequeue to `None` and report length 0.
#[test]
fn queue_and_email_services_cover_invalid_input_paths() {
let queue = InMemoryQueueBackend::default();
let err = queue.enqueue(" ", json!({"id": 1}), None).unwrap_err();
assert_eq!(err.code, "empty_topic");
assert!(queue.dequeue("missing").unwrap().is_none());
assert_eq!(queue.len("missing").unwrap(), 0);
let mailer = InMemoryTransactionalEmailService::default();
let err = mailer
.send(EmailMessage::new(" ", "Subject", "Body"))
.unwrap_err();
assert_eq!(err.code, "empty_recipient");
}
// Test double whose `enqueue` and `status` always fail; used to exercise the quota rollback path.
#[derive(Default)]
struct FailingJobs;
impl BackgroundJobService for FailingJobs {
// Always fails with `inner_unavailable`.
fn enqueue(
&self,
_request: BackgroundJobRequest,
) -> AppServiceResult<super::BackgroundJobHandle> {
Err(AppServiceError::new(
"jobs",
AppServiceErrorKind::Unavailable,
"inner_unavailable",
"inner jobs service unavailable",
))
}
// Always fails with `missing_job`.
fn status(&self, id: &str) -> AppServiceResult<super::BackgroundJobStatus> {
Err(AppServiceError::new(
"jobs",
AppServiceErrorKind::NotFound,
"missing_job",
format!("missing job {id}"),
))
}
}
// With a quota of one, two consecutive failing enqueues must both surface the
// inner error: the second would hit `tenant_job_quota_exceeded` if the first
// had not rolled its slot back. Also checks that `status` is forwarded.
#[test]
fn tenant_quota_rolls_back_slot_when_inner_enqueue_fails() {
let jobs = TenantQuotaBackgroundJobs::new(Arc::new(FailingJobs)).with_quota(
TenantBackgroundJobQuota {
max_jobs_per_window: 1,
window_ms: u64::MAX,
require_tenant_id: true,
},
);
let mut request_one = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-1");
request_one
.metadata
.insert("tenant_id".to_string(), "tenant-a".to_string());
let mut request_two = BackgroundJobRequest::new("sync", json!({"tenant": "a"}), "sync-2");
request_two
.metadata
.insert("tenant_id".to_string(), "tenant-a".to_string());
let first = jobs.enqueue(request_one).unwrap_err();
assert_eq!(first.code, "inner_unavailable");
let second = jobs.enqueue(request_two).unwrap_err();
assert_eq!(second.code, "inner_unavailable");
let status = jobs.status("missing").unwrap_err();
assert_eq!(status.code, "missing_job");
}
// Poisons each internal mutex in turn (by panicking while holding its guard
// inside `catch_unwind`) and checks that the affected operations report the
// matching `*_lock_poisoned` error code instead of panicking.
#[test]
fn app_service_lock_poisoning_paths_are_reported_with_unavailable_codes() {
// Identity service: users lock.
let users_poisoned = InMemoryIdentityService::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = users_poisoned.users.lock().expect("users lock");
panic!("poison users");
}));
let err = users_poisoned
.sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
.unwrap_err();
assert_eq!(err.code, "users_lock_poisoned");
// Identity service: session id counter lock.
let session_id_poisoned = InMemoryIdentityService::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = session_id_poisoned
.next_session_id
.lock()
.expect("next_session_id lock");
panic!("poison next session id");
}));
let err = session_id_poisoned
.sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
.unwrap_err();
assert_eq!(err.code, "session_id_lock_poisoned");
// Identity service: sessions lock (affects sign_in, session_identity and sign_out).
let sessions_poisoned = InMemoryIdentityService::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = sessions_poisoned.sessions.lock().expect("sessions lock");
panic!("poison sessions");
}));
let err = sessions_poisoned
.sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
.unwrap_err();
assert_eq!(err.code, "sessions_lock_poisoned");
let err = sessions_poisoned.session_identity("sess-1").unwrap_err();
assert_eq!(err.code, "sessions_lock_poisoned");
let err = sessions_poisoned.sign_out("sess-1").unwrap_err();
assert_eq!(err.code, "sessions_lock_poisoned");
// Mailer: id counter lock.
let mailer_id_poisoned = InMemoryTransactionalEmailService::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = mailer_id_poisoned.next_id.lock().expect("email id lock");
panic!("poison email id");
}));
let err = mailer_id_poisoned
.send(EmailMessage::new("ada@example.test", "Subject", "Body"))
.unwrap_err();
assert_eq!(err.code, "email_id_lock_poisoned");
// Mailer: sent-log lock.
let mailer_log_poisoned = InMemoryTransactionalEmailService::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = mailer_log_poisoned.sent.lock().expect("email log lock");
panic!("poison email log");
}));
let err = mailer_log_poisoned
.send(EmailMessage::new("ada@example.test", "Subject", "Body"))
.unwrap_err();
assert_eq!(err.code, "email_log_lock_poisoned");
// Queue: id counter lock.
let queue_id_poisoned = InMemoryQueueBackend::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = queue_id_poisoned.next_id.lock().expect("queue id lock");
panic!("poison queue id");
}));
let err = queue_id_poisoned
.enqueue("jobs", json!({"id": 1}), None)
.unwrap_err();
assert_eq!(err.code, "queue_id_lock_poisoned");
// Queue: message store lock (affects len and dequeue).
let queue_store_poisoned = InMemoryQueueBackend::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = queue_store_poisoned
.messages
.lock()
.expect("queue store lock");
panic!("poison queue store");
}));
let err = queue_store_poisoned.len("jobs").unwrap_err();
assert_eq!(err.code, "queue_lock_poisoned");
let err = queue_store_poisoned.dequeue("jobs").unwrap_err();
assert_eq!(err.code, "queue_lock_poisoned");
// Cache: entries lock (affects get, set and delete).
let cache_poisoned = InMemoryCacheBackend::default();
let _ = catch_unwind(AssertUnwindSafe(|| {
let _guard = cache_poisoned.entries.lock().expect("cache lock");
panic!("poison cache");
}));
let err = cache_poisoned.get("k").unwrap_err();
assert_eq!(err.code, "cache_lock_poisoned");
let err = cache_poisoned
.set("k", CacheEntry::persistent(json!({"v": 1})))
.unwrap_err();
assert_eq!(err.code, "cache_lock_poisoned");
let err = cache_poisoned.delete("k").unwrap_err();
assert_eq!(err.code, "cache_lock_poisoned");
}
// Covers the scheduling builder, every `JobState` -> `BackgroundJobState`
// conversion, and every `IntegrationErrorKind` -> `AppServiceErrorKind` mapping.
#[test]
fn background_job_helper_builders_and_error_mapping_cover_state_matrix() {
let scheduled = BackgroundJobRequest::new("job", json!({"id": 1}), "idempotent")
.schedule_at_unix_ms(1234);
assert_eq!(scheduled.scheduled_at_unix_ms, Some(1234));
assert_eq!(
BackgroundJobState::from(JobState::Queued),
BackgroundJobState::Queued
);
assert_eq!(
BackgroundJobState::from(JobState::Running),
BackgroundJobState::Running
);
assert_eq!(
BackgroundJobState::from(JobState::Succeeded),
BackgroundJobState::Succeeded
);
assert_eq!(
BackgroundJobState::from(JobState::Failed),
BackgroundJobState::Failed
);
assert_eq!(
BackgroundJobState::from(JobState::Canceled),
BackgroundJobState::Canceled
);
// An explicit error code is carried through the mapping.
let mapped = map_job_error(
IntegrationError::new("trigger", IntegrationErrorKind::InvalidInput, "bad request")
.with_code("invalid_payload"),
);
assert_eq!(mapped.kind, AppServiceErrorKind::InvalidInput);
assert_eq!(mapped.code, "invalid_payload");
let mapped = map_job_error(IntegrationError::new(
"trigger",
IntegrationErrorKind::Auth,
"unauthorized",
));
assert_eq!(mapped.kind, AppServiceErrorKind::Unauthorized);
// All four retryable kinds collapse into `Unavailable`.
for transient_kind in [
IntegrationErrorKind::Unavailable,
IntegrationErrorKind::Timeout,
IntegrationErrorKind::RateLimited,
IntegrationErrorKind::Transient,
] {
let mapped = map_job_error(IntegrationError::new("trigger", transient_kind, "retry"));
assert_eq!(mapped.kind, AppServiceErrorKind::Unavailable);
}
let mapped = map_job_error(IntegrationError::new(
"trigger",
IntegrationErrorKind::Permanent,
"conflict",
));
assert_eq!(mapped.kind, AppServiceErrorKind::Conflict);
}
}