Skip to main content

cratestack_core/
events.rs

1//! Model-event bus: typed `created/updated/deleted` envelopes that
2//! procedure handlers can subscribe to.
3
4use std::collections::BTreeMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, RwLock};
8
9use serde::{Deserialize, Serialize};
10
11use crate::error::CoolError;
12
13pub type CoolEventFuture = Pin<Box<dyn Future<Output = Result<(), CoolError>> + Send + 'static>>;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub enum ModelEventKind {
17    Created,
18    Updated,
19    Deleted,
20}
21
22impl ModelEventKind {
23    pub const fn as_str(self) -> &'static str {
24        match self {
25            Self::Created => "created",
26            Self::Updated => "updated",
27            Self::Deleted => "deleted",
28        }
29    }
30
31    pub fn parse(value: &str) -> Result<Self, CoolError> {
32        match value {
33            "created" => Ok(Self::Created),
34            "updated" => Ok(Self::Updated),
35            "deleted" => Ok(Self::Deleted),
36            other => Err(CoolError::Validation(format!(
37                "unsupported model event operation `{other}`"
38            ))),
39        }
40    }
41}
42
43#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
44pub struct CoolEventEnvelope {
45    pub event_id: uuid::Uuid,
46    pub model: String,
47    pub operation: ModelEventKind,
48    pub occurred_at: chrono::DateTime<chrono::Utc>,
49    pub data: serde_json::Value,
50}
51
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
53pub struct ModelEvent<T> {
54    pub event_id: uuid::Uuid,
55    pub model: String,
56    pub operation: ModelEventKind,
57    pub occurred_at: chrono::DateTime<chrono::Utc>,
58    pub data: T,
59}
60
61impl<T> TryFrom<CoolEventEnvelope> for ModelEvent<T>
62where
63    T: serde::de::DeserializeOwned,
64{
65    type Error = CoolError;
66
67    fn try_from(value: CoolEventEnvelope) -> Result<Self, Self::Error> {
68        Ok(Self {
69            event_id: value.event_id,
70            model: value.model,
71            operation: value.operation,
72            occurred_at: value.occurred_at,
73            data: serde_json::from_value(value.data).map_err(|error| {
74                CoolError::Codec(format!("failed to decode event payload: {error}"))
75            })?,
76        })
77    }
78}
79
80type EventHandler = Arc<dyn Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync>;
81
82#[derive(Clone, Default)]
83pub struct CoolEventBus {
84    handlers: Arc<RwLock<BTreeMap<String, Vec<EventHandler>>>>,
85}
86
87impl std::fmt::Debug for CoolEventBus {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        let handler_count = self
90            .handlers
91            .read()
92            .map(|handlers| handlers.values().map(Vec::len).sum::<usize>())
93            .unwrap_or_default();
94        f.debug_struct("CoolEventBus")
95            .field("handler_count", &handler_count)
96            .finish()
97    }
98}
99
100impl CoolEventBus {
101    pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
102    where
103        F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
104    {
105        let mut handlers = self
106            .handlers
107            .write()
108            .expect("event bus handler registry should not be poisoned");
109        handlers
110            .entry(event_topic(model, operation))
111            .or_default()
112            .push(Arc::new(handler));
113    }
114
115    pub async fn emit(&self, envelope: CoolEventEnvelope) -> Result<(), CoolError> {
116        let handlers = self
117            .handlers
118            .read()
119            .expect("event bus handler registry should not be poisoned")
120            .get(&event_topic(&envelope.model, envelope.operation))
121            .cloned()
122            .unwrap_or_default();
123
124        for handler in handlers {
125            handler(envelope.clone()).await?;
126        }
127
128        Ok(())
129    }
130}
131
132pub fn event_topic(model: &str, operation: ModelEventKind) -> String {
133    format!("{}.{}", model, operation.as_str())
134}
135
136pub fn parse_emit_attribute(raw: &str) -> Result<Vec<ModelEventKind>, String> {
137    let Some(inner) = raw
138        .strip_prefix("@@emit(")
139        .and_then(|value| value.strip_suffix(')'))
140    else {
141        return Err(format!("unsupported event attribute `{raw}`"));
142    };
143
144    let mut operations = Vec::new();
145    for part in inner
146        .split(',')
147        .map(str::trim)
148        .filter(|part| !part.is_empty())
149    {
150        let operation = match part {
151            "created" => ModelEventKind::Created,
152            "updated" => ModelEventKind::Updated,
153            "deleted" => ModelEventKind::Deleted,
154            other => {
155                return Err(format!(
156                    "unsupported event operation `{other}` in `{raw}`; expected created, updated, or deleted"
157                ));
158            }
159        };
160        if !operations.contains(&operation) {
161            operations.push(operation);
162        }
163    }
164
165    if operations.is_empty() {
166        return Err(format!(
167            "event attribute `{raw}` must declare at least one operation"
168        ));
169    }
170
171    Ok(operations)
172}