Skip to main content

chipa_webhooks/
dispatcher.rs

1use std::sync::Arc;
2
3use futures::future::join_all;
4use kanal::{AsyncReceiver, AsyncSender};
5use serde::Serialize;
6use serde_json::Value;
7use tokio::sync::oneshot;
8
9use crate::{
10    error::WebhookError,
11    hints::{WithHints, extract_hints},
12    matcher::MatcherRegistry,
13    platform::{Platform, post},
14    template::TemplateEngine,
15};
16
17enum DispatchJob {
18    Send { template_name: String, data: Value },
19    Flush { done_tx: oneshot::Sender<()> },
20}
21
22pub struct Destination {
23    pub name: String,
24    pub platform: Arc<dyn Platform>,
25}
26
27impl Destination {
28    pub fn new(name: impl Into<String>, platform: impl Platform + 'static) -> Self {
29        Self {
30            name: name.into(),
31            platform: Arc::new(platform),
32        }
33    }
34}
35
36pub struct WebhookDispatcher {
37    sender: AsyncSender<DispatchJob>,
38    done_rx: oneshot::Receiver<()>,
39    matcher: MatcherRegistry,
40    engine: TemplateEngine,
41}
42
43impl WebhookDispatcher {
44    pub fn builder() -> WebhookDispatcherBuilder {
45        WebhookDispatcherBuilder::new()
46    }
47
48    /// Register a matcher rule for type `T`. When `send` is called with a `T`,
49    /// this closure decides which template name to use.
50    pub fn register_rule<T: 'static>(
51        &mut self,
52        rule: impl Fn(&T) -> &'static str + Send + Sync + 'static,
53    ) {
54        self.matcher.register(rule);
55    }
56
57    /// Register a new template at runtime.
58    pub fn register_template(&self, name: &str, template: &str) -> Result<(), WebhookError> {
59        self.engine.register(name, template)
60    }
61
62    /// Overwrite an existing template at runtime without restarting the dispatcher.
63    pub fn update_template(&self, name: &str, template: &str) -> Result<(), WebhookError> {
64        self.engine.update(name, template)
65    }
66
67    /// Remove a template at runtime.
68    pub fn remove_template(&self, name: &str) {
69        self.engine.remove(name);
70    }
71
72    /// Send using the internal matcher to resolve the template name.
73    pub async fn send<T: Serialize + 'static>(&self, event: &T) -> Result<(), WebhookError> {
74        let template_name = self.matcher.resolve(event).to_owned();
75        self.push(template_name, event, None).await
76    }
77
78    /// Send using the internal matcher, with platform hints (color, title, etc).
79    pub async fn send_with_hints<T: Serialize + 'static>(
80        &self,
81        event: &T,
82        hints: WithHints,
83    ) -> Result<(), WebhookError> {
84        let template_name = self.matcher.resolve(event).to_owned();
85        self.push(template_name, event, Some(hints)).await
86    }
87
88    /// Send to an explicit template, bypassing the matcher entirely.
89    pub async fn send_with_template<T: Serialize>(
90        &self,
91        template_name: &str,
92        event: &T,
93    ) -> Result<(), WebhookError> {
94        self.push(template_name.to_owned(), event, None).await
95    }
96
97    /// Send to an explicit template with platform hints.
98    pub async fn send_with_template_and_hints<T: Serialize>(
99        &self,
100        template_name: &str,
101        event: &T,
102        hints: WithHints,
103    ) -> Result<(), WebhookError> {
104        self.push(template_name.to_owned(), event, Some(hints))
105            .await
106    }
107
108    /// Waits for all currently queued jobs to finish processing before returning.
109    /// Use this between phases where template mutations depend on prior sends completing.
110    pub async fn flush(&self) -> Result<(), WebhookError> {
111        let (done_tx, done_rx) = oneshot::channel();
112        self.sender
113            .send(DispatchJob::Flush { done_tx })
114            .await
115            .map_err(|_| WebhookError::ChannelClosed)?;
116        let _ = done_rx.await;
117        Ok(())
118    }
119
120    /// Closes the send side of the channel, drains all queued jobs, then resolves.
121    /// Call this during graceful shutdown to ensure no messages are lost.
122    pub async fn shutdown(self) {
123        drop(self.sender);
124        let _ = self.done_rx.await;
125    }
126
127    async fn push<T: Serialize>(
128        &self,
129        template_name: String,
130        event: &T,
131        hints: Option<WithHints>,
132    ) -> Result<(), WebhookError> {
133        if !self.engine.has_template(&template_name) {
134            return Err(WebhookError::TemplateNotFound(template_name));
135        }
136
137        let mut data = serde_json::to_value(event)?;
138
139        if let Some(h) = hints {
140            if let Some(obj) = data.as_object_mut() {
141                obj.extend(h.map);
142            }
143        }
144
145        self.sender
146            .send(DispatchJob::Send {
147                template_name,
148                data,
149            })
150            .await
151            .map_err(|_| WebhookError::ChannelClosed)
152    }
153}
154
155pub struct WebhookDispatcherBuilder {
156    templates: Vec<(String, String)>,
157    destinations: Vec<Destination>,
158    default_template: &'static str,
159    capacity: usize,
160    on_error: Option<Arc<dyn Fn(WebhookError) + Send + Sync>>,
161}
162
163impl WebhookDispatcherBuilder {
164    pub fn new() -> Self {
165        Self {
166            templates: Vec::new(),
167            destinations: Vec::new(),
168            default_template: "default",
169            capacity: 1024,
170            on_error: None,
171        }
172    }
173
174    pub fn template(mut self, name: impl Into<String>, template: impl Into<String>) -> Self {
175        self.templates.push((name.into(), template.into()));
176        self
177    }
178
179    pub fn destination(mut self, dest: Destination) -> Self {
180        self.destinations.push(dest);
181        self
182    }
183
184    pub fn default_template(mut self, name: &'static str) -> Self {
185        self.default_template = name;
186        self
187    }
188
189    pub fn capacity(mut self, capacity: usize) -> Self {
190        self.capacity = capacity;
191        self
192    }
193
194    pub fn on_error(mut self, handler: impl Fn(WebhookError) + Send + Sync + 'static) -> Self {
195        self.on_error = Some(Arc::new(handler));
196        self
197    }
198
199    pub fn build(self) -> Result<WebhookDispatcher, WebhookError> {
200        let engine = TemplateEngine::new();
201        for (name, template) in &self.templates {
202            engine.register(name, template)?;
203        }
204
205        let matcher = MatcherRegistry::new(self.default_template);
206
207        let (sender, receiver) = kanal::bounded_async(self.capacity);
208        let (done_tx, done_rx) = oneshot::channel();
209
210        let destinations: Arc<Vec<Destination>> = Arc::new(self.destinations);
211        let on_error = self.on_error;
212
213        tokio::spawn(dispatch_loop(
214            receiver,
215            engine.clone(),
216            destinations,
217            on_error,
218            done_tx,
219        ));
220
221        Ok(WebhookDispatcher {
222            sender,
223            done_rx,
224            matcher,
225            engine,
226        })
227    }
228}
229
230impl Default for WebhookDispatcherBuilder {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236async fn dispatch_loop(
237    receiver: AsyncReceiver<DispatchJob>,
238    engine: TemplateEngine,
239    destinations: Arc<Vec<Destination>>,
240    on_error: Option<Arc<dyn Fn(WebhookError) + Send + Sync>>,
241    done_tx: oneshot::Sender<()>,
242) {
243    let client = reqwest::Client::new();
244
245    while let Ok(job) = receiver.recv().await {
246        match job {
247            DispatchJob::Flush { done_tx } => {
248                let _ = done_tx.send(());
249            }
250            DispatchJob::Send {
251                template_name,
252                mut data,
253            } => {
254                let hints = extract_hints(&mut data);
255
256                let rendered = match engine.render(&template_name, &data) {
257                    Ok(r) => r,
258                    Err(e) => {
259                        report_error(&on_error, e);
260                        continue;
261                    }
262                };
263
264                let rendered = Arc::new(rendered);
265                let hints = Arc::new(hints);
266
267                let futs = destinations.iter().map(|dest| {
268                    let client = client.clone();
269                    let rendered = Arc::clone(&rendered);
270                    let hints = Arc::clone(&hints);
271                    let dest_name = dest.name.clone();
272                    let on_error = on_error.clone();
273                    let platform = Arc::clone(&dest.platform);
274
275                    async move {
276                        if let Err(e) =
277                            post(&client, platform.as_ref(), &rendered, &hints, &dest_name).await
278                        {
279                            report_error(&on_error, e);
280                        }
281                    }
282                });
283
284                join_all(futs).await;
285            }
286        }
287    }
288
289    // Channel drained and closed — signal graceful shutdown complete.
290    let _ = done_tx.send(());
291}
292
293fn report_error(on_error: &Option<Arc<dyn Fn(WebhookError) + Send + Sync>>, e: WebhookError) {
294    if let Some(handler) = on_error {
295        handler(e);
296    } else {
297        tracing::warn!(error = %e, "webhook dispatch error");
298    }
299}