use chrono::Utc;
use peisear_core::notifications::{Severity, channel as channel_id};
use peisear_storage::{Pool, notifications as notif_store};
use tokio::sync::mpsc;
use crate::channel::{ChannelSendError, send_via_channel};
/// A notification request queued for the background dispatcher.
///
/// `process_event` reads these fields to decide whether the event is
/// delivered, over which channels, and what gets stored.
#[derive(Debug, Clone)]
pub struct DispatchEvent {
/// Id of the recipient; used to look up preferences, the cooldown timestamp
/// and (when e-mail delivery is possible) the user's e-mail address.
pub user_id: String,
/// Notification kind; together with `user_id` it selects the per-kind
/// preference and the cooldown timestamp.
pub kind: String,
/// Severity of the event, compared against the effective minimum severity.
pub severity: Severity,
/// Title passed to `notif_store::insert` when the notification is recorded.
pub title: String,
/// Body passed to `notif_store::insert` when the notification is recorded.
pub body: String,
/// Optional payload passed to storage unchanged.
/// NOTE(review): presumably a serialized JSON document — confirm with producers.
pub payload_json: Option<String>,
}
/// Sending half of the tokio mpsc channel that carries [`DispatchEvent`]s.
pub type DispatchTx = mpsc::Sender<DispatchEvent>;
/// Receiving half of the same channel; `dispatch_loop` takes ownership of it.
pub type DispatchRx = mpsc::Receiver<DispatchEvent>;
/// NOTE(review): presumably the capacity used when the dispatch channel is
/// created — it is not referenced in this part of the file, confirm at the
/// construction site.
pub const DISPATCH_CHANNEL_BUFFER: usize = 256;
/// Result of handling one event in `process_event`, used by `dispatch_loop`
/// purely for logging.
struct DispatchOutcome {
/// Id returned by `notif_store::insert`; `None` when the event was skipped.
notification_id: Option<String>,
/// Names of the channels whose send call returned `Ok`; empty when skipped.
delivered_via: Vec<String>,
/// Why the event was dropped before any delivery attempt, or `None` when it
/// went through to delivery and was recorded.
skipped: Option<&'static str>,
}
/// Shared dependencies handed to `dispatch_loop`.
#[derive(Clone)]
pub struct DispatchContext {
/// Storage pool used for preference lookups, the cooldown query, the user
/// lookup and the final notification insert.
pub db: Pool,
/// Optional SMTP configuration, forwarded to every channel send. When it is
/// `None` the recipient's e-mail address is not looked up at all.
pub smtp: Option<crate::config::SmtpConfig>,
}
/// Runs the notification dispatcher until the channel reports it is closed.
///
/// Events are taken from `rx` one at a time and handed to `process_event`.
/// A storage error is logged at `error` level and the loop carries on with the
/// next event; a successful outcome is logged at `debug` level as either
/// "skipped" (with the reason) or "delivered" (with the stored id and the
/// channels that accepted the message).
///
/// The function returns once `rx.recv()` yields `None`.
pub async fn dispatch_loop(ctx: DispatchContext, mut rx: DispatchRx) {
    tracing::info!("notification dispatch_loop started");

    while let Some(event) = rx.recv().await {
        match process_event(&ctx, &event).await {
            // A failing event must not take the whole dispatcher down.
            Err(err) => {
                tracing::error!(
                    error = %err,
                    user_id = %event.user_id,
                    kind = %event.kind,
                    "dispatch_loop: process_event failed",
                );
            }
            Ok(outcome) => match outcome.skipped {
                Some(reason) => {
                    tracing::debug!(
                        user_id = %event.user_id,
                        kind = %event.kind,
                        %reason,
                        "dispatch_loop: skipped",
                    );
                }
                None => {
                    tracing::debug!(
                        notification_id = ?outcome.notification_id,
                        user_id = %event.user_id,
                        kind = %event.kind,
                        via = ?outcome.delivered_via,
                        "dispatch_loop: delivered",
                    );
                }
            },
        }
    }

    tracing::info!("notification dispatch_loop shutting down (channel closed)");
}
/// Applies the recipient's preferences to one event, attempts delivery on each
/// effective channel, and records the notification.
///
/// Steps, in order:
/// 1. Load the per-kind and global preferences and resolve them into a channel
///    list and a minimum severity.
/// 2. Return a "skipped" outcome (nothing is sent or stored) when the event's
///    severity does not meet the minimum, when the channel list is empty, or
///    when the last dispatch for this user/kind happened fewer than
///    `COOLDOWN_HOURS` whole hours ago.
/// 3. Look up the user's e-mail address, but only if the e-mail channel is
///    selected and SMTP is configured.
/// 4. Try every channel. A skipped or failed channel is logged and does not
///    stop the remaining channels.
/// 5. Insert the notification together with the channels that succeeded. The
///    insert happens even when that list is empty.
///
/// # Errors
///
/// Returns a `StorageError` as soon as any storage call fails.
async fn process_event(
    ctx: &DispatchContext,
    event: &DispatchEvent,
) -> Result<DispatchOutcome, peisear_storage::StorageError> {
    /// Outcome for an event dropped before any delivery attempt.
    fn skip(reason: &'static str) -> DispatchOutcome {
        DispatchOutcome {
            notification_id: None,
            delivered_via: Vec::new(),
            skipped: Some(reason),
        }
    }

    let per_kind =
        notif_store::preference_for_user_kind(&ctx.db, &event.user_id, &event.kind).await?;
    let global = notif_store::global_preference(&ctx.db, &event.user_id).await?;
    let (channels, min_severity) = effective_channels_and_min_severity(per_kind, global);

    if !event.severity.meets_minimum(min_severity) {
        return Ok(skip("severity below preference minimum"));
    }
    if channels.is_empty() {
        return Ok(skip("user has no channels for this kind"));
    }

    let last_sent =
        notif_store::last_dispatched_at_for_user_kind(&ctx.db, &event.user_id, &event.kind)
            .await?;
    if let Some(last_at) = last_sent {
        // `num_hours` counts whole hours only, so the window stays closed until
        // the full cooldown has elapsed.
        let hours_since = (Utc::now() - last_at).num_hours();
        if hours_since < peisear_core::notifications::COOLDOWN_HOURS {
            return Ok(skip("inside cooldown window"));
        }
    }

    // The address is only needed when an e-mail could actually be sent.
    let wants_email = channels.contains(&channel_id::EMAIL) && ctx.smtp.is_some();
    let user_email = if wants_email {
        peisear_storage::users::find_by_id(&ctx.db, &event.user_id)
            .await?
            .map(|u| u.email)
    } else {
        None
    };

    let smtp = ctx.smtp.as_ref();
    let recipient = user_email.as_deref();
    let mut delivered: Vec<String> = Vec::with_capacity(channels.len());
    for chan in &channels {
        let attempt = send_via_channel(chan, event, smtp, recipient).await;
        match attempt {
            Ok(()) => delivered.push((*chan).to_owned()),
            Err(ChannelSendError::Skipped(reason)) => {
                tracing::debug!(
                    channel = chan,
                    user_id = %event.user_id,
                    kind = %event.kind,
                    reason = reason,
                    "dispatch_loop: channel skipped",
                );
            }
            Err(err) => {
                tracing::warn!(
                    channel = chan,
                    user_id = %event.user_id,
                    kind = %event.kind,
                    error = %err,
                    "dispatch_loop: channel send failed",
                );
            }
        }
    }

    let delivered_refs: Vec<&str> = delivered.iter().map(String::as_str).collect();
    let notification_id = notif_store::insert(
        &ctx.db,
        &event.user_id,
        &event.kind,
        event.severity,
        &event.title,
        &event.body,
        event.payload_json.as_deref(),
        &delivered_refs,
    )
    .await?;

    Ok(DispatchOutcome {
        notification_id: Some(notification_id),
        delivered_via: delivered,
        skipped: None,
    })
}
/// Maps a stored channel name onto the matching `channel_id` constant.
///
/// Returns `None` for any name that is not one of the three known channels, so
/// unrecognised entries in a stored preference are dropped.
fn known_channel(name: &str) -> Option<&'static str> {
    match name {
        channel_id::IN_APP => Some(channel_id::IN_APP),
        channel_id::EMAIL => Some(channel_id::EMAIL),
        channel_id::WEBHOOK => Some(channel_id::WEBHOOK),
        _ => None,
    }
}

