Skip to main content

atrg_stream/
router.rs

//! Ergonomic event router for dispatching Jetstream events by collection and operation.
//!
//! Instead of writing manual match statements in your `on_event` handler,
//! use [`EventRouterBuilder`] to declaratively register handlers per collection
//! and operation type:
//!
//! ```rust,ignore
//! use atrg_stream::{EventRouterBuilder, CommitEvent, Operation};
//!
//! let router = EventRouterBuilder::new()
//!     .on_create("app.bsky.feed.post", handle_new_post)
//!     .on_delete("app.bsky.feed.post", handle_deleted_post)
//!     .on("app.bsky.feed.like", handle_any_like)
//!     .build();
//! ```
16
17use std::fmt;
18use std::sync::Arc;
19
20use futures::future::BoxFuture;
21
22use crate::event::JetstreamEvent;
23
/// Which commit operation a route is interested in.
///
/// `Create`, `Update` and `Delete` each select a single operation kind;
/// `Any` is a wildcard that selects every operation on the collection.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Operation {
    /// Selects `"create"` operations only.
    Create,
    /// Selects `"update"` operations only.
    Update,
    /// Selects `"delete"` operations only.
    Delete,
    /// Wildcard: selects every operation on the collection.
    Any,
}
36
37impl fmt::Display for Operation {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            Self::Create => write!(f, "create"),
41            Self::Update => write!(f, "update"),
42            Self::Delete => write!(f, "delete"),
43            Self::Any => write!(f, "*"),
44        }
45    }
46}
47
48/// A typed commit event with extracted fields for handler convenience.
49///
50/// This is the value passed to each route handler. It contains all the
51/// information from the raw [`JetstreamEvent`] and commit data in a
52/// flat, ergonomic structure.
53#[derive(Debug, Clone)]
54pub struct CommitEvent {
55    /// DID of the account that produced this event.
56    pub did: String,
57    /// Record key.
58    pub rkey: String,
59    /// Collection NSID.
60    pub collection: String,
61    /// Operation type.
62    pub operation: Operation,
63    /// The record value (present for create/update, absent for delete).
64    pub record: Option<serde_json::Value>,
65    /// Commit revision (if available).
66    pub rev: Option<String>,
67    /// CID of the record (if available).
68    pub cid: Option<String>,
69    /// Original event timestamp in microseconds.
70    pub time_us: i64,
71}
72
73/// Handler function type for the event router.
74pub type RouteHandler<S> =
75    Arc<dyn Fn(CommitEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync>;
76
77/// A routing entry: (collection, operation) -> handler.
78struct Route<S> {
79    collection: String,
80    operation: Operation,
81    handler: RouteHandler<S>,
82}
83
84/// Builder for constructing an event router.
85///
86/// Use the `on_create`, `on_update`, `on_delete`, and `on` methods to
87/// register handlers, then call [`build`](Self::build) to produce a
88/// dispatch function compatible with `AtrgApp::on_event`.
89pub struct EventRouterBuilder<S> {
90    routes: Vec<Route<S>>,
91}
92
93impl<S: Clone + Send + Sync + 'static> EventRouterBuilder<S> {
94    /// Create a new empty router builder.
95    pub fn new() -> Self {
96        Self { routes: Vec::new() }
97    }
98
99    /// Register a handler for `create` operations on a collection.
100    pub fn on_create<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
101    where
102        F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
103        Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
104    {
105        self.routes.push(Route {
106            collection: collection.into(),
107            operation: Operation::Create,
108            handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
109        });
110        self
111    }
112
113    /// Register a handler for `update` operations on a collection.
114    pub fn on_update<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
115    where
116        F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
117        Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
118    {
119        self.routes.push(Route {
120            collection: collection.into(),
121            operation: Operation::Update,
122            handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
123        });
124        self
125    }
126
127    /// Register a handler for `delete` operations on a collection.
128    pub fn on_delete<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
129    where
130        F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
131        Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
132    {
133        self.routes.push(Route {
134            collection: collection.into(),
135            operation: Operation::Delete,
136            handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
137        });
138        self
139    }
140
141    /// Register a handler for ALL operations on a collection.
142    ///
143    /// The handler will be called for create, update, and delete events
144    /// on the specified collection.
145    pub fn on<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
146    where
147        F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
148        Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
149    {
150        self.routes.push(Route {
151            collection: collection.into(),
152            operation: Operation::Any,
153            handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
154        });
155        self
156    }
157
158    /// Build the router into a dispatch function compatible with `AtrgApp::on_event`.
159    ///
160    /// The returned closure can be passed directly to `AtrgApp::on_event(...)`.
161    /// Events that don't match any registered route are silently ignored (with
162    /// a `tracing::debug!` log).
163    pub fn build(
164        self,
165    ) -> impl Fn(JetstreamEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
166    {
167        let router = Arc::new(EventRouter {
168            routes: self.routes,
169        });
170        move |event, state| {
171            let router = router.clone();
172            Box::pin(async move { router.dispatch(event, state).await })
173        }
174    }
175}
176
177impl<S: Clone + Send + Sync + 'static> Default for EventRouterBuilder<S> {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183/// The compiled event router. Dispatches events to registered handlers.
184struct EventRouter<S> {
185    routes: Vec<Route<S>>,
186}
187
188impl<S: Clone + Send + Sync + 'static> EventRouter<S> {
189    /// Dispatch a single event to all matching handlers.
190    ///
191    /// - Non-commit events (identity, account) are skipped.
192    /// - Unknown operation strings are skipped.
193    /// - Multiple handlers can match the same event (e.g. both an `on_create`
194    ///   and an `on` handler for the same collection). All matching handlers
195    ///   are invoked in registration order.
196    async fn dispatch(&self, event: JetstreamEvent, state: S) -> anyhow::Result<()> {
197        let commit = match event.commit {
198            Some(c) => c,
199            None => return Ok(()), // identity/account events — skip
200        };
201
202        let operation = match commit.operation.as_str() {
203            "create" => Operation::Create,
204            "update" => Operation::Update,
205            "delete" => Operation::Delete,
206            _ => return Ok(()),
207        };
208
209        let commit_event = CommitEvent {
210            did: event.did,
211            rkey: commit.rkey.clone(),
212            collection: commit.collection.clone(),
213            operation,
214            record: commit.record,
215            rev: commit.rev,
216            cid: commit.cid,
217            time_us: event.time_us,
218        };
219
220        let mut handled = false;
221        for route in &self.routes {
222            if route.collection == commit.collection
223                && (route.operation == Operation::Any || route.operation == operation)
224            {
225                (route.handler)(commit_event.clone(), state.clone()).await?;
226                handled = true;
227            }
228        }
229
230        if !handled {
231            tracing::debug!(
232                collection = %commit.collection,
233                operation = %commit.operation,
234                "no handler registered for event, ignoring"
235            );
236        }
237
238        Ok(())
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::CommitData;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Collection most tests register handlers for.
    const POSTS: &str = "app.bsky.feed.post";
    /// A second collection that no test registers a handler for.
    const LIKES: &str = "app.bsky.feed.like";

    /// Build a commit event for `collection` carrying the raw `operation` string.
    ///
    /// Delete events are built without a record, matching the contract
    /// documented on `CommitEvent::record`.
    fn make_event(collection: &str, operation: &str) -> JetstreamEvent {
        let record = if operation == "delete" {
            None
        } else {
            Some(serde_json::json!({"text": "hello"}))
        };
        JetstreamEvent {
            did: "did:plc:test123".to_string(),
            time_us: 1_700_000_000_000_000,
            kind: "commit".to_string(),
            commit: Some(CommitData {
                collection: collection.to_string(),
                rkey: "abc123".to_string(),
                operation: operation.to_string(),
                record,
                cid: Some("bafytest".to_string()),
                rev: Some("rev1".to_string()),
            }),
            identity: None,
            account: None,
        }
    }

    /// Helper to build an identity (non-commit) event.
    fn make_identity_event() -> JetstreamEvent {
        JetstreamEvent {
            did: "did:plc:test123".to_string(),
            time_us: 1_700_000_000_000_000,
            kind: "identity".to_string(),
            commit: None,
            identity: Some(serde_json::json!({"handle": "alice.test"})),
            account: None,
        }
    }

    /// Handler body shared by the counting tests: record one invocation.
    async fn bump(counter: Arc<AtomicU32>) -> anyhow::Result<()> {
        counter.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    #[tokio::test]
    async fn on_create_handler_is_called_for_create_events() {
        let counter = Arc::new(AtomicU32::new(0));
        let counter_clone = counter.clone();

        let handler = EventRouterBuilder::new()
            .on_create(POSTS, move |event: CommitEvent, _state: ()| {
                let c = counter_clone.clone();
                async move {
                    assert_eq!(event.did, "did:plc:test123");
                    assert_eq!(event.collection, "app.bsky.feed.post");
                    assert_eq!(event.operation, Operation::Create);
                    assert_eq!(event.rkey, "abc123");
                    assert!(event.record.is_some());
                    c.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        handler(make_event(POSTS, "create"), ()).await.unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn unregistered_collections_are_ignored() {
        let counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let c = counter.clone();
            EventRouterBuilder::new()
                .on_create(POSTS, move |_event: CommitEvent, _state: ()| bump(c.clone()))
                .build()
        };

        // Send an event for a different collection
        handler(make_event(LIKES, "create"), ()).await.unwrap();

        // Handler should NOT have been called
        assert_eq!(counter.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn on_delete_is_not_triggered_by_create_events() {
        let counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let c = counter.clone();
            EventRouterBuilder::new()
                .on_delete(POSTS, move |_event: CommitEvent, _state: ()| bump(c.clone()))
                .build()
        };

        // Send a create event — the delete handler should NOT fire
        handler(make_event(POSTS, "create"), ()).await.unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn on_update_handler_fires_only_for_update_events() {
        let counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let c = counter.clone();
            EventRouterBuilder::new()
                .on_update(POSTS, move |_event: CommitEvent, _state: ()| bump(c.clone()))
                .build()
        };

        handler(make_event(POSTS, "create"), ()).await.unwrap();
        handler(make_event(POSTS, "delete"), ()).await.unwrap();
        assert_eq!(counter.load(Ordering::SeqCst), 0);

        handler(make_event(POSTS, "update"), ()).await.unwrap();
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn on_any_handler_is_triggered_for_all_operation_types() {
        let counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let c = counter.clone();
            EventRouterBuilder::new()
                .on(POSTS, move |_event: CommitEvent, _state: ()| bump(c.clone()))
                .build()
        };

        for operation in ["create", "update", "delete"] {
            handler(make_event(POSTS, operation), ()).await.unwrap();
        }

        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn delete_events_are_delivered_without_a_record() {
        let counter = Arc::new(AtomicU32::new(0));
        let counter_clone = counter.clone();

        let handler = EventRouterBuilder::new()
            .on_delete(POSTS, move |event: CommitEvent, _state: ()| {
                let c = counter_clone.clone();
                async move {
                    assert_eq!(event.operation, Operation::Delete);
                    assert!(event.record.is_none());
                    c.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        handler(make_event(POSTS, "delete"), ()).await.unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn identity_events_are_silently_skipped() {
        let counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let c = counter.clone();
            EventRouterBuilder::new()
                .on(POSTS, move |_event: CommitEvent, _state: ()| bump(c.clone()))
                .build()
        };

        handler(make_identity_event(), ()).await.unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn unknown_operations_are_skipped() {
        let counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let c = counter.clone();
            EventRouterBuilder::new()
                .on(POSTS, move |_event: CommitEvent, _state: ()| bump(c.clone()))
                .build()
        };

        // Even a wildcard route must not see an operation the router cannot classify.
        handler(make_event(POSTS, "frobnicate"), ()).await.unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn multiple_handlers_for_same_collection_all_fire() {
        let create_counter = Arc::new(AtomicU32::new(0));
        let any_counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let cc = create_counter.clone();
            let ac = any_counter.clone();
            EventRouterBuilder::new()
                .on_create(POSTS, move |_event: CommitEvent, _state: ()| bump(cc.clone()))
                .on(POSTS, move |_event: CommitEvent, _state: ()| bump(ac.clone()))
                .build()
        };

        handler(make_event(POSTS, "create"), ()).await.unwrap();

        // Both handlers should have fired
        assert_eq!(create_counter.load(Ordering::SeqCst), 1);
        assert_eq!(any_counter.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn handler_error_short_circuits_remaining_handlers() {
        let later_counter = Arc::new(AtomicU32::new(0));

        let handler = {
            let lc = later_counter.clone();
            EventRouterBuilder::new()
                .on_create(POSTS, |_event: CommitEvent, _state: ()| async {
                    Err::<(), anyhow::Error>(anyhow::anyhow!("boom"))
                })
                .on(POSTS, move |_event: CommitEvent, _state: ()| bump(lc.clone()))
                .build()
        };

        let outcome = handler(make_event(POSTS, "create"), ()).await;

        // The error is surfaced and the later route never runs.
        assert!(outcome.is_err());
        assert_eq!(later_counter.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn state_is_passed_to_handlers() {
        #[derive(Clone)]
        struct TestState {
            prefix: String,
        }

        let result = Arc::new(tokio::sync::Mutex::new(String::new()));
        let result_clone = result.clone();

        let handler = EventRouterBuilder::new()
            .on_create(POSTS, move |event: CommitEvent, state: TestState| {
                let r = result_clone.clone();
                async move {
                    let mut locked = r.lock().await;
                    *locked = format!("{}:{}", state.prefix, event.did);
                    Ok(())
                }
            })
            .build();

        let state = TestState {
            prefix: "hello".to_string(),
        };
        handler(make_event(POSTS, "create"), state).await.unwrap();

        let locked = result.lock().await;
        assert_eq!(*locked, "hello:did:plc:test123");
    }
}