1use std::fmt;
18use std::sync::Arc;
19
20use futures::future::BoxFuture;
21
22use crate::event::JetstreamEvent;
23
/// The kind of change a commit applies to a record, plus a wildcard used when
/// registering routes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// A `create` commit.
    Create,
    /// An `update` commit.
    Update,
    /// A `delete` commit.
    Delete,
    /// Wildcard: a route registered with `Any` matches every operation.
    Any,
}

impl fmt::Display for Operation {
    /// Writes the lowercase operation name (`create`, `update`, `delete`) or
    /// `*` for [`Operation::Any`].
    ///
    /// Goes through [`fmt::Formatter::pad`] so that width, fill, alignment and
    /// precision in the format spec (for example `{:>8}`) are honored; plain
    /// `{}` output is unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Any => "*",
        };
        f.pad(label)
    }
}
47
48#[derive(Debug, Clone)]
54pub struct CommitEvent {
55 pub did: String,
57 pub rkey: String,
59 pub collection: String,
61 pub operation: Operation,
63 pub record: Option<serde_json::Value>,
65 pub rev: Option<String>,
67 pub cid: Option<String>,
69 pub time_us: i64,
71}
72
73pub type RouteHandler<S> =
75 Arc<dyn Fn(CommitEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync>;
76
77struct Route<S> {
79 collection: String,
80 operation: Operation,
81 handler: RouteHandler<S>,
82}
83
84pub struct EventRouterBuilder<S> {
90 routes: Vec<Route<S>>,
91}
92
93impl<S: Clone + Send + Sync + 'static> EventRouterBuilder<S> {
94 pub fn new() -> Self {
96 Self { routes: Vec::new() }
97 }
98
99 pub fn on_create<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
101 where
102 F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
103 Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
104 {
105 self.routes.push(Route {
106 collection: collection.into(),
107 operation: Operation::Create,
108 handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
109 });
110 self
111 }
112
113 pub fn on_update<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
115 where
116 F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
117 Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
118 {
119 self.routes.push(Route {
120 collection: collection.into(),
121 operation: Operation::Update,
122 handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
123 });
124 self
125 }
126
127 pub fn on_delete<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
129 where
130 F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
131 Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
132 {
133 self.routes.push(Route {
134 collection: collection.into(),
135 operation: Operation::Delete,
136 handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
137 });
138 self
139 }
140
141 pub fn on<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
146 where
147 F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
148 Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
149 {
150 self.routes.push(Route {
151 collection: collection.into(),
152 operation: Operation::Any,
153 handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
154 });
155 self
156 }
157
158 pub fn build(
164 self,
165 ) -> impl Fn(JetstreamEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
166 {
167 let router = Arc::new(EventRouter {
168 routes: self.routes,
169 });
170 move |event, state| {
171 let router = router.clone();
172 Box::pin(async move { router.dispatch(event, state).await })
173 }
174 }
175}
176
177impl<S: Clone + Send + Sync + 'static> Default for EventRouterBuilder<S> {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183struct EventRouter<S> {
185 routes: Vec<Route<S>>,
186}
187
188impl<S: Clone + Send + Sync + 'static> EventRouter<S> {
189 async fn dispatch(&self, event: JetstreamEvent, state: S) -> anyhow::Result<()> {
197 let commit = match event.commit {
198 Some(c) => c,
199 None => return Ok(()), };
201
202 let operation = match commit.operation.as_str() {
203 "create" => Operation::Create,
204 "update" => Operation::Update,
205 "delete" => Operation::Delete,
206 _ => return Ok(()),
207 };
208
209 let commit_event = CommitEvent {
210 did: event.did,
211 rkey: commit.rkey.clone(),
212 collection: commit.collection.clone(),
213 operation,
214 record: commit.record,
215 rev: commit.rev,
216 cid: commit.cid,
217 time_us: event.time_us,
218 };
219
220 let mut handled = false;
221 for route in &self.routes {
222 if route.collection == commit.collection
223 && (route.operation == Operation::Any || route.operation == operation)
224 {
225 (route.handler)(commit_event.clone(), state.clone()).await?;
226 handled = true;
227 }
228 }
229
230 if !handled {
231 tracing::debug!(
232 collection = %commit.collection,
233 operation = %commit.operation,
234 "no handler registered for event, ignoring"
235 );
236 }
237
238 Ok(())
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::CommitData;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Collection every route in these tests is registered for.
    const POST: &str = "app.bsky.feed.post";

    /// Builds a commit event for `collection`/`operation` with fixed DID,
    /// timestamp, record key, record body, CID and revision.
    fn make_event(collection: &str, operation: &str) -> JetstreamEvent {
        let commit = CommitData {
            collection: collection.to_owned(),
            rkey: String::from("abc123"),
            operation: operation.to_owned(),
            record: Some(serde_json::json!({"text": "hello"})),
            cid: Some(String::from("bafytest")),
            rev: Some(String::from("rev1")),
        };

        JetstreamEvent {
            did: String::from("did:plc:test123"),
            time_us: 1_700_000_000_000_000,
            kind: String::from("commit"),
            commit: Some(commit),
            identity: None,
            account: None,
        }
    }

    /// Builds an identity event, i.e. one that carries no commit payload.
    fn make_identity_event() -> JetstreamEvent {
        JetstreamEvent {
            did: String::from("did:plc:test123"),
            time_us: 1_700_000_000_000_000,
            kind: String::from("identity"),
            commit: None,
            identity: Some(serde_json::json!({"handle": "alice.test"})),
            account: None,
        }
    }

    /// Returns a handler for `()` state that bumps `counter` once per call and
    /// always succeeds.
    fn counting_handler(
        counter: Arc<AtomicU32>,
    ) -> impl Fn(CommitEvent, ()) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
    {
        move |_event, _state| {
            let counter = Arc::clone(&counter);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok::<(), anyhow::Error>(())
            })
        }
    }

    #[tokio::test]
    async fn on_create_handler_is_called_for_create_events() {
        let calls = Arc::new(AtomicU32::new(0));

        let dispatch = {
            let calls = Arc::clone(&calls);
            EventRouterBuilder::new()
                .on_create(POST, move |event: CommitEvent, _state: ()| {
                    let calls = Arc::clone(&calls);
                    async move {
                        // The handler must see the fields copied from the event.
                        assert_eq!(event.did, "did:plc:test123");
                        assert_eq!(event.collection, "app.bsky.feed.post");
                        assert_eq!(event.operation, Operation::Create);
                        assert_eq!(event.rkey, "abc123");
                        assert!(event.record.is_some());
                        calls.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    }
                })
                .build()
        };

        dispatch(make_event(POST, "create"), ()).await.unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn unregistered_collections_are_ignored() {
        let calls = Arc::new(AtomicU32::new(0));
        let dispatch = EventRouterBuilder::<()>::new()
            .on_create(POST, counting_handler(Arc::clone(&calls)))
            .build();

        // Same operation, different collection.
        dispatch(make_event("app.bsky.feed.like", "create"), ())
            .await
            .unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn on_delete_is_not_triggered_by_create_events() {
        let calls = Arc::new(AtomicU32::new(0));
        let dispatch = EventRouterBuilder::<()>::new()
            .on_delete(POST, counting_handler(Arc::clone(&calls)))
            .build();

        // Same collection, different operation.
        dispatch(make_event(POST, "create"), ()).await.unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn on_any_handler_is_triggered_for_all_operation_types() {
        let calls = Arc::new(AtomicU32::new(0));
        let dispatch = EventRouterBuilder::<()>::new()
            .on(POST, counting_handler(Arc::clone(&calls)))
            .build();

        for operation in ["create", "update", "delete"] {
            dispatch(make_event(POST, operation), ()).await.unwrap();
        }

        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn identity_events_are_silently_skipped() {
        let calls = Arc::new(AtomicU32::new(0));
        let dispatch = EventRouterBuilder::<()>::new()
            .on(POST, counting_handler(Arc::clone(&calls)))
            .build();

        dispatch(make_identity_event(), ()).await.unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn multiple_handlers_for_same_collection_all_fire() {
        let create_calls = Arc::new(AtomicU32::new(0));
        let any_calls = Arc::new(AtomicU32::new(0));

        let dispatch = EventRouterBuilder::<()>::new()
            .on_create(POST, counting_handler(Arc::clone(&create_calls)))
            .on(POST, counting_handler(Arc::clone(&any_calls)))
            .build();

        dispatch(make_event(POST, "create"), ()).await.unwrap();

        assert_eq!(create_calls.load(Ordering::SeqCst), 1);
        assert_eq!(any_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn state_is_passed_to_handlers() {
        #[derive(Clone)]
        struct TestState {
            prefix: String,
        }

        let seen = Arc::new(tokio::sync::Mutex::new(String::new()));

        let dispatch = {
            let seen = Arc::clone(&seen);
            EventRouterBuilder::new()
                .on_create(POST, move |event: CommitEvent, state: TestState| {
                    let seen = Arc::clone(&seen);
                    async move {
                        // Record what the handler received so the test can inspect it.
                        *seen.lock().await = format!("{}:{}", state.prefix, event.did);
                        Ok(())
                    }
                })
                .build()
        };

        let state = TestState {
            prefix: String::from("hello"),
        };
        dispatch(make_event(POST, "create"), state).await.unwrap();

        assert_eq!(seen.lock().await.as_str(), "hello:did:plc:test123");
    }
}