/// Resolves the channel list and minimum severity that apply to one event.
///
/// Precedence is: the per-kind preference `pref` if present, otherwise the
/// user's `global` preference, otherwise `DEFAULT_CHANNELS` and
/// `DEFAULT_MIN_SEVERITY`.
///
/// Both the channels and the minimum severity come from the same preference.
/// The earlier version took the channels from the global preference but always
/// paired them with `DEFAULT_MIN_SEVERITY`, so a user's global severity
/// threshold was silently ignored.
///
/// Channel names that are not known are dropped, so the returned list may be
/// empty even when a preference exists.
fn effective_channels_and_min_severity(
    pref: Option<peisear_core::notifications::Preference>,
    global: Option<peisear_core::notifications::Preference>,
) -> (Vec<&'static str>, Severity) {
    // `Option::or` keeps `pref` when it is `Some`, else falls back to `global`.
    match pref.or(global) {
        Some(chosen) => {
            let channels: Vec<&'static str> = chosen
                .channels
                .iter()
                .filter_map(|c| known_channel(c.as_str()))
                .collect();
            (channels, chosen.min_severity)
        }
        None => (
            peisear_core::notifications::DEFAULT_CHANNELS.to_vec(),
            peisear_core::notifications::DEFAULT_MIN_SEVERITY,
        ),
    }
}