1use chrono::Utc;
9use peisear_core::notifications::{Severity, channel as channel_id};
10use peisear_storage::{Pool, notifications as notif_store};
11use tokio::sync::mpsc;
12
13use crate::channel::{ChannelSendError, send_via_channel};
14
15#[derive(Debug, Clone)]
19pub struct DispatchEvent {
20 pub user_id: String,
21 pub kind: String,
22 pub severity: Severity,
23 pub title: String,
24 pub body: String,
25 pub payload_json: Option<String>,
29}
30
31pub type DispatchTx = mpsc::Sender<DispatchEvent>;
34
35pub type DispatchRx = mpsc::Receiver<DispatchEvent>;
37
38pub const DISPATCH_CHANNEL_BUFFER: usize = 256;
45
46struct DispatchOutcome {
50 notification_id: Option<String>,
51 delivered_via: Vec<String>,
52 skipped: Option<&'static str>,
53}
54
55#[derive(Clone)]
59pub struct DispatchContext {
60 pub db: Pool,
61 pub smtp: Option<crate::config::SmtpConfig>,
62}
63
64pub async fn dispatch_loop(ctx: DispatchContext, mut rx: DispatchRx) {
68 tracing::info!("notification dispatch_loop started");
69
70 while let Some(event) = rx.recv().await {
71 let outcome = match process_event(&ctx, &event).await {
72 Ok(o) => o,
73 Err(err) => {
74 tracing::error!(
75 error = %err,
76 user_id = %event.user_id,
77 kind = %event.kind,
78 "dispatch_loop: process_event failed",
79 );
80 continue;
81 }
82 };
83
84 if let Some(reason) = outcome.skipped {
85 tracing::debug!(
86 user_id = %event.user_id,
87 kind = %event.kind,
88 %reason,
89 "dispatch_loop: skipped",
90 );
91 } else {
92 tracing::debug!(
93 notification_id = ?outcome.notification_id,
94 user_id = %event.user_id,
95 kind = %event.kind,
96 via = ?outcome.delivered_via,
97 "dispatch_loop: delivered",
98 );
99 }
100 }
101
102 tracing::info!("notification dispatch_loop shutting down (channel closed)");
103}
104
105async fn process_event(
112 ctx: &DispatchContext,
113 event: &DispatchEvent,
114) -> Result<DispatchOutcome, peisear_storage::StorageError> {
115 let pref = notif_store::preference_for_user_kind(&ctx.db, &event.user_id, &event.kind).await?;
116 let global = notif_store::global_preference(&ctx.db, &event.user_id).await?;
117
118 let (channels, min_severity) = effective_channels_and_min_severity(pref, global);
119
120 if !event.severity.meets_minimum(min_severity) {
121 return Ok(DispatchOutcome {
122 notification_id: None,
123 delivered_via: Vec::new(),
124 skipped: Some("severity below preference minimum"),
125 });
126 }
127
128 if channels.is_empty() {
129 return Ok(DispatchOutcome {
130 notification_id: None,
131 delivered_via: Vec::new(),
132 skipped: Some("user has no channels for this kind"),
133 });
134 }
135
136 let last = notif_store::last_dispatched_at_for_user_kind(&ctx.db, &event.user_id, &event.kind)
137 .await?;
138 if let Some(last_at) = last {
139 let elapsed_hours = (Utc::now() - last_at).num_hours();
140 if elapsed_hours < peisear_core::notifications::COOLDOWN_HOURS {
141 return Ok(DispatchOutcome {
142 notification_id: None,
143 delivered_via: Vec::new(),
144 skipped: Some("inside cooldown window"),
145 });
146 }
147 }
148
149 let user_email = if channels.contains(&channel_id::EMAIL) && ctx.smtp.is_some() {
153 peisear_storage::users::find_by_id(&ctx.db, &event.user_id)
154 .await?
155 .map(|u| u.email)
156 } else {
157 None
158 };
159
160 let mut delivered: Vec<String> = Vec::with_capacity(channels.len());
161 for chan in &channels {
162 match send_via_channel(chan, event, ctx.smtp.as_ref(), user_email.as_deref()).await {
163 Ok(()) => delivered.push((*chan).to_string()),
164 Err(ChannelSendError::Skipped(reason)) => {
165 tracing::debug!(
168 channel = chan,
169 user_id = %event.user_id,
170 kind = %event.kind,
171 reason = reason,
172 "dispatch_loop: channel skipped",
173 );
174 }
175 Err(err) => {
176 tracing::warn!(
177 channel = chan,
178 user_id = %event.user_id,
179 kind = %event.kind,
180 error = %err,
181 "dispatch_loop: channel send failed",
182 );
183 }
184 }
185 }
186
187 let delivered_refs: Vec<&str> = delivered.iter().map(|s| s.as_str()).collect();
188 let id = notif_store::insert(
189 &ctx.db,
190 &event.user_id,
191 &event.kind,
192 event.severity,
193 &event.title,
194 &event.body,
195 event.payload_json.as_deref(),
196 &delivered_refs,
197 )
198 .await?;
199
200 Ok(DispatchOutcome {
201 notification_id: Some(id),
202 delivered_via: delivered,
203 skipped: None,
204 })
205}
206
207fn effective_channels_and_min_severity(
212 pref: Option<peisear_core::notifications::Preference>,
213 global: Option<peisear_core::notifications::Preference>,
214) -> (Vec<&'static str>, Severity) {
215 if let Some(p) = pref {
216 let chans: Vec<&'static str> = p
217 .channels
218 .iter()
219 .filter_map(|c| match c.as_str() {
220 channel_id::IN_APP => Some(channel_id::IN_APP),
221 channel_id::EMAIL => Some(channel_id::EMAIL),
222 channel_id::WEBHOOK => Some(channel_id::WEBHOOK),
223 _ => None,
224 })
225 .collect();
226 return (chans, p.min_severity);
227 }
228
229 let global_channels: Vec<&'static str> = match global {
230 Some(g) => g
231 .channels
232 .iter()
233 .filter_map(|c| match c.as_str() {
234 channel_id::IN_APP => Some(channel_id::IN_APP),
235 channel_id::EMAIL => Some(channel_id::EMAIL),
236 channel_id::WEBHOOK => Some(channel_id::WEBHOOK),
237 _ => None,
238 })
239 .collect(),
240 None => peisear_core::notifications::DEFAULT_CHANNELS.to_vec(),
241 };
242
243 (global_channels, peisear_core::notifications::DEFAULT_MIN_SEVERITY)
244}