1use std::sync::Arc;
2
3use futures::future::join_all;
4use kanal::{AsyncReceiver, AsyncSender};
5use serde::Serialize;
6use serde_json::Value;
7use tokio::sync::oneshot;
8
9use crate::{
10 error::WebhookError,
11 hints::{WithHints, extract_hints},
12 matcher::MatcherRegistry,
13 platform::{Platform, post},
14 template::TemplateEngine,
15};
16
17enum DispatchJob {
18 Send { template_name: String, data: Value },
19 Flush { done_tx: oneshot::Sender<()> },
20}
21
22pub struct Destination {
23 pub name: String,
24 pub platform: Arc<dyn Platform>,
25}
26
27impl Destination {
28 pub fn new(name: impl Into<String>, platform: impl Platform + 'static) -> Self {
29 Self {
30 name: name.into(),
31 platform: Arc::new(platform),
32 }
33 }
34}
35
36pub struct WebhookDispatcher {
37 sender: AsyncSender<DispatchJob>,
38 done_rx: oneshot::Receiver<()>,
39 matcher: MatcherRegistry,
40 engine: TemplateEngine,
41}
42
43impl WebhookDispatcher {
44 pub fn builder() -> WebhookDispatcherBuilder {
45 WebhookDispatcherBuilder::new()
46 }
47
48 pub fn register_rule<T: 'static>(
51 &mut self,
52 rule: impl Fn(&T) -> &'static str + Send + Sync + 'static,
53 ) {
54 self.matcher.register(rule);
55 }
56
57 pub fn register_template(&self, name: &str, template: &str) -> Result<(), WebhookError> {
59 self.engine.register(name, template)
60 }
61
62 pub fn update_template(&self, name: &str, template: &str) -> Result<(), WebhookError> {
64 self.engine.update(name, template)
65 }
66
67 pub fn remove_template(&self, name: &str) {
69 self.engine.remove(name);
70 }
71
72 pub async fn send<T: Serialize + 'static>(&self, event: &T) -> Result<(), WebhookError> {
74 let template_name = self.matcher.resolve(event).to_owned();
75 self.push(template_name, event, None).await
76 }
77
78 pub async fn send_with_hints<T: Serialize + 'static>(
80 &self,
81 event: &T,
82 hints: WithHints,
83 ) -> Result<(), WebhookError> {
84 let template_name = self.matcher.resolve(event).to_owned();
85 self.push(template_name, event, Some(hints)).await
86 }
87
88 pub async fn send_with_template<T: Serialize>(
90 &self,
91 template_name: &str,
92 event: &T,
93 ) -> Result<(), WebhookError> {
94 self.push(template_name.to_owned(), event, None).await
95 }
96
97 pub async fn send_with_template_and_hints<T: Serialize>(
99 &self,
100 template_name: &str,
101 event: &T,
102 hints: WithHints,
103 ) -> Result<(), WebhookError> {
104 self.push(template_name.to_owned(), event, Some(hints))
105 .await
106 }
107
108 pub async fn flush(&self) -> Result<(), WebhookError> {
111 let (done_tx, done_rx) = oneshot::channel();
112 self.sender
113 .send(DispatchJob::Flush { done_tx })
114 .await
115 .map_err(|_| WebhookError::ChannelClosed)?;
116 let _ = done_rx.await;
117 Ok(())
118 }
119
120 pub async fn shutdown(self) {
123 drop(self.sender);
124 let _ = self.done_rx.await;
125 }
126
127 async fn push<T: Serialize>(
128 &self,
129 template_name: String,
130 event: &T,
131 hints: Option<WithHints>,
132 ) -> Result<(), WebhookError> {
133 if !self.engine.has_template(&template_name) {
134 return Err(WebhookError::TemplateNotFound(template_name));
135 }
136
137 let mut data = serde_json::to_value(event)?;
138
139 if let Some(h) = hints {
140 if let Some(obj) = data.as_object_mut() {
141 obj.extend(h.map);
142 }
143 }
144
145 self.sender
146 .send(DispatchJob::Send {
147 template_name,
148 data,
149 })
150 .await
151 .map_err(|_| WebhookError::ChannelClosed)
152 }
153}
154
155pub struct WebhookDispatcherBuilder {
156 templates: Vec<(String, String)>,
157 destinations: Vec<Destination>,
158 default_template: &'static str,
159 capacity: usize,
160 on_error: Option<Arc<dyn Fn(WebhookError) + Send + Sync>>,
161}
162
163impl WebhookDispatcherBuilder {
164 pub fn new() -> Self {
165 Self {
166 templates: Vec::new(),
167 destinations: Vec::new(),
168 default_template: "default",
169 capacity: 1024,
170 on_error: None,
171 }
172 }
173
174 pub fn template(mut self, name: impl Into<String>, template: impl Into<String>) -> Self {
175 self.templates.push((name.into(), template.into()));
176 self
177 }
178
179 pub fn destination(mut self, dest: Destination) -> Self {
180 self.destinations.push(dest);
181 self
182 }
183
184 pub fn default_template(mut self, name: &'static str) -> Self {
185 self.default_template = name;
186 self
187 }
188
189 pub fn capacity(mut self, capacity: usize) -> Self {
190 self.capacity = capacity;
191 self
192 }
193
194 pub fn on_error(mut self, handler: impl Fn(WebhookError) + Send + Sync + 'static) -> Self {
195 self.on_error = Some(Arc::new(handler));
196 self
197 }
198
199 pub fn build(self) -> Result<WebhookDispatcher, WebhookError> {
200 let engine = TemplateEngine::new();
201 for (name, template) in &self.templates {
202 engine.register(name, template)?;
203 }
204
205 let matcher = MatcherRegistry::new(self.default_template);
206
207 let (sender, receiver) = kanal::bounded_async(self.capacity);
208 let (done_tx, done_rx) = oneshot::channel();
209
210 let destinations: Arc<Vec<Destination>> = Arc::new(self.destinations);
211 let on_error = self.on_error;
212
213 tokio::spawn(dispatch_loop(
214 receiver,
215 engine.clone(),
216 destinations,
217 on_error,
218 done_tx,
219 ));
220
221 Ok(WebhookDispatcher {
222 sender,
223 done_rx,
224 matcher,
225 engine,
226 })
227 }
228}
229
230impl Default for WebhookDispatcherBuilder {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236async fn dispatch_loop(
237 receiver: AsyncReceiver<DispatchJob>,
238 engine: TemplateEngine,
239 destinations: Arc<Vec<Destination>>,
240 on_error: Option<Arc<dyn Fn(WebhookError) + Send + Sync>>,
241 done_tx: oneshot::Sender<()>,
242) {
243 let client = reqwest::Client::new();
244
245 while let Ok(job) = receiver.recv().await {
246 match job {
247 DispatchJob::Flush { done_tx } => {
248 let _ = done_tx.send(());
249 }
250 DispatchJob::Send {
251 template_name,
252 mut data,
253 } => {
254 let hints = extract_hints(&mut data);
255
256 let rendered = match engine.render(&template_name, &data) {
257 Ok(r) => r,
258 Err(e) => {
259 report_error(&on_error, e);
260 continue;
261 }
262 };
263
264 let rendered = Arc::new(rendered);
265 let hints = Arc::new(hints);
266
267 let futs = destinations.iter().map(|dest| {
268 let client = client.clone();
269 let rendered = Arc::clone(&rendered);
270 let hints = Arc::clone(&hints);
271 let dest_name = dest.name.clone();
272 let on_error = on_error.clone();
273 let platform = Arc::clone(&dest.platform);
274
275 async move {
276 if let Err(e) =
277 post(&client, platform.as_ref(), &rendered, &hints, &dest_name).await
278 {
279 report_error(&on_error, e);
280 }
281 }
282 });
283
284 join_all(futs).await;
285 }
286 }
287 }
288
289 let _ = done_tx.send(());
291}
292
293fn report_error(on_error: &Option<Arc<dyn Fn(WebhookError) + Send + Sync>>, e: WebhookError) {
294 if let Some(handler) = on_error {
295 handler(e);
296 } else {
297 tracing::warn!(error = %e, "webhook dispatch error");
298 }
299}