mod config;
mod email_handlers;
mod notification_handlers;
#[cfg(test)]
mod tests;
pub use config::OutboxWorkerConfig;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::errors::AppError;
use crate::repositories::{OutboxEvent, OutboxRepository};
use crate::services::{
AdminNotification, EmailService, NotificationService, NotificationSeverity, SettingsService,
};
use crate::utils::TokenCipher;
use email_handlers::process_email_event;
use notification_handlers::process_notification_event;
/// Prepares an error message for logging by bounding its length and masking
/// anything that looks like an e-mail address.
///
/// The message is first cut to at most 150 characters, with `...(truncated)`
/// appended when something was cut. After that, every `@` that is not the
/// very first character causes the token around it to be replaced by
/// `[REDACTED]`. The token extends backwards to the previous whitespace,
/// `"`, `<` or `'` (or the start of the string) and forwards to the next
/// whitespace, `>`, `"` or `'` (or the end of the string). The delimiters
/// themselves are kept.
fn sanitize_error_for_log(error: &str) -> String {
    const MAX_LEN: usize = 150;

    let opens_token = |c: char| c.is_whitespace() || matches!(c, '"' | '<' | '\'');
    let closes_token = |c: char| c.is_whitespace() || matches!(c, '>' | '"' | '\'');

    let truncated = if error.chars().count() > MAX_LEN {
        let mut value: String = error.chars().take(MAX_LEN).collect();
        value.push_str("...(truncated)");
        value
    } else {
        error.to_string()
    };

    let mut result = String::with_capacity(truncated.len());
    let mut chars = truncated.chars().peekable();
    while let Some(c) = chars.next() {
        // `result` is only empty before the first character has been handled,
        // so this keeps a leading '@' verbatim (there is no local part to hide).
        if c != '@' || result.is_empty() {
            result.push(c);
            continue;
        }

        // Drop the already-copied local part. The cut point is the byte just
        // past the delimiter, which may be a multi-byte whitespace character;
        // advancing by its UTF-8 width keeps `truncate` on a char boundary.
        let start = result
            .char_indices()
            .rev()
            .find(|&(_, ch)| opens_token(ch))
            .map_or(0, |(pos, ch)| pos + ch.len_utf8());
        result.truncate(start);
        result.push_str("[REDACTED]");

        // Skip the rest of the address; the closing delimiter stays in the
        // iterator and is copied on the next pass.
        while chars.next_if(|&ch| !closes_token(ch)).is_some() {}
    }
    result
}
/// Background worker that drains the outbox: it fetches pending events from
/// the repository and hands each one to the e-mail or notification handler.
///
/// The worker is `Clone` because `process_batch` clones it once per event so
/// that every spawned task owns its own handle to the shared services.
#[derive(Clone)]
pub struct OutboxWorker {
/// Source of pending events and sink for their status transitions
/// (`mark_processing`, `mark_delivered`, `mark_done`, `mark_retry`,
/// `mark_failed`).
outbox_repo: Arc<dyn OutboxRepository>,
/// Passed to `process_email_event` for events whose type reports `is_email()`.
email_service: Arc<dyn EmailService>,
/// Passed to `process_notification_event` for all other events; also used to
/// send the admin alert when an event fails permanently.
notification_service: Arc<dyn NotificationService>,
/// Supplies `poll_interval_secs`, `batch_size`, `lock_timeout_secs` and
/// `delivery_timeout_secs`.
config: OutboxWorkerConfig,
/// Forwarded to the e-mail handler.
base_url: String,
/// Forwarded to the e-mail handler.
token_cipher: TokenCipher,
/// Optional settings service forwarded to the e-mail handler; `None` unless
/// `with_settings` was called.
settings_service: Option<Arc<SettingsService>>,
}
impl OutboxWorker {
pub fn new(
outbox_repo: Arc<dyn OutboxRepository>,
email_service: Arc<dyn EmailService>,
notification_service: Arc<dyn NotificationService>,
config: OutboxWorkerConfig,
base_url: String,
token_cipher: TokenCipher,
) -> Self {
Self {
outbox_repo,
email_service,
notification_service,
config,
base_url,
token_cipher,
settings_service: None,
}
}
pub fn with_settings(mut self, settings_service: Arc<SettingsService>) -> Self {
self.settings_service = Some(settings_service);
self
}
pub fn start(self, cancel_token: CancellationToken) -> JoinHandle<()> {
tokio::spawn(async move {
info!(
poll_interval = self.config.poll_interval_secs,
batch_size = self.config.batch_size,
"Outbox worker started"
);
let mut interval =
tokio::time::interval(Duration::from_secs(self.config.poll_interval_secs));
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Outbox worker shutting down gracefully");
break;
}
_ = interval.tick() => {
if let Err(e) = self.process_batch().await {
error!(error = %e, "Failed to process outbox batch");
}
}
}
}
info!("Outbox worker stopped");
})
}
async fn process_batch(&self) -> Result<(), AppError> {
let events = self
.outbox_repo
.fetch_pending(self.config.batch_size, self.config.lock_timeout_secs)
.await?;
if events.is_empty() {
return Ok(());
}
debug!(count = events.len(), "Processing outbox events");
const MAX_CONCURRENCY: usize = 8;
let concurrency = std::cmp::min(events.len(), MAX_CONCURRENCY);
let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
let mut join_set = tokio::task::JoinSet::new();
for event in events {
let permit = semaphore
.clone()
.acquire_owned()
.await
.map_err(|_| AppError::Internal(anyhow::anyhow!("Semaphore closed")))?;
let worker = self.clone();
join_set.spawn(async move {
let _permit = permit;
let event_type = event.event_type.clone();
let result = worker.process_event(&event).await;
(event.id, event_type, result)
});
}
while let Some(joined) = join_set.join_next().await {
match joined {
Ok((event_id, event_type, Err(e))) => {
let safe_error = sanitize_error_for_log(&e.to_string());
warn!(
event_id = %event_id,
event_type = %event_type.as_str(),
error = %safe_error,
"Failed to process outbox event"
);
}
Ok((_event_id, _event_type, Ok(()))) => {}
Err(join_err) => {
error!(error = %join_err, "Outbox worker task panicked or was cancelled");
}
}
}
Ok(())
}
/// Delivers a single outbox event and records the outcome in the repository.
///
/// Flow:
/// 1. An event that already has `delivered_at` set is only marked done.
/// 2. Otherwise it is marked as processing and dispatched to the e-mail or
///    notification handler, bounded by `delivery_timeout_secs`.
/// 3. On success it is marked delivered and then done, each bounded by a 5s
///    timeout.
/// 4. On failure, including a delivery timeout, it is marked for retry, or
///    marked failed with an admin alert once `attempts + 1` reaches
///    `max_attempts`.
///
/// The repository receives the raw error message; logs and the admin alert
/// use the sanitized form.
///
/// # Errors
///
/// Returns the delivery error (or a timeout error) after the retry/failure
/// bookkeeping has been recorded, or the repository error if any of the
/// status updates fails.
async fn process_event(&self, event: &OutboxEvent) -> Result<(), AppError> {
    if event.delivered_at.is_some() {
        debug!(
            event_id = %event.id,
            event_type = %event.event_type.as_str(),
            "Skipping already-delivered event"
        );
        self.outbox_repo.mark_done(event.id).await?;
        return Ok(());
    }

    self.outbox_repo.mark_processing(event.id).await?;
    let current_attempts = event.attempts.saturating_add(1);
    let delivery_timeout = Duration::from_secs(self.config.delivery_timeout_secs);

    let timed = if event.event_type.is_email() {
        tokio::time::timeout(
            delivery_timeout,
            process_email_event(
                event,
                self.email_service.as_ref(),
                &self.base_url,
                &self.token_cipher,
                self.settings_service.clone(),
            ),
        )
        .await
    } else {
        tokio::time::timeout(
            delivery_timeout,
            process_notification_event(event, self.notification_service.as_ref()),
        )
        .await
    };

    // A timeout is a delivery failure like any other. Fold it into `result`
    // instead of returning early, so the event still goes through the
    // retry / permanent-failure bookkeeping below.
    let result = timed.unwrap_or_else(|_| {
        Err(AppError::Internal(anyhow::anyhow!(
            "Delivery timed out after {}s",
            self.config.delivery_timeout_secs
        )))
    });

    match result {
        Ok(()) => {
            const DB_OP_TIMEOUT: Duration = Duration::from_secs(5);
            tokio::time::timeout(DB_OP_TIMEOUT, self.outbox_repo.mark_delivered(event.id))
                .await
                .map_err(|_| {
                    AppError::Internal(anyhow::anyhow!(
                        "Timeout marking event {} as delivered",
                        event.id
                    ))
                })??;
            tokio::time::timeout(DB_OP_TIMEOUT, self.outbox_repo.mark_done(event.id))
                .await
                .map_err(|_| {
                    AppError::Internal(anyhow::anyhow!(
                        "Timeout marking event {} as done",
                        event.id
                    ))
                })??;
            debug!(
                event_id = %event.id,
                event_type = %event.event_type.as_str(),
                "Outbox event processed successfully"
            );
            Ok(())
        }
        Err(e) => {
            let error_msg = e.to_string();
            let safe_error = sanitize_error_for_log(&error_msg);
            if current_attempts >= event.max_attempts {
                self.outbox_repo.mark_failed(event.id, &error_msg).await?;
                error!(
                    event_id = %event.id,
                    event_type = %event.event_type.as_str(),
                    attempts = current_attempts,
                    error = %safe_error,
                    "Outbox event permanently failed"
                );
                let alert = AdminNotification::new(
                    NotificationSeverity::Critical,
                    "Outbox Event Failed",
                    &format!(
                        "Event {} ({}) failed after {} attempts: {}",
                        event.id,
                        event.event_type.as_str(),
                        current_attempts,
                        safe_error
                    ),
                );
                // Best-effort: a failed alert must not mask the delivery error.
                let _ = self.notification_service.notify(alert).await;
            } else {
                self.outbox_repo.mark_retry(event.id, &error_msg).await?;
                debug!(
                    event_id = %event.id,
                    event_type = %event.event_type.as_str(),
                    // Count the attempt that just failed, matching the
                    // permanent-failure log above.
                    attempts = current_attempts,
                    error = %safe_error,
                    "Outbox event scheduled for retry"
                );
            }
            Err(e)
        }
    }
}
}