cratestack_core/
events.rs1use std::collections::BTreeMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, RwLock};
8
9use serde::{Deserialize, Serialize};
10
11use crate::error::CoolError;
12
13pub type CoolEventFuture = Pin<Box<dyn Future<Output = Result<(), CoolError>> + Send + 'static>>;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub enum ModelEventKind {
17 Created,
18 Updated,
19 Deleted,
20}
21
22impl ModelEventKind {
23 pub const fn as_str(self) -> &'static str {
24 match self {
25 Self::Created => "created",
26 Self::Updated => "updated",
27 Self::Deleted => "deleted",
28 }
29 }
30
31 pub fn parse(value: &str) -> Result<Self, CoolError> {
32 match value {
33 "created" => Ok(Self::Created),
34 "updated" => Ok(Self::Updated),
35 "deleted" => Ok(Self::Deleted),
36 other => Err(CoolError::Validation(format!(
37 "unsupported model event operation `{other}`"
38 ))),
39 }
40 }
41}
42
43#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
44pub struct CoolEventEnvelope {
45 pub event_id: uuid::Uuid,
46 pub model: String,
47 pub operation: ModelEventKind,
48 pub occurred_at: chrono::DateTime<chrono::Utc>,
49 pub data: serde_json::Value,
50}
51
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
53pub struct ModelEvent<T> {
54 pub event_id: uuid::Uuid,
55 pub model: String,
56 pub operation: ModelEventKind,
57 pub occurred_at: chrono::DateTime<chrono::Utc>,
58 pub data: T,
59}
60
61impl<T> TryFrom<CoolEventEnvelope> for ModelEvent<T>
62where
63 T: serde::de::DeserializeOwned,
64{
65 type Error = CoolError;
66
67 fn try_from(value: CoolEventEnvelope) -> Result<Self, Self::Error> {
68 Ok(Self {
69 event_id: value.event_id,
70 model: value.model,
71 operation: value.operation,
72 occurred_at: value.occurred_at,
73 data: serde_json::from_value(value.data).map_err(|error| {
74 CoolError::Codec(format!("failed to decode event payload: {error}"))
75 })?,
76 })
77 }
78}
79
80type EventHandler = Arc<dyn Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync>;
81
82#[derive(Clone, Default)]
83pub struct CoolEventBus {
84 handlers: Arc<RwLock<BTreeMap<String, Vec<EventHandler>>>>,
85}
86
87impl std::fmt::Debug for CoolEventBus {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 let handler_count = self
90 .handlers
91 .read()
92 .map(|handlers| handlers.values().map(Vec::len).sum::<usize>())
93 .unwrap_or_default();
94 f.debug_struct("CoolEventBus")
95 .field("handler_count", &handler_count)
96 .finish()
97 }
98}
99
100impl CoolEventBus {
101 pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
102 where
103 F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
104 {
105 let mut handlers = self
106 .handlers
107 .write()
108 .expect("event bus handler registry should not be poisoned");
109 handlers
110 .entry(event_topic(model, operation))
111 .or_default()
112 .push(Arc::new(handler));
113 }
114
115 pub async fn emit(&self, envelope: CoolEventEnvelope) -> Result<(), CoolError> {
116 let handlers = self
117 .handlers
118 .read()
119 .expect("event bus handler registry should not be poisoned")
120 .get(&event_topic(&envelope.model, envelope.operation))
121 .cloned()
122 .unwrap_or_default();
123
124 for handler in handlers {
125 handler(envelope.clone()).await?;
126 }
127
128 Ok(())
129 }
130}
131
132pub fn event_topic(model: &str, operation: ModelEventKind) -> String {
133 format!("{}.{}", model, operation.as_str())
134}
135
136pub fn parse_emit_attribute(raw: &str) -> Result<Vec<ModelEventKind>, String> {
137 let Some(inner) = raw
138 .strip_prefix("@@emit(")
139 .and_then(|value| value.strip_suffix(')'))
140 else {
141 return Err(format!("unsupported event attribute `{raw}`"));
142 };
143
144 let mut operations = Vec::new();
145 for part in inner
146 .split(',')
147 .map(str::trim)
148 .filter(|part| !part.is_empty())
149 {
150 let operation = match part {
151 "created" => ModelEventKind::Created,
152 "updated" => ModelEventKind::Updated,
153 "deleted" => ModelEventKind::Deleted,
154 other => {
155 return Err(format!(
156 "unsupported event operation `{other}` in `{raw}`; expected created, updated, or deleted"
157 ));
158 }
159 };
160 if !operations.contains(&operation) {
161 operations.push(operation);
162 }
163 }
164
165 if operations.is_empty() {
166 return Err(format!(
167 "event attribute `{raw}` must declare at least one operation"
168 ));
169 }
170
171 Ok(operations)
172}