Skip to main content

peisear_notify/
dispatch.rs

1//! Dispatch loop. Drains [`DispatchEvent`]s, applies filters,
2//! persists the audit row, fans out to channels.
3//!
4//! Moved from `peisear-web::notifications::mod` in 0.16.0. Refer
5//! to the crate-level docs in `lib.rs` for the architecture
6//! diagram (detection → mpsc → dispatch_loop → channel send).
7
8use chrono::Utc;
9use peisear_core::notifications::{Severity, channel as channel_id};
10use peisear_storage::{Pool, notifications as notif_store};
11use tokio::sync::mpsc;
12
13use crate::channel::{ChannelSendError, send_via_channel};
14
15/// One thing to potentially notify about. Built by callers
16/// (typically a snapshot/jobs loop in `peisear-web`); consumed
17/// by [`dispatch_loop`].
18#[derive(Debug, Clone)]
19pub struct DispatchEvent {
20    pub user_id: String,
21    pub kind: String,
22    pub severity: Severity,
23    pub title: String,
24    pub body: String,
25    /// Free-form JSON payload. Today we keep this `None`;
26    /// later kinds (project trend, digests) attach structured
27    /// detail here.
28    pub payload_json: Option<String>,
29}
30
31/// Sender side of the dispatch channel. Held by callers so
32/// they can enqueue events.
33pub type DispatchTx = mpsc::Sender<DispatchEvent>;
34
35/// Receiver side. Owned by [`dispatch_loop`].
36pub type DispatchRx = mpsc::Receiver<DispatchEvent>;
37
38/// Channel buffer size. Sized for the worst-case "every active
39/// user produces one event in a single tick"; a 256-buffer
40/// covers small-to-medium teams without ever back-pressuring
41/// the snapshot loop. If a deployment grows beyond this, the
42/// fix is per-user back-pressure or batching, not a bigger
43/// number.
44pub const DISPATCH_CHANNEL_BUFFER: usize = 256;
45
46/// Result of attempting to dispatch one event. Used both for
47/// logging and to record which channels were successful in the
48/// `notifications.dispatched_via` audit column.
49struct DispatchOutcome {
50    notification_id: Option<String>,
51    delivered_via: Vec<String>,
52    skipped: Option<&'static str>,
53}
54
55/// Configuration passed into the dispatch loop. Carries
56/// references to backing services that the channel layer needs
57/// (today: the SMTP config; tomorrow: webhook HTTP client, etc.).
58#[derive(Clone)]
59pub struct DispatchContext {
60    pub db: Pool,
61    pub smtp: Option<crate::config::SmtpConfig>,
62}
63
64/// The dispatch loop. Receives events, applies filters,
65/// persists rows, fans out to channels. Runs until its
66/// `Receiver` is closed.
67pub async fn dispatch_loop(ctx: DispatchContext, mut rx: DispatchRx) {
68    tracing::info!("notification dispatch_loop started");
69
70    while let Some(event) = rx.recv().await {
71        let outcome = match process_event(&ctx, &event).await {
72            Ok(o) => o,
73            Err(err) => {
74                tracing::error!(
75                    error = %err,
76                    user_id = %event.user_id,
77                    kind = %event.kind,
78                    "dispatch_loop: process_event failed",
79                );
80                continue;
81            }
82        };
83
84        if let Some(reason) = outcome.skipped {
85            tracing::debug!(
86                user_id = %event.user_id,
87                kind = %event.kind,
88                %reason,
89                "dispatch_loop: skipped",
90            );
91        } else {
92            tracing::debug!(
93                notification_id = ?outcome.notification_id,
94                user_id = %event.user_id,
95                kind = %event.kind,
96                via = ?outcome.delivered_via,
97                "dispatch_loop: delivered",
98            );
99        }
100    }
101
102    tracing::info!("notification dispatch_loop shutting down (channel closed)");
103}
104
105/// Process a single event:
106///   1. Look up the user's preference for this kind (or default).
107///   2. Filter on `min_severity` and `channels`.
108///   3. Apply cooldown.
109///   4. Fan out to channels.
110///   5. Persist a notification row with the actual delivered_via.
111async fn process_event(
112    ctx: &DispatchContext,
113    event: &DispatchEvent,
114) -> Result<DispatchOutcome, peisear_storage::StorageError> {
115    let pref = notif_store::preference_for_user_kind(&ctx.db, &event.user_id, &event.kind).await?;
116    let global = notif_store::global_preference(&ctx.db, &event.user_id).await?;
117
118    let (channels, min_severity) = effective_channels_and_min_severity(pref, global);
119
120    if !event.severity.meets_minimum(min_severity) {
121        return Ok(DispatchOutcome {
122            notification_id: None,
123            delivered_via: Vec::new(),
124            skipped: Some("severity below preference minimum"),
125        });
126    }
127
128    if channels.is_empty() {
129        return Ok(DispatchOutcome {
130            notification_id: None,
131            delivered_via: Vec::new(),
132            skipped: Some("user has no channels for this kind"),
133        });
134    }
135
136    let last = notif_store::last_dispatched_at_for_user_kind(&ctx.db, &event.user_id, &event.kind)
137        .await?;
138    if let Some(last_at) = last {
139        let elapsed_hours = (Utc::now() - last_at).num_hours();
140        if elapsed_hours < peisear_core::notifications::COOLDOWN_HOURS {
141            return Ok(DispatchOutcome {
142                notification_id: None,
143                delivered_via: Vec::new(),
144                skipped: Some("inside cooldown window"),
145            });
146        }
147    }
148
149    // We need the user's email for email channel sends.
150    // Fetch once if email is in the channel list and we have
151    // SMTP config — otherwise we can skip the lookup.
152    let user_email = if channels.contains(&channel_id::EMAIL) && ctx.smtp.is_some() {
153        peisear_storage::users::find_by_id(&ctx.db, &event.user_id)
154            .await?
155            .map(|u| u.email)
156    } else {
157        None
158    };
159
160    let mut delivered: Vec<String> = Vec::with_capacity(channels.len());
161    for chan in &channels {
162        match send_via_channel(chan, event, ctx.smtp.as_ref(), user_email.as_deref()).await {
163            Ok(()) => delivered.push((*chan).to_string()),
164            Err(ChannelSendError::Skipped(reason)) => {
165                // Channel was unavailable (e.g. SMTP not configured).
166                // Log at debug since this is expected in dev/eval setups.
167                tracing::debug!(
168                    channel = chan,
169                    user_id = %event.user_id,
170                    kind = %event.kind,
171                    reason = reason,
172                    "dispatch_loop: channel skipped",
173                );
174            }
175            Err(err) => {
176                tracing::warn!(
177                    channel = chan,
178                    user_id = %event.user_id,
179                    kind = %event.kind,
180                    error = %err,
181                    "dispatch_loop: channel send failed",
182                );
183            }
184        }
185    }
186
187    let delivered_refs: Vec<&str> = delivered.iter().map(|s| s.as_str()).collect();
188    let id = notif_store::insert(
189        &ctx.db,
190        &event.user_id,
191        &event.kind,
192        event.severity,
193        &event.title,
194        &event.body,
195        event.payload_json.as_deref(),
196        &delivered_refs,
197    )
198    .await?;
199
200    Ok(DispatchOutcome {
201        notification_id: Some(id),
202        delivered_via: delivered,
203        skipped: None,
204    })
205}
206
207/// Resolve the effective channel list and minimum severity for
208/// dispatch. Applies the per-kind preference if present;
209/// otherwise falls back to the global preference (for the
210/// channel list) and the system default minimum severity.
211fn effective_channels_and_min_severity(
212    pref: Option<peisear_core::notifications::Preference>,
213    global: Option<peisear_core::notifications::Preference>,
214) -> (Vec<&'static str>, Severity) {
215    if let Some(p) = pref {
216        let chans: Vec<&'static str> = p
217            .channels
218            .iter()
219            .filter_map(|c| match c.as_str() {
220                channel_id::IN_APP => Some(channel_id::IN_APP),
221                channel_id::EMAIL => Some(channel_id::EMAIL),
222                channel_id::WEBHOOK => Some(channel_id::WEBHOOK),
223                _ => None,
224            })
225            .collect();
226        return (chans, p.min_severity);
227    }
228
229    let global_channels: Vec<&'static str> = match global {
230        Some(g) => g
231            .channels
232            .iter()
233            .filter_map(|c| match c.as_str() {
234                channel_id::IN_APP => Some(channel_id::IN_APP),
235                channel_id::EMAIL => Some(channel_id::EMAIL),
236                channel_id::WEBHOOK => Some(channel_id::WEBHOOK),
237                _ => None,
238            })
239            .collect(),
240        None => peisear_core::notifications::DEFAULT_CHANNELS.to_vec(),
241    };
242
243    (global_channels, peisear_core::notifications::DEFAULT_MIN_SEVERITY)
244}