use crate::{
integrations::{IntegrationError, IntegrationErrorKind, JobOrchestrator, JobRequest, JobState},
DataError, DataResult,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{BTreeMap, VecDeque},
fmt,
sync::{Arc, Mutex},
time::{SystemTime, UNIX_EPOCH},
};
use tracing::{info, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AppServiceErrorKind {
InvalidCredentials,
Unauthorized,
NotFound,
Conflict,
InvalidInput,
Unavailable,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppServiceError {
pub source: String,
pub kind: AppServiceErrorKind,
pub code: String,
pub message: String,
}
impl AppServiceError {
pub fn new(
source: impl Into<String>,
kind: AppServiceErrorKind,
code: impl Into<String>,
message: impl Into<String>,
) -> Self {
Self {
source: source.into(),
kind,
code: code.into(),
message: message.into(),
}
}
}
impl fmt::Display for AppServiceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[{}:{}] {}", self.source, self.code, self.message)
}
}
impl std::error::Error for AppServiceError {}
pub type AppServiceResult<T> = Result<T, AppServiceError>;
pub fn map_app_service_error(source: impl Into<String>, err: AppServiceError) -> DataError {
DataError::Integration(format!("[{}] {}", source.into(), err))
}
pub fn map_app_service_result<T>(
source: impl Into<String>,
result: AppServiceResult<T>,
) -> DataResult<T> {
result.map_err(|err| map_app_service_error(source, err))
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthIdentity {
pub user_id: String,
pub email: String,
pub display_name: Option<String>,
pub roles: Vec<String>,
pub attributes: BTreeMap<String, String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthCredentials {
pub identifier: String,
pub password: String,
}
impl AuthCredentials {
pub fn new(identifier: impl Into<String>, password: impl Into<String>) -> Self {
Self {
identifier: identifier.into(),
password: password.into(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthSession {
pub token: String,
pub identity: AuthIdentity,
pub issued_at_unix_ms: u64,
pub expires_at_unix_ms: u64,
}
pub trait IdentityService: Send + Sync {
fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession>;
fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>>;
fn sign_out(&self, token: &str) -> AppServiceResult<()>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct IdentityRecord {
user_id: String,
email: String,
password: String,
display_name: Option<String>,
roles: Vec<String>,
attributes: BTreeMap<String, String>,
}
pub struct InMemoryIdentityService {
users: Mutex<BTreeMap<String, IdentityRecord>>,
sessions: Mutex<BTreeMap<String, AuthSession>>,
next_session_id: Mutex<u64>,
session_ttl_ms: u64,
}
impl InMemoryIdentityService {
pub fn new() -> Self {
let service = Self {
users: Mutex::new(BTreeMap::new()),
sessions: Mutex::new(BTreeMap::new()),
next_session_id: Mutex::new(0),
session_ttl_ms: 8 * 60 * 60 * 1000,
};
service
.insert_user(
"user-1",
"demo@shelly.dev",
"shelly-demo",
Some("Shelly Demo"),
vec!["admin".to_string()],
BTreeMap::new(),
)
.expect("default in-memory auth user must be inserted");
service
}
pub fn insert_user(
&self,
user_id: impl Into<String>,
email: impl Into<String>,
password: impl Into<String>,
display_name: Option<&str>,
roles: Vec<String>,
attributes: BTreeMap<String, String>,
) -> AppServiceResult<()> {
let email = email.into().trim().to_ascii_lowercase();
if email.is_empty() {
return Err(AppServiceError::new(
"auth",
AppServiceErrorKind::InvalidInput,
"empty_email",
"email must not be empty",
));
}
let mut users = self.users.lock().map_err(|_| {
AppServiceError::new(
"auth",
AppServiceErrorKind::Unavailable,
"users_lock_poisoned",
"auth users lock poisoned",
)
})?;
users.insert(
email.clone(),
IdentityRecord {
user_id: user_id.into(),
email: email.clone(),
password: password.into(),
display_name: display_name.map(ToString::to_string),
roles,
attributes,
},
);
info!(
target: "shelly.integration.app_service",
service = "auth",
operation = "insert_user",
email,
"Shelly app service operation executed"
);
Ok(())
}
pub fn with_session_ttl_ms(mut self, ttl_ms: u64) -> Self {
self.session_ttl_ms = ttl_ms.max(1);
self
}
}
impl Default for InMemoryIdentityService {
fn default() -> Self {
Self::new()
}
}
impl IdentityService for InMemoryIdentityService {
fn sign_in(&self, credentials: AuthCredentials) -> AppServiceResult<AuthSession> {
let identifier = credentials.identifier.trim().to_ascii_lowercase();
let users = self.users.lock().map_err(|_| {
AppServiceError::new(
"auth",
AppServiceErrorKind::Unavailable,
"users_lock_poisoned",
"auth users lock poisoned",
)
})?;
let Some(record) = users.get(&identifier) else {
return Err(AppServiceError::new(
"auth",
AppServiceErrorKind::InvalidCredentials,
"invalid_credentials",
"invalid credentials",
));
};
if record.password != credentials.password {
return Err(AppServiceError::new(
"auth",
AppServiceErrorKind::InvalidCredentials,
"invalid_credentials",
"invalid credentials",
));
}
let identity = AuthIdentity {
user_id: record.user_id.clone(),
email: record.email.clone(),
display_name: record.display_name.clone(),
roles: record.roles.clone(),
attributes: record.attributes.clone(),
};
drop(users);
let mut next_session_id = self.next_session_id.lock().map_err(|_| {
AppServiceError::new(
"auth",
AppServiceErrorKind::Unavailable,
"session_id_lock_poisoned",
"session id lock poisoned",
)
})?;
*next_session_id = next_session_id.saturating_add(1);
let token = format!("sess-{next_session_id}");
let issued_at_unix_ms = now_unix_ms();
let session = AuthSession {
token: token.clone(),
identity,
issued_at_unix_ms,
expires_at_unix_ms: issued_at_unix_ms.saturating_add(self.session_ttl_ms),
};
self.sessions
.lock()
.map_err(|_| {
AppServiceError::new(
"auth",
AppServiceErrorKind::Unavailable,
"sessions_lock_poisoned",
"auth sessions lock poisoned",
)
})?
.insert(token.clone(), session.clone());
info!(
target: "shelly.integration.app_service",
service = "auth",
operation = "sign_in",
token = token.as_str(),
user_id = session.identity.user_id.as_str(),
"Shelly app service operation executed"
);
Ok(session)
}
fn session_identity(&self, token: &str) -> AppServiceResult<Option<AuthIdentity>> {
let mut sessions = self.sessions.lock().map_err(|_| {
AppServiceError::new(
"auth",
AppServiceErrorKind::Unavailable,
"sessions_lock_poisoned",
"auth sessions lock poisoned",
)
})?;
let Some(session) = sessions.get(token).cloned() else {
return Ok(None);
};
if now_unix_ms() >= session.expires_at_unix_ms {
sessions.remove(token);
warn!(
target: "shelly.integration.app_service",
service = "auth",
operation = "session_identity",
token,
"Shelly app session expired and was evicted"
);
return Ok(None);
}
Ok(Some(session.identity))
}
fn sign_out(&self, token: &str) -> AppServiceResult<()> {
self.sessions
.lock()
.map_err(|_| {
AppServiceError::new(
"auth",
AppServiceErrorKind::Unavailable,
"sessions_lock_poisoned",
"auth sessions lock poisoned",
)
})?
.remove(token);
info!(
target: "shelly.integration.app_service",
service = "auth",
operation = "sign_out",
token,
"Shelly app service operation executed"
);
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BackgroundJobRequest {
pub job: String,
pub payload: Value,
pub idempotency_key: String,
pub scheduled_at_unix_ms: Option<u64>,
pub metadata: BTreeMap<String, String>,
}
impl BackgroundJobRequest {
pub fn new(job: impl Into<String>, payload: Value, idempotency_key: impl Into<String>) -> Self {
Self {
job: job.into(),
payload,
idempotency_key: idempotency_key.into(),
scheduled_at_unix_ms: None,
metadata: BTreeMap::new(),
}
}
pub fn schedule_at_unix_ms(mut self, scheduled_at_unix_ms: u64) -> Self {
self.scheduled_at_unix_ms = Some(scheduled_at_unix_ms);
self
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BackgroundJobHandle {
pub id: String,
pub job: String,
pub idempotency_key: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BackgroundJobState {
Queued,
Running,
Succeeded,
Failed,
Canceled,
}
impl From<JobState> for BackgroundJobState {
fn from(value: JobState) -> Self {
match value {
JobState::Queued => Self::Queued,
JobState::Running => Self::Running,
JobState::Succeeded => Self::Succeeded,
JobState::Failed => Self::Failed,
JobState::Canceled => Self::Canceled,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BackgroundJobStatus {
pub id: String,
pub state: BackgroundJobState,
pub attempts: u32,
pub result: Option<Value>,
pub error: Option<String>,
}
pub trait BackgroundJobService: Send + Sync {
fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle>;
fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus>;
}
#[derive(Clone)]
pub struct JobOrchestratorBackgroundJobs<T: JobOrchestrator> {
orchestrator: Arc<T>,
}
impl<T: JobOrchestrator> JobOrchestratorBackgroundJobs<T> {
pub fn new(orchestrator: Arc<T>) -> Self {
Self { orchestrator }
}
}
impl<T: JobOrchestrator> BackgroundJobService for JobOrchestratorBackgroundJobs<T> {
fn enqueue(&self, request: BackgroundJobRequest) -> AppServiceResult<BackgroundJobHandle> {
let mut job_request =
JobRequest::new(request.job, request.payload, request.idempotency_key);
job_request.metadata = request.metadata;
if let Some(scheduled_at_unix_ms) = request.scheduled_at_unix_ms {
job_request.metadata.insert(
"scheduled_at_unix_ms".to_string(),
scheduled_at_unix_ms.to_string(),
);
}
let handle = self
.orchestrator
.enqueue(job_request)
.map_err(map_job_error)?;
Ok(BackgroundJobHandle {
id: handle.id,
job: handle.workflow,
idempotency_key: handle.idempotency_key,
})
}
fn status(&self, id: &str) -> AppServiceResult<BackgroundJobStatus> {
let status = self.orchestrator.status(id).map_err(map_job_error)?;
Ok(BackgroundJobStatus {
id: status.id,
state: BackgroundJobState::from(status.state),
attempts: status.attempts,
result: status.result,
error: status.error.map(|err| err.to_string()),
})
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EmailMessage {
pub to: String,
pub subject: String,
pub text_body: String,
pub html_body: Option<String>,
pub headers: BTreeMap<String, String>,
}
impl EmailMessage {
pub fn new(
to: impl Into<String>,
subject: impl Into<String>,
text_body: impl Into<String>,
) -> Self {
Self {
to: to.into(),
subject: subject.into(),
text_body: text_body.into(),
html_body: None,
headers: BTreeMap::new(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmailReceipt {
pub message_id: String,
pub provider: String,
pub accepted_at_unix_ms: u64,
}
pub trait TransactionalEmailService: Send + Sync {
fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt>;
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SentEmail {
pub message: EmailMessage,
pub receipt: EmailReceipt,
}
pub struct InMemoryTransactionalEmailService {
provider: String,
next_id: Mutex<u64>,
sent: Mutex<Vec<SentEmail>>,
}
impl InMemoryTransactionalEmailService {
pub fn new(provider: impl Into<String>) -> Self {
Self {
provider: provider.into(),
next_id: Mutex::new(0),
sent: Mutex::new(Vec::new()),
}
}
pub fn sent(&self) -> Vec<SentEmail> {
self.sent
.lock()
.map(|entries| entries.clone())
.unwrap_or_default()
}
}
impl Default for InMemoryTransactionalEmailService {
fn default() -> Self {
Self::new("in-memory-mailer")
}
}
impl TransactionalEmailService for InMemoryTransactionalEmailService {
fn send(&self, message: EmailMessage) -> AppServiceResult<EmailReceipt> {
if message.to.trim().is_empty() {
return Err(AppServiceError::new(
"email",
AppServiceErrorKind::InvalidInput,
"empty_recipient",
"email recipient must not be empty",
));
}
let mut next_id = self.next_id.lock().map_err(|_| {
AppServiceError::new(
"email",
AppServiceErrorKind::Unavailable,
"email_id_lock_poisoned",
"email id lock poisoned",
)
})?;
*next_id = next_id.saturating_add(1);
let message_id = format!("mail-{next_id}");
let receipt = EmailReceipt {
message_id: message_id.clone(),
provider: self.provider.clone(),
accepted_at_unix_ms: now_unix_ms(),
};
self.sent
.lock()
.map_err(|_| {
AppServiceError::new(
"email",
AppServiceErrorKind::Unavailable,
"email_log_lock_poisoned",
"email log lock poisoned",
)
})?
.push(SentEmail {
message: message.clone(),
receipt: receipt.clone(),
});
info!(
target: "shelly.integration.app_service",
service = "email",
operation = "send",
message_id = message_id.as_str(),
recipient = message.to.as_str(),
"Shelly app service operation executed"
);
Ok(receipt)
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CacheEntry {
pub value: Value,
pub expires_at_unix_ms: Option<u64>,
}
impl CacheEntry {
pub fn persistent(value: Value) -> Self {
Self {
value,
expires_at_unix_ms: None,
}
}
pub fn with_ttl_ms(value: Value, ttl_ms: u64) -> Self {
Self {
value,
expires_at_unix_ms: Some(now_unix_ms().saturating_add(ttl_ms)),
}
}
}
pub trait CacheBackend: Send + Sync {
fn get(&self, key: &str) -> AppServiceResult<Option<Value>>;
fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()>;
fn delete(&self, key: &str) -> AppServiceResult<()>;
}
#[derive(Default)]
pub struct InMemoryCacheBackend {
entries: Mutex<BTreeMap<String, CacheEntry>>,
}
impl CacheBackend for InMemoryCacheBackend {
fn get(&self, key: &str) -> AppServiceResult<Option<Value>> {
let mut entries = self.entries.lock().map_err(|_| {
AppServiceError::new(
"cache",
AppServiceErrorKind::Unavailable,
"cache_lock_poisoned",
"cache lock poisoned",
)
})?;
let Some(entry) = entries.get(key).cloned() else {
return Ok(None);
};
if entry
.expires_at_unix_ms
.is_some_and(|expires_at| now_unix_ms() >= expires_at)
{
entries.remove(key);
return Ok(None);
}
Ok(Some(entry.value))
}
fn set(&self, key: &str, entry: CacheEntry) -> AppServiceResult<()> {
self.entries
.lock()
.map_err(|_| {
AppServiceError::new(
"cache",
AppServiceErrorKind::Unavailable,
"cache_lock_poisoned",
"cache lock poisoned",
)
})?
.insert(key.to_string(), entry);
Ok(())
}
fn delete(&self, key: &str) -> AppServiceResult<()> {
self.entries
.lock()
.map_err(|_| {
AppServiceError::new(
"cache",
AppServiceErrorKind::Unavailable,
"cache_lock_poisoned",
"cache lock poisoned",
)
})?
.remove(key);
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct QueueMessage {
pub id: String,
pub topic: String,
pub payload: Value,
pub attempts: u32,
pub enqueued_at_unix_ms: u64,
pub scheduled_at_unix_ms: Option<u64>,
}
pub trait QueueBackend: Send + Sync {
fn enqueue(
&self,
topic: &str,
payload: Value,
scheduled_at_unix_ms: Option<u64>,
) -> AppServiceResult<QueueMessage>;
fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>>;
fn len(&self, topic: &str) -> AppServiceResult<usize>;
}
#[derive(Default)]
pub struct InMemoryQueueBackend {
messages: Mutex<BTreeMap<String, VecDeque<QueueMessage>>>,
next_id: Mutex<u64>,
}
impl QueueBackend for InMemoryQueueBackend {
fn enqueue(
&self,
topic: &str,
payload: Value,
scheduled_at_unix_ms: Option<u64>,
) -> AppServiceResult<QueueMessage> {
if topic.trim().is_empty() {
return Err(AppServiceError::new(
"queue",
AppServiceErrorKind::InvalidInput,
"empty_topic",
"queue topic must not be empty",
));
}
let mut next_id = self.next_id.lock().map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_id_lock_poisoned",
"queue id lock poisoned",
)
})?;
*next_id = next_id.saturating_add(1);
let message = QueueMessage {
id: format!("queue-{next_id}"),
topic: topic.to_string(),
payload,
attempts: 0,
enqueued_at_unix_ms: now_unix_ms(),
scheduled_at_unix_ms,
};
self.messages
.lock()
.map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_lock_poisoned",
"queue lock poisoned",
)
})?
.entry(topic.to_string())
.or_default()
.push_back(message.clone());
Ok(message)
}
fn dequeue(&self, topic: &str) -> AppServiceResult<Option<QueueMessage>> {
let mut queues = self.messages.lock().map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_lock_poisoned",
"queue lock poisoned",
)
})?;
let Some(queue) = queues.get_mut(topic) else {
return Ok(None);
};
Ok(queue.pop_front().map(|mut message| {
message.attempts = message.attempts.saturating_add(1);
message
}))
}
fn len(&self, topic: &str) -> AppServiceResult<usize> {
let queues = self.messages.lock().map_err(|_| {
AppServiceError::new(
"queue",
AppServiceErrorKind::Unavailable,
"queue_lock_poisoned",
"queue lock poisoned",
)
})?;
Ok(queues.get(topic).map(VecDeque::len).unwrap_or(0))
}
}
fn map_job_error(err: IntegrationError) -> AppServiceError {
let kind = match err.kind {
IntegrationErrorKind::InvalidInput => AppServiceErrorKind::InvalidInput,
IntegrationErrorKind::Auth => AppServiceErrorKind::Unauthorized,
IntegrationErrorKind::Unavailable
| IntegrationErrorKind::Timeout
| IntegrationErrorKind::RateLimited
| IntegrationErrorKind::Transient => AppServiceErrorKind::Unavailable,
IntegrationErrorKind::Permanent => AppServiceErrorKind::Conflict,
};
AppServiceError::new(
"jobs",
kind,
err.code.unwrap_or_else(|| "job_error".to_string()),
err.message,
)
}
fn now_unix_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[cfg(test)]
mod tests {
use super::{
AppServiceErrorKind, AuthCredentials, BackgroundJobRequest, BackgroundJobService,
BackgroundJobState, CacheBackend, CacheEntry, EmailMessage, IdentityService,
InMemoryCacheBackend, InMemoryIdentityService, InMemoryQueueBackend,
InMemoryTransactionalEmailService, JobOrchestratorBackgroundJobs, QueueBackend,
TransactionalEmailService,
};
use crate::integrations::InMemoryJobOrchestrator;
use serde_json::json;
use std::sync::Arc;
#[test]
fn in_memory_identity_service_signs_in_and_signs_out() {
let service = InMemoryIdentityService::default();
let session = service
.sign_in(AuthCredentials::new("demo@shelly.dev", "shelly-demo"))
.unwrap();
let identity = service.session_identity(&session.token).unwrap().unwrap();
assert_eq!(identity.email, "demo@shelly.dev");
assert!(identity.roles.contains(&"admin".to_string()));
service.sign_out(&session.token).unwrap();
let missing = service.session_identity(&session.token).unwrap();
assert!(missing.is_none());
}
#[test]
fn in_memory_identity_service_rejects_invalid_credentials() {
let service = InMemoryIdentityService::default();
let err = service
.sign_in(AuthCredentials::new("demo@shelly.dev", "wrong-password"))
.unwrap_err();
assert_eq!(err.kind, AppServiceErrorKind::InvalidCredentials);
}
#[test]
fn in_memory_cache_backend_supports_set_get_delete_and_ttl() {
let cache = InMemoryCacheBackend::default();
cache
.set(
"sessions:1",
CacheEntry::persistent(json!({"active": true})),
)
.unwrap();
let value = cache.get("sessions:1").unwrap();
assert_eq!(value, Some(json!({"active": true})));
cache.delete("sessions:1").unwrap();
let cleared = cache.get("sessions:1").unwrap();
assert!(cleared.is_none());
cache
.set("temp", CacheEntry::with_ttl_ms(json!("stale"), 0))
.unwrap();
let expired = cache.get("temp").unwrap();
assert!(expired.is_none());
}
#[test]
fn in_memory_queue_backend_is_fifo_per_topic() {
let queue = InMemoryQueueBackend::default();
queue.enqueue("emails", json!({"id": 1}), None).unwrap();
queue.enqueue("emails", json!({"id": 2}), None).unwrap();
assert_eq!(queue.len("emails").unwrap(), 2);
let first = queue.dequeue("emails").unwrap().unwrap();
let second = queue.dequeue("emails").unwrap().unwrap();
assert_eq!(first.payload, json!({"id": 1}));
assert_eq!(second.payload, json!({"id": 2}));
assert_eq!(queue.len("emails").unwrap(), 0);
}
#[test]
fn transactional_email_service_records_sent_messages() {
let mailer = InMemoryTransactionalEmailService::default();
let message = EmailMessage::new("ada@example.test", "Welcome", "Hello Ada");
let receipt = mailer.send(message.clone()).unwrap();
assert_eq!(receipt.provider, "in-memory-mailer");
let sent = mailer.sent();
assert_eq!(sent.len(), 1);
assert_eq!(sent[0].message, message);
assert_eq!(sent[0].receipt.message_id, receipt.message_id);
}
#[test]
fn background_job_service_wraps_job_orchestrator_contract() {
let orchestrator = Arc::new(InMemoryJobOrchestrator::default());
let jobs = JobOrchestratorBackgroundJobs::new(orchestrator.clone());
let handle = jobs
.enqueue(BackgroundJobRequest::new(
"send_digest",
json!({"account_id": 7}),
"send_digest:7",
))
.unwrap();
let queued = jobs.status(&handle.id).unwrap();
assert_eq!(queued.state, BackgroundJobState::Queued);
orchestrator
.mark_succeeded(&handle.id, json!({"emails_sent": 3}))
.unwrap();
let done = jobs.status(&handle.id).unwrap();
assert_eq!(done.state, BackgroundJobState::Succeeded);
assert_eq!(done.result, Some(json!({"emails_sent": 3})));
}
}