1use crate::events::{AlienEvent, EventChange, EventHandle, EventHandler, EventState};
2use crate::{ErrorData, Result};
3use alien_error::Context;
4use chrono::Utc;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9tokio::task_local! {
10 static EVENT_BUS: EventBus;
12}
13
14tokio::task_local! {
15 static PARENT_EVENT_ID: Option<String>;
17}
18
19pub struct EventBus {
21 handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
23}
24
25impl EventBus {
26 pub fn new() -> Self {
28 Self {
29 handlers: Arc::new(RwLock::new(Vec::new())),
30 }
31 }
32
33 pub fn with_handlers(handlers: Vec<Arc<dyn EventHandler>>) -> Self {
35 Self {
36 handlers: Arc::new(RwLock::new(handlers)),
37 }
38 }
39
40 pub async fn run<F, Fut, T>(&self, f: F) -> T
42 where
43 F: FnOnce() -> Fut,
44 Fut: std::future::Future<Output = T>,
45 {
46 EVENT_BUS.scope(self.clone(), f()).await
47 }
48
49 pub async fn register_handler(handler: Arc<dyn EventHandler>) -> Result<()> {
51 let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
52 Ok(bus) => bus,
53 Err(_) => return Ok(()), };
55
56 let mut handlers = bus.handlers.write().await;
57 handlers.push(handler);
58 Ok(())
59 }
60
61 pub async fn emit(
63 event: AlienEvent,
64 parent_id: Option<String>,
65 state: EventState,
66 ) -> Result<EventHandle> {
67 let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
68 Ok(bus) => bus,
69 Err(_) => return Ok(EventHandle::noop()), };
71
72 let id = Uuid::new_v4().to_string();
74
75 let effective_parent_id =
77 parent_id.or_else(|| PARENT_EVENT_ID.try_with(|p| p.clone()).ok().flatten());
78
79 let now = Utc::now();
80
81 let change = EventChange::Created {
83 id: id.clone(),
84 parent_id: effective_parent_id.clone(),
85 created_at: now,
86 event: event.clone(),
87 state: state.clone(),
88 };
89
90 {
92 let handlers = bus.handlers.read().await;
93 for handler in handlers.iter() {
94 handler
95 .on_event_change(change.clone())
96 .await
97 .context(ErrorData::GenericError {
98 message: "Event handler failed".to_string(),
99 })?;
100 }
101 }
102
103 Ok(EventHandle::new(id, effective_parent_id))
104 }
105
106 pub async fn update(id: &str, event: AlienEvent) -> Result<()> {
108 let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
109 Ok(bus) => bus,
110 Err(_) => return Ok(()), };
112
113 let now = Utc::now();
114 let change = EventChange::Updated {
115 id: id.to_string(),
116 updated_at: now,
117 event,
118 };
119
120 let handlers = bus.handlers.read().await;
122 for handler in handlers.iter() {
123 handler
124 .on_event_change(change.clone())
125 .await
126 .context(ErrorData::GenericError {
127 message: "Event handler failed".to_string(),
128 })?;
129 }
130
131 Ok(())
132 }
133
134 pub async fn update_state(id: &str, new_state: EventState) -> Result<()> {
136 let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
137 Ok(bus) => bus,
138 Err(_) => return Ok(()), };
140
141 let now = Utc::now();
142 let change = EventChange::StateChanged {
143 id: id.to_string(),
144 updated_at: now,
145 new_state,
146 };
147
148 let handlers = bus.handlers.read().await;
150 for handler in handlers.iter() {
151 if let Err(e) = handler.on_event_change(change.clone()).await {
152 return Err(e).context(ErrorData::GenericError {
154 message: "Event handler failed".to_string(),
155 });
156 }
157 }
158
159 Ok(())
160 }
161
162 pub async fn with_parent<F, Fut, T>(parent_id: Option<String>, f: F) -> T
164 where
165 F: FnOnce(&EventHandle) -> Fut,
166 Fut: std::future::Future<Output = T>,
167 {
168 let handle = EventHandle::new(parent_id.clone().unwrap_or_else(|| String::new()), None);
170
171 if let Some(parent) = parent_id {
172 PARENT_EVENT_ID.scope(Some(parent), f(&handle)).await
173 } else {
174 f(&handle).await
175 }
176 }
177
178 pub fn current() -> Option<Self> {
180 EVENT_BUS.try_with(|bus| bus.clone()).ok()
181 }
182}
183
184impl Clone for EventBus {
185 fn clone(&self) -> Self {
186 Self {
187 handlers: self.handlers.clone(),
188 }
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use alien_error::AlienError;
195
196 use crate::ErrorData;
197
198 use super::*;
199 use std::sync::Mutex;
200
201 struct TestHandler {
202 changes: Arc<Mutex<Vec<EventChange>>>,
203 }
204
205 #[async_trait::async_trait]
206 impl EventHandler for TestHandler {
207 async fn on_event_change(&self, change: EventChange) -> Result<()> {
208 let mut changes = self.changes.lock().unwrap();
209 changes.push(change);
210 Ok(())
211 }
212 }
213
214 #[tokio::test]
215 async fn test_event_emission() {
216 let changes = Arc::new(Mutex::new(Vec::new()));
217 let handler = Arc::new(TestHandler {
218 changes: changes.clone(),
219 });
220 let bus = EventBus::with_handlers(vec![handler]);
221
222 bus.run(|| async {
223 let _handle = AlienEvent::BuildingStack {
224 stack: "test".to_string(),
225 }
226 .emit()
227 .await
228 .unwrap();
229
230 let changes = changes.lock().unwrap();
232 assert_eq!(changes.len(), 1);
233 match &changes[0] {
234 EventChange::Created { event, .. } => match event {
235 AlienEvent::BuildingStack { stack } => assert_eq!(stack, "test"),
236 _ => panic!("Wrong event type"),
237 },
238 _ => panic!("Expected Created change"),
239 }
240 })
241 .await;
242 }
243
244 #[tokio::test]
245 async fn test_event_hierarchy() {
246 let changes = Arc::new(Mutex::new(Vec::new()));
247 let handler = Arc::new(TestHandler {
248 changes: changes.clone(),
249 });
250 let bus = EventBus::with_handlers(vec![handler]);
251
252 bus.run(|| async {
253 let parent = AlienEvent::BuildingStack {
254 stack: "parent".to_string(),
255 }
256 .emit()
257 .await
258 .unwrap();
259
260 parent
262 .as_parent(|_| async {
263 AlienEvent::TestBuildImage {
264 image: "child".to_string(),
265 stage: "test".to_string(),
266 }
267 .emit()
268 .await
269 .unwrap();
270 })
271 .await;
272
273 let changes = changes.lock().unwrap();
274 assert_eq!(changes.len(), 2);
275
276 match &changes[0] {
278 EventChange::Created { id, parent_id, .. } => {
279 assert_eq!(id, &parent.id);
280 assert_eq!(parent_id, &None);
281 }
282 _ => panic!("Expected Created change for parent"),
283 }
284
285 match &changes[1] {
287 EventChange::Created { parent_id, .. } => {
288 assert_eq!(parent_id, &Some(parent.id.clone()));
289 }
290 _ => panic!("Expected Created change for child"),
291 }
292 })
293 .await;
294 }
295
296 #[tokio::test]
297 async fn test_event_update() {
298 let changes = Arc::new(Mutex::new(Vec::new()));
299 let handler = Arc::new(TestHandler {
300 changes: changes.clone(),
301 });
302 let bus = EventBus::with_handlers(vec![handler]);
303
304 bus.run(|| async {
305 let handle = AlienEvent::TestBuildImage {
306 image: "test".to_string(),
307 stage: "stage1".to_string(),
308 }
309 .emit()
310 .await
311 .unwrap();
312
313 handle
314 .update(AlienEvent::TestBuildImage {
315 image: "test".to_string(),
316 stage: "stage2".to_string(),
317 })
318 .await
319 .unwrap();
320
321 let changes = changes.lock().unwrap();
322 assert_eq!(changes.len(), 2);
323
324 match &changes[1] {
326 EventChange::Updated { id, event, .. } => {
327 assert_eq!(id, &handle.id);
328 match event {
329 AlienEvent::TestBuildImage { stage, .. } => assert_eq!(stage, "stage2"),
330 _ => panic!("Wrong event type"),
331 }
332 }
333 _ => panic!("Expected Updated change"),
334 }
335 })
336 .await;
337 }
338
339 #[tokio::test]
340 async fn test_scoped_success() {
341 let changes = Arc::new(Mutex::new(Vec::new()));
342 let handler = Arc::new(TestHandler {
343 changes: changes.clone(),
344 });
345 let bus = EventBus::with_handlers(vec![handler]);
346
347 bus.run(|| async {
348 let result = AlienEvent::BuildingStack {
349 stack: "test".to_string(),
350 }
351 .in_scope(|_handle| async move {
352 AlienEvent::TestBuildImage {
354 image: "child".to_string(),
355 stage: "test".to_string(),
356 }
357 .emit()
358 .await
359 .unwrap();
360 Ok::<_, AlienError<ErrorData>>(42)
361 })
362 .await
363 .unwrap();
364
365 assert_eq!(result, 42);
366
367 let changes = changes.lock().unwrap();
368 assert_eq!(changes.len(), 3); match &changes[2] {
372 EventChange::StateChanged { new_state, .. } => {
373 assert_eq!(new_state, &EventState::Success);
374 }
375 _ => panic!("Expected StateChanged to Success"),
376 }
377 })
378 .await;
379 }
380
381 #[tokio::test]
382 async fn test_scoped_failure() {
383 let changes = Arc::new(Mutex::new(Vec::new()));
384 let handler = Arc::new(TestHandler {
385 changes: changes.clone(),
386 });
387 let bus = EventBus::with_handlers(vec![handler]);
388
389 bus.run(|| async {
390 let result = AlienEvent::BuildingStack {
391 stack: "test".to_string(),
392 }
393 .in_scope(|_handle| async move {
394 Err::<i32, _>(AlienError::new(ErrorData::InvalidResourceUpdate { resource_id: "my_resource".to_string(), reason: "hummus".to_string() }))
395 })
396 .await;
397
398 assert!(result.is_err());
399 let err = result.err().unwrap();
400 assert!(matches!(&err.error, Some(ErrorData::InvalidResourceUpdate { resource_id, .. }) if resource_id == "my_resource"));
401
402 let changes = changes.lock().unwrap();
403 assert_eq!(changes.len(), 2); match &changes[1] {
407 EventChange::StateChanged { new_state, .. } => match new_state {
408 EventState::Failed { error } => {
409 let error = error.as_ref().expect("Expected error to be present");
410 assert_eq!(error.message, "Resource 'my_resource' cannot be updated: hummus")
411 }
412 _ => panic!("Expected Failed state"),
413 },
414 _ => panic!("Expected StateChanged to Failed"),
415 }
416 })
417 .await;
418 }
419
420 #[tokio::test]
421 async fn test_no_event_bus_context() {
422 let result = AlienEvent::BuildingStack {
424 stack: "test".to_string(),
425 }
426 .emit()
427 .await;
428
429 assert!(result.is_ok());
431 let handle = result.unwrap();
432 assert!(handle.is_noop);
433 }
434}