use std::fmt;
use std::sync::Arc;
use futures::future::BoxFuture;
use crate::event::JetstreamEvent;
/// Kind of commit operation a route is registered for, or that an event carries.
///
/// `Any` acts as a wildcard on the route side: the router treats a route
/// registered with `Any` as matching every operation for its collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
/// A record was created; displayed as `create`.
Create,
/// A record was updated; displayed as `update`.
Update,
/// A record was deleted; displayed as `delete`.
Delete,
/// Wildcard matching any of the operations above; displayed as `*`.
Any,
}
impl fmt::Display for Operation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Create => write!(f, "create"),
Self::Update => write!(f, "update"),
Self::Delete => write!(f, "delete"),
Self::Any => write!(f, "*"),
}
}
}
/// Flattened view of a single commit, as handed to route handlers.
///
/// The router builds one of these from a `JetstreamEvent` and its commit
/// payload, then passes a clone to every matching handler.
#[derive(Debug, Clone)]
pub struct CommitEvent {
/// DID taken from the enclosing event.
pub did: String,
/// Record key taken from the commit payload.
pub rkey: String,
/// Collection name from the commit payload; routes are matched against it.
pub collection: String,
/// Operation parsed from the commit payload's operation string.
pub operation: Operation,
/// Record body from the commit payload, if one was present.
pub record: Option<serde_json::Value>,
/// Revision string from the commit payload, if one was present.
pub rev: Option<String>,
/// CID string from the commit payload, if one was present.
pub cid: Option<String>,
/// Timestamp taken from the enclosing event
/// (presumably microseconds, going by the `_us` suffix — TODO confirm).
pub time_us: i64,
}
/// Type-erased, shareable route handler.
///
/// Called with the commit event and a state value of type `S`; returns a boxed
/// `'static` future that resolves to `anyhow::Result<()>`. The `Send + Sync`
/// bounds let the handler be shared behind the `Arc`.
pub type RouteHandler<S> =
Arc<dyn Fn(CommitEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync>;
/// One registered route: a collection/operation pair and the handler to run
/// when a commit matches it.
struct Route<S> {
// Collection name compared for exact equality against the commit's collection.
collection: String,
// Operation this route accepts; `Operation::Any` accepts every operation.
operation: Operation,
// Type-erased handler invoked for matching commits.
handler: RouteHandler<S>,
}
/// Builder that collects routes and turns them into a single dispatch closure.
///
/// `S` is the state type passed alongside each event to the handlers.
pub struct EventRouterBuilder<S> {
// Registered routes, kept in registration order.
routes: Vec<Route<S>>,
}
impl<S: Clone + Send + Sync + 'static> EventRouterBuilder<S> {
pub fn new() -> Self {
Self { routes: Vec::new() }
}
pub fn on_create<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
where
F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
self.routes.push(Route {
collection: collection.into(),
operation: Operation::Create,
handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
});
self
}
pub fn on_update<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
where
F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
self.routes.push(Route {
collection: collection.into(),
operation: Operation::Update,
handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
});
self
}
pub fn on_delete<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
where
F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
self.routes.push(Route {
collection: collection.into(),
operation: Operation::Delete,
handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
});
self
}
pub fn on<F, Fut>(mut self, collection: impl Into<String>, handler: F) -> Self
where
F: Fn(CommitEvent, S) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
self.routes.push(Route {
collection: collection.into(),
operation: Operation::Any,
handler: Arc::new(move |event, state| Box::pin(handler(event, state))),
});
self
}
pub fn build(
self,
) -> impl Fn(JetstreamEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
{
let router = Arc::new(EventRouter {
routes: self.routes,
});
move |event, state| {
let router = router.clone();
Box::pin(async move { router.dispatch(event, state).await })
}
}
}
impl<S: Clone + Send + Sync + 'static> Default for EventRouterBuilder<S> {
fn default() -> Self {
Self::new()
}
}
/// Immutable set of routes that incoming events are dispatched against.
struct EventRouter<S> {
// Routes in registration order; dispatch walks them front to back.
routes: Vec<Route<S>>,
}
impl<S: Clone + Send + Sync + 'static> EventRouter<S> {
    /// Routes one event to every matching handler.
    ///
    /// Events without a commit payload, and commits whose operation string is
    /// not `create`, `update` or `delete`, are ignored and yield `Ok(())`.
    /// Otherwise each route whose collection equals the commit's collection
    /// and whose operation is `Any` or equals the commit's operation is
    /// invoked in registration order with a clone of the event and of `state`.
    ///
    /// # Errors
    ///
    /// Returns the first handler error; routes after the failing one are not run.
    async fn dispatch(&self, event: JetstreamEvent, state: S) -> anyhow::Result<()> {
        let commit = match event.commit {
            Some(commit) => commit,
            None => return Ok(()),
        };
        let operation = match commit.operation.as_str() {
            "create" => Operation::Create,
            "update" => Operation::Update,
            "delete" => Operation::Delete,
            _ => return Ok(()),
        };
        // Move the owned fields out of the payload instead of cloning them;
        // only `commit.operation` is still needed afterwards (for the log).
        let commit_event = CommitEvent {
            did: event.did,
            rkey: commit.rkey,
            collection: commit.collection,
            operation,
            record: commit.record,
            rev: commit.rev,
            cid: commit.cid,
            time_us: event.time_us,
        };
        let mut handled = false;
        for route in &self.routes {
            if route.collection == commit_event.collection
                && (route.operation == Operation::Any || route.operation == operation)
            {
                // Handlers take ownership, so each one gets its own copy.
                (route.handler)(commit_event.clone(), state.clone()).await?;
                handled = true;
            }
        }
        if !handled {
            tracing::debug!(
                collection = %commit_event.collection,
                operation = %commit.operation,
                "no handler registered for event, ignoring"
            );
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::CommitData;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Collection every test registers its routes for.
    const POSTS: &str = "app.bsky.feed.post";
    /// DID carried by every fixture event.
    const TEST_DID: &str = "did:plc:test123";

    /// Builds a commit event for `collection`/`operation` with fixed DID,
    /// rkey, record, cid and rev.
    fn make_event(collection: &str, operation: &str) -> JetstreamEvent {
        let payload = CommitData {
            collection: collection.to_owned(),
            rkey: "abc123".to_owned(),
            operation: operation.to_owned(),
            record: Some(serde_json::json!({"text": "hello"})),
            cid: Some("bafytest".to_owned()),
            rev: Some("rev1".to_owned()),
        };
        JetstreamEvent {
            did: TEST_DID.to_owned(),
            time_us: 1_700_000_000_000_000,
            kind: "commit".to_owned(),
            commit: Some(payload),
            identity: None,
            account: None,
        }
    }

    /// Builds an identity event, i.e. one that carries no commit payload.
    fn make_identity_event() -> JetstreamEvent {
        let identity = serde_json::json!({"handle": "alice.test"});
        JetstreamEvent {
            did: TEST_DID.to_owned(),
            time_us: 1_700_000_000_000_000,
            kind: "identity".to_owned(),
            commit: None,
            identity: Some(identity),
            account: None,
        }
    }

    #[tokio::test]
    async fn on_create_handler_is_called_for_create_events() {
        let calls = Arc::new(AtomicU32::new(0));
        let seen = Arc::clone(&calls);
        let dispatch = EventRouterBuilder::new()
            .on_create(POSTS, move |event: CommitEvent, _state: ()| {
                let seen = Arc::clone(&seen);
                async move {
                    assert_eq!(event.did, TEST_DID);
                    assert_eq!(event.collection, POSTS);
                    assert_eq!(event.operation, Operation::Create);
                    assert_eq!(event.rkey, "abc123");
                    assert!(event.record.is_some());
                    seen.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        dispatch(make_event(POSTS, "create"), ()).await.unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn unregistered_collections_are_ignored() {
        let calls = Arc::new(AtomicU32::new(0));
        let seen = Arc::clone(&calls);
        let dispatch = EventRouterBuilder::new()
            .on_create(POSTS, move |_event: CommitEvent, _state: ()| {
                let seen = Arc::clone(&seen);
                async move {
                    seen.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        // A different collection than the one registered above.
        dispatch(make_event("app.bsky.feed.like", "create"), ())
            .await
            .unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn on_delete_is_not_triggered_by_create_events() {
        let calls = Arc::new(AtomicU32::new(0));
        let seen = Arc::clone(&calls);
        let dispatch = EventRouterBuilder::new()
            .on_delete(POSTS, move |_event: CommitEvent, _state: ()| {
                let seen = Arc::clone(&seen);
                async move {
                    seen.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        dispatch(make_event(POSTS, "create"), ()).await.unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn on_any_handler_is_triggered_for_all_operation_types() {
        let calls = Arc::new(AtomicU32::new(0));
        let seen = Arc::clone(&calls);
        let dispatch = EventRouterBuilder::new()
            .on(POSTS, move |_event: CommitEvent, _state: ()| {
                let seen = Arc::clone(&seen);
                async move {
                    seen.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        // Same order as before: create, then update, then delete.
        for operation in ["create", "update", "delete"] {
            dispatch(make_event(POSTS, operation), ()).await.unwrap();
        }

        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn identity_events_are_silently_skipped() {
        let calls = Arc::new(AtomicU32::new(0));
        let seen = Arc::clone(&calls);
        let dispatch = EventRouterBuilder::new()
            .on(POSTS, move |_event: CommitEvent, _state: ()| {
                let seen = Arc::clone(&seen);
                async move {
                    seen.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        dispatch(make_identity_event(), ()).await.unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn multiple_handlers_for_same_collection_all_fire() {
        let create_calls = Arc::new(AtomicU32::new(0));
        let any_calls = Arc::new(AtomicU32::new(0));
        let create_seen = Arc::clone(&create_calls);
        let any_seen = Arc::clone(&any_calls);
        let dispatch = EventRouterBuilder::new()
            .on_create(POSTS, move |_event: CommitEvent, _state: ()| {
                let seen = Arc::clone(&create_seen);
                async move {
                    seen.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .on(POSTS, move |_event: CommitEvent, _state: ()| {
                let seen = Arc::clone(&any_seen);
                async move {
                    seen.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .build();

        dispatch(make_event(POSTS, "create"), ()).await.unwrap();

        assert_eq!(create_calls.load(Ordering::SeqCst), 1);
        assert_eq!(any_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn state_is_passed_to_handlers() {
        #[derive(Clone)]
        struct TestState {
            prefix: String,
        }

        let output = Arc::new(tokio::sync::Mutex::new(String::new()));
        let sink = Arc::clone(&output);
        let dispatch = EventRouterBuilder::new()
            .on_create(POSTS, move |event: CommitEvent, state: TestState| {
                let sink = Arc::clone(&sink);
                async move {
                    let mut slot = sink.lock().await;
                    *slot = format!("{}:{}", state.prefix, event.did);
                    Ok(())
                }
            })
            .build();

        let state = TestState {
            prefix: "hello".to_owned(),
        };
        dispatch(make_event(POSTS, "create"), state).await.unwrap();

        let written = output.lock().await;
        assert_eq!(*written, "hello:did:plc:test123");
    }
}