1use anyhow::{anyhow, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use parking_lot::Mutex;
11use seesaw_core::insight::*;
12use seesaw_core::store::*;
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, HashSet, VecDeque};
15use tokio::sync::broadcast;
16
17use std::sync::atomic::{AtomicI64, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
/// In-memory implementation of the seesaw `Store` and `InsightStore` traits.
///
/// Every collection is wrapped in `Arc`, so cloning the store is cheap and
/// all clones share the same underlying data; concurrency is handled by
/// `DashMap` / `parking_lot::Mutex` rather than one global lock.
#[derive(Clone)]
pub struct MemoryStore {
    // Pending event queues: one FIFO per workflow, keyed by correlation id.
    events: Arc<DashMap<Uuid, VecDeque<QueuedEvent>>>,
    // Monotonic source for `QueuedEvent::id` values minted by this store.
    event_seq: Arc<AtomicI64>,
    // Latest workflow state JSON plus its optimistic-concurrency version.
    states: Arc<DashMap<Uuid, (serde_json::Value, i32)>>,
    // FIFO of effect executions waiting to be claimed by `poll_next_effect`.
    effects: Arc<Mutex<VecDeque<QueuedEffectExecution>>>,
    // Results of completed effects, keyed by (event id, effect id).
    completed_effects: Arc<DashMap<(Uuid, String), serde_json::Value>>,

    // Broadcast channel fanning insight events out to subscribers.
    insight_tx: Arc<broadcast::Sender<InsightEvent>>,
    // Monotonic sequence number shared by all insight events.
    insight_seq: Arc<AtomicI64>,
    // Every event ever published, for tree/stat/paging queries.
    event_history: Arc<DashMap<Uuid, StoredEvent>>,
    // Every effect intent ever inserted, keyed by (event id, effect id).
    effect_history: Arc<DashMap<(Uuid, String), StoredEffect>>,
    // Effects that exhausted retries, keyed by (event id, effect id).
    dead_letter_history: Arc<DashMap<(Uuid, String), StoredDeadLetter>>,
    // Same-batch join windows keyed by (join effect id, correlation, batch).
    join_windows: Arc<Mutex<HashMap<(String, Uuid, Uuid), MemoryJoinWindow>>>,
}
49
/// Immutable record of a published event, kept for insight/tree queries.
#[derive(Debug, Clone)]
struct StoredEvent {
    // Insight sequence number assigned at publish time; used for cursor
    // paging in `get_recent_events`.
    seq: i64,
    // Identity/deduplication key of the event.
    event_id: Uuid,
    // Event that caused this one, if any (forms the workflow tree).
    parent_id: Option<Uuid>,
    // Workflow this event belongs to.
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    // Depth in the causal chain (children get parent hops + 1).
    hops: i32,
    retry_count: i32,
    // Batch metadata, present when the event is part of a fan-out batch.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
66
/// Mutable record tracking one effect execution's lifecycle for insight
/// queries ("pending" -> "executing" -> "completed"/"failed").
#[derive(Debug, Clone)]
struct StoredEffect {
    effect_id: String,
    // Event that triggered this effect; (event_id, effect_id) is the key.
    event_id: Uuid,
    correlation_id: Uuid,
    // Type and payload of the triggering event, copied for display/DLQ use.
    event_type: String,
    event_payload: serde_json::Value,
    // Batch metadata inherited from the triggering event, if any.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    // One of "pending", "executing", "completed", "failed".
    status: String,
    // Success output, set when status is "completed".
    result: Option<serde_json::Value>,
    // Last failure message, set when status is "failed".
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    // Earliest time the effect may be claimed for execution.
    execute_at: DateTime<Utc>,
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
88
/// Lifecycle of a same-batch join window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MemoryJoinStatus {
    // Still collecting batch entries; claimable once full.
    Open,
    // Claimed by one caller; released back to `Open` if processing fails.
    Processing,
    // Join output was handled; late arrivals are ignored.
    Completed,
}
95
/// Accumulator for a same-batch join: collects one entry per batch index
/// until `target_count` distinct indices have arrived.
#[derive(Debug, Clone)]
struct MemoryJoinWindow {
    // Expected number of entries (the batch size).
    target_count: i32,
    status: MemoryJoinStatus,
    // Source events already absorbed, for duplicate-delivery detection.
    source_event_ids: HashSet<Uuid>,
    // First entry seen for each batch index.
    entries_by_index: HashMap<i32, JoinEntry>,
}
103
/// Record of an effect that exhausted its retries and was dead-lettered.
#[derive(Debug, Clone)]
struct StoredDeadLetter {
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    // Snapshot of the triggering event at the time of failure.
    event_type: String,
    event_payload: serde_json::Value,
    // Final error message and the caller-supplied classification.
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    // Set once the dead letter has been resolved; `None` while outstanding.
    resolved_at: Option<DateTime<Utc>>,
}
118
119impl MemoryStore {
120 pub fn new() -> Self {
121 let (insight_tx, _) = broadcast::channel(1000);
122 Self {
123 events: Arc::new(DashMap::new()),
124 event_seq: Arc::new(AtomicI64::new(1)),
125 states: Arc::new(DashMap::new()),
126 effects: Arc::new(Mutex::new(VecDeque::new())),
127 completed_effects: Arc::new(DashMap::new()),
128 insight_tx: Arc::new(insight_tx),
129 insight_seq: Arc::new(AtomicI64::new(1)),
130 event_history: Arc::new(DashMap::new()),
131 effect_history: Arc::new(DashMap::new()),
132 dead_letter_history: Arc::new(DashMap::new()),
133 join_windows: Arc::new(Mutex::new(HashMap::new())),
134 }
135 }
136
137 fn publish_insight(&self, event: InsightEvent) {
139 let _ = self.insight_tx.send(event);
141 }
142}
143
144impl Default for MemoryStore {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150#[async_trait]
151impl Store for MemoryStore {
    /// Enqueues an event and records it for insight queries.
    ///
    /// Idempotent on `event_id`: redelivering an already-seen event is a
    /// no-op. An accepted event is (1) recorded in `event_history` under a
    /// fresh insight sequence number, (2) broadcast as an `EventDispatched`
    /// insight event, and (3) appended to its workflow's FIFO queue.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        // Deduplicate on event id (upstream delivery is at-least-once).
        if self.event_history.contains_key(&event.event_id) {
            return Ok(());
        }

        // The insight sequence also orders stored events for cursor paging.
        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);

        self.event_history.insert(
            event.event_id,
            StoredEvent {
                seq,
                event_id: event.event_id,
                parent_id: event.parent_id,
                correlation_id: event.correlation_id,
                event_type: event.event_type.clone(),
                payload: event.payload.clone(),
                hops: event.hops,
                retry_count: event.retry_count,
                batch_id: event.batch_id,
                batch_index: event.batch_index,
                batch_size: event.batch_size,
                created_at: event.created_at,
            },
        );

        self.publish_insight(InsightEvent {
            seq,
            stream_type: StreamType::EventDispatched,
            correlation_id: event.correlation_id,
            event_id: Some(event.event_id),
            effect_event_id: None,
            effect_id: None,
            event_type: Some(event.event_type.clone()),
            status: None,
            error: None,
            payload: Some(serde_json::json!({
                "event_type": event.event_type.clone(),
                "hops": event.hops,
                "batch_id": event.batch_id,
                "batch_index": event.batch_index,
                "batch_size": event.batch_size,
                "payload": event.payload.clone(),
            })),
            created_at: event.created_at,
        });

        // Queue the event only after history/insight are recorded, so a
        // concurrent `poll_next` never sees an untracked event.
        let mut queue = self
            .events
            .entry(event.correlation_id)
            .or_insert_with(VecDeque::new);
        queue.push_back(event);
        Ok(())
    }
209
210 async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
211 for mut entry in self.events.iter_mut() {
213 if let Some(event) = entry.value_mut().pop_front() {
214 return Ok(Some(event));
215 }
216 }
217 Ok(None)
218 }
219
    /// Acknowledges a delivered event. The in-memory queue already removes
    /// events on `poll_next`, so there is nothing to do here.
    async fn ack(&self, _id: i64) -> Result<()> {
        Ok(())
    }
224
    /// Negative-acknowledges a delivered event. Redelivery with backoff is
    /// not modeled by the in-memory store, so this is a no-op.
    async fn nack(&self, _id: i64, _retry_after_secs: u64) -> Result<()> {
        Ok(())
    }
229
230 async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
231 where
232 S: for<'de> Deserialize<'de> + Send,
233 {
234 if let Some(entry) = self.states.get(&correlation_id) {
235 let (json, version) = entry.value();
236 let state: S = serde_json::from_value(json.clone())?;
237 Ok(Some((state, *version)))
238 } else {
239 Ok(None)
240 }
241 }
242
243 async fn save_state<S>(
244 &self,
245 correlation_id: Uuid,
246 state: &S,
247 expected_version: i32,
248 ) -> Result<i32>
249 where
250 S: Serialize + Send + Sync,
251 {
252 let json = serde_json::to_value(state)?;
253 let new_version = expected_version + 1;
254
255 if let Some(mut entry) = self.states.get_mut(&correlation_id) {
257 let (_, current_version) = entry.value();
258 if *current_version != expected_version {
259 return Err(anyhow!(
260 "Version mismatch: expected {} but was {}",
261 expected_version,
262 current_version
263 ));
264 }
265 *entry.value_mut() = (json, new_version);
266 } else {
267 self.states.insert(correlation_id, (json, new_version));
268 }
269
270 Ok(new_version)
271 }
272
    /// Registers an effect execution intent.
    ///
    /// Records the effect in `effect_history` with status "pending",
    /// broadcasts an `EffectStarted` insight event, and appends the execution
    /// to the in-memory effect queue where `poll_next_effect` will pick it up
    /// once `execute_at` has passed.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        effect_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
    ) -> Result<()> {
        let execution = QueuedEffectExecution {
            event_id,
            effect_id: effect_id.clone(),
            correlation_id,
            event_type,
            event_payload,
            parent_event_id,
            batch_id,
            batch_index,
            batch_size,
            execute_at,
            timeout_seconds,
            max_attempts,
            priority,
            attempts: 0,
        };

        let now = Utc::now();
        self.effect_history.insert(
            (event_id, effect_id.clone()),
            StoredEffect {
                effect_id: effect_id.clone(),
                event_id,
                correlation_id,
                // `event_type`/`event_payload` were moved into `execution`
                // above, so read the copies back from there.
                event_type: execution.event_type.clone(),
                event_payload: execution.event_payload.clone(),
                batch_id: execution.batch_id,
                batch_index: execution.batch_index,
                batch_size: execution.batch_size,
                status: "pending".to_string(),
                result: None,
                error: None,
                attempts: 0,
                created_at: now,
                execute_at,
                claimed_at: None,
                last_attempted_at: None,
                completed_at: None,
            },
        );

        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
        self.publish_insight(InsightEvent {
            seq,
            stream_type: StreamType::EffectStarted,
            correlation_id,
            event_id: None,
            effect_event_id: Some(event_id),
            effect_id: Some(effect_id),
            event_type: None,
            status: Some("pending".to_string()),
            error: None,
            payload: None,
            created_at: now,
        });

        // Queue last, so a concurrent poll always finds the history row.
        let mut queue = self.effects.lock();
        queue.push_back(execution);
        Ok(())
    }
351
    /// Claims the next effect whose `execute_at` has passed.
    ///
    /// Scans the queue in FIFO order for the first due execution, removes it,
    /// and marks the matching history row "executing" with an incremented
    /// attempt count. Returns `Ok(None)` when nothing is due.
    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
        let mut queue = self.effects.lock();

        let now = Utc::now();
        if let Some(pos) = queue.iter().position(|e| e.execute_at <= now) {
            if let Some(next) = queue.remove(pos) {
                if let Some(mut effect) = self
                    .effect_history
                    .get_mut(&(next.event_id, next.effect_id.clone()))
                {
                    effect.status = "executing".to_string();
                    effect.claimed_at = Some(now);
                    effect.last_attempted_at = Some(now);
                    // NOTE(review): only the history row's attempt count is
                    // bumped; the returned execution keeps its old `attempts`
                    // value — confirm callers rely on history, not the queue
                    // item, for attempt counting.
                    effect.attempts = next.attempts + 1;
                }
                Ok(Some(next))
            } else {
                // Unreachable in practice: `pos` was found in this queue.
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }
376
    /// Marks an effect as successfully completed.
    ///
    /// Caches the result in `completed_effects`, updates the history row to
    /// "completed", and broadcasts an `EffectCompleted` insight event. For an
    /// unknown (event, effect) pair only the result cache is written; no
    /// insight event is sent.
    async fn complete_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
    ) -> Result<()> {
        self.completed_effects
            .insert((event_id, effect_id.clone()), result.clone());

        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
            entry.status = "completed".to_string();
            entry.result = Some(result.clone());
            let completed_at = Utc::now();
            entry.completed_at = Some(completed_at);
            entry.last_attempted_at = Some(completed_at);

            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
            self.publish_insight(InsightEvent {
                seq,
                stream_type: StreamType::EffectCompleted,
                correlation_id: entry.correlation_id,
                event_id: None,
                effect_event_id: Some(event_id),
                effect_id: Some(effect_id),
                event_type: None,
                status: Some("completed".to_string()),
                error: None,
                payload: Some(result),
                created_at: completed_at,
            });
        }

        Ok(())
    }
413
    /// Completes an effect and publishes the events it emitted.
    ///
    /// Emitted events get deterministic UUIDv5 ids derived from the source
    /// event id, effect id, event type, and position, so a redelivered
    /// completion republishes identical ids and `publish`'s dedup drops them.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        self.complete_effect(event_id, effect_id.clone(), result)
            .await?;

        // Resolve the workflow id: effect history first, then the source
        // event's history row, finally falling back to the event id itself.
        let correlation_id = self
            .effect_history
            .get(&(event_id, effect_id.clone()))
            .map(|entry| entry.correlation_id)
            .or_else(|| {
                self.event_history
                    .get(&event_id)
                    .map(|entry| entry.correlation_id)
            })
            .unwrap_or(event_id);
        let parent_hops = self
            .event_history
            .get(&event_id)
            .map(|entry| entry.hops)
            .unwrap_or(0);

        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            // Deterministic child id for idempotent republication.
            let new_event_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, effect_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            let queued = QueuedEvent {
                id: self.event_seq.fetch_add(1, Ordering::SeqCst),
                event_id: new_event_id,
                parent_id: Some(event_id),
                correlation_id,
                event_type: emitted.event_type,
                payload: emitted.payload,
                hops: parent_hops + 1,
                retry_count: 0,
                batch_id: emitted.batch_id,
                batch_index: emitted.batch_index,
                batch_size: emitted.batch_size,
                created_at: Utc::now(),
            };

            self.publish(queued).await?;
        }

        Ok(())
    }
472
    /// Records a retryable effect failure.
    ///
    /// Updates the history row to "failed" with the error and an incremented
    /// attempt count, and broadcasts an `EffectFailed` insight event. The
    /// in-memory store does not requeue the execution, so `_retry_after_secs`
    /// is unused here.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        _retry_after_secs: i32,
    ) -> Result<()> {
        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
            let failed_at = Utc::now();
            entry.status = "failed".to_string();
            entry.error = Some(error.clone());
            entry.attempts += 1;
            entry.last_attempted_at = Some(failed_at);

            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
            self.publish_insight(InsightEvent {
                seq,
                stream_type: StreamType::EffectFailed,
                correlation_id: entry.correlation_id,
                event_id: None,
                effect_event_id: Some(event_id),
                effect_id: Some(effect_id),
                event_type: None,
                status: Some("failed".to_string()),
                error: Some(error.clone()),
                payload: None,
                created_at: failed_at,
            });
        }

        eprintln!("Effect failed: {}", error);
        Ok(())
    }
508
509 async fn dlq_effect(
510 &self,
511 event_id: Uuid,
512 effect_id: String,
513 error: String,
514 error_type: String,
515 attempts: i32,
516 ) -> Result<()> {
517 self.dlq_effect_with_events(event_id, effect_id, error, error_type, attempts, Vec::new())
518 .await
519 }
520
    /// Dead-letters an effect and publishes terminal events so downstream
    /// batch joins can still make progress.
    ///
    /// Metadata (workflow id, event type/payload, batch info) is snapshotted
    /// from the effect history, falling back to the event history, then to
    /// placeholder values. After marking the effect "failed" and recording a
    /// dead letter:
    /// - with no emitted events but full batch metadata, publishes one
    ///   synthetic terminal event re-using the source type/payload so the
    ///   join window still receives this batch slot;
    /// - with emitted events, publishes each of them, inheriting batch
    ///   metadata wherever the emitted event leaves it unset.
    ///
    /// Synthetic event ids are deterministic UUIDv5 values, so a redelivered
    /// DLQ call republishes identical ids and `publish` deduplicates them.
    async fn dlq_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        error_type: String,
        attempts: i32,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Preferred metadata source: the effect's own history row.
        let effect_snapshot =
            self.effect_history
                .get(&(event_id, effect_id.clone()))
                .map(|entry| {
                    (
                        entry.correlation_id,
                        entry.event_type.clone(),
                        entry.event_payload.clone(),
                        entry.batch_id,
                        entry.batch_index,
                        entry.batch_size,
                    )
                });
        // Fallback source: the triggering event (also provides `hops`).
        let event_snapshot = self.event_history.get(&event_id).map(|event| {
            (
                event.correlation_id,
                event.event_type.clone(),
                event.payload.clone(),
                event.batch_id,
                event.batch_index,
                event.batch_size,
                event.hops,
            )
        });

        let (correlation_id, event_type, event_payload, batch_id, batch_index, batch_size) =
            if let Some(snapshot) = effect_snapshot.clone() {
                (
                    snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
                )
            } else if let Some(snapshot) = event_snapshot.clone() {
                (
                    snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
                )
            } else {
                // Nothing known about this effect or its event: placeholders.
                (
                    event_id,
                    "unknown".to_string(),
                    serde_json::Value::Null,
                    None,
                    None,
                    None,
                )
            };

        if let Some(mut effect) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
            effect.status = "failed".to_string();
            effect.error = Some(error.clone());
            effect.attempts = attempts;
            effect.last_attempted_at = Some(Utc::now());
        }

        self.dead_letter_history.insert(
            (event_id, effect_id.clone()),
            StoredDeadLetter {
                event_id,
                effect_id: effect_id.clone(),
                correlation_id,
                event_type: event_type.clone(),
                event_payload: event_payload.clone(),
                error: error.clone(),
                reason: error_type,
                attempts,
                failed_at: Utc::now(),
                resolved_at: None,
            },
        );

        let parent_hops = event_snapshot.map(|snapshot| snapshot.6).unwrap_or(0);
        if emitted_events.is_empty() {
            // Only emit a synthetic terminal event when the failed effect
            // occupied a batch slot; otherwise there is no join to unblock.
            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                (batch_id, batch_index, batch_size)
            {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
                );
                self.publish(QueuedEvent {
                    id: self.event_seq.fetch_add(1, Ordering::SeqCst),
                    event_id: synthetic_event_id,
                    parent_id: Some(event_id),
                    correlation_id,
                    event_type: event_type.clone(),
                    payload: event_payload.clone(),
                    hops: parent_hops + 1,
                    retry_count: 0,
                    batch_id: Some(batch_id),
                    batch_index: Some(batch_index),
                    batch_size: Some(batch_size),
                    created_at: Utc::now(),
                })
                .await?;
            }
        } else {
            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!(
                        "{}-{}-dlq-terminal-{}-{}",
                        event_id, effect_id, emitted.event_type, emitted_index
                    )
                    .as_bytes(),
                );
                self.publish(QueuedEvent {
                    id: self.event_seq.fetch_add(1, Ordering::SeqCst),
                    event_id: synthetic_event_id,
                    parent_id: Some(event_id),
                    correlation_id,
                    event_type: emitted.event_type,
                    payload: emitted.payload,
                    hops: parent_hops + 1,
                    retry_count: 0,
                    // Emitted metadata wins; fall back to the failed slot's.
                    batch_id: emitted.batch_id.or(batch_id),
                    batch_index: emitted.batch_index.or(batch_index),
                    batch_size: emitted.batch_size.or(batch_size),
                    created_at: Utc::now(),
                })
                .await?;
            }
        }

        eprintln!(
            "Effect sent to DLQ: {}:{} - {} (attempts: {})",
            event_id, effect_id, error, attempts
        );
        Ok(())
    }
657
    /// Appends one batch member to a same-batch join window, claiming the
    /// window for the caller if this arrival completes it.
    ///
    /// Exactly one caller observes `Ok(Some(entries))` — the one whose
    /// arrival fills the window and flips it `Open` -> `Processing`; the
    /// entries come back sorted by batch index. All other calls (partial
    /// window, duplicate source event, window already claimed or completed)
    /// return `Ok(None)`.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let key = (join_effect_id.clone(), correlation_id, batch_id);
        let mut windows = self.join_windows.lock();
        let window = windows.entry(key).or_insert_with(|| MemoryJoinWindow {
            target_count: batch_size,
            status: MemoryJoinStatus::Open,
            source_event_ids: HashSet::new(),
            entries_by_index: HashMap::new(),
        });

        // Late arrival after the join already ran: ignore it.
        if window.status == MemoryJoinStatus::Completed {
            return Ok(None);
        }

        // Trust the most recently reported batch size if it changed.
        if window.target_count != batch_size {
            window.target_count = batch_size;
        }

        // `HashSet::insert` returns false for duplicates, i.e. redelivered
        // source events; those contribute no new entry.
        let already_seen_source = !window.source_event_ids.insert(source_event_id);
        if !already_seen_source {
            window
                .entries_by_index
                .entry(batch_index)
                .or_insert_with(|| JoinEntry {
                    source_event_id,
                    event_type: source_event_type,
                    payload: source_payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at: source_created_at,
                });
        }

        // Claim the window for this caller once all indices are present.
        let ready = window.entries_by_index.len() as i32 >= window.target_count;
        if ready && window.status == MemoryJoinStatus::Open {
            window.status = MemoryJoinStatus::Processing;
            let mut ordered = window
                .entries_by_index
                .values()
                .cloned()
                .collect::<Vec<_>>();
            ordered.sort_by_key(|entry| entry.batch_index);
            return Ok(Some(ordered));
        }

        Ok(None)
    }
717
718 async fn join_same_batch_complete(
719 &self,
720 join_effect_id: String,
721 correlation_id: Uuid,
722 batch_id: Uuid,
723 ) -> Result<()> {
724 let key = (join_effect_id, correlation_id, batch_id);
725 if let Some(window) = self.join_windows.lock().get_mut(&key) {
726 window.status = MemoryJoinStatus::Completed;
727 }
728 self.join_windows.lock().remove(&key);
729 Ok(())
730 }
731
732 async fn join_same_batch_release(
733 &self,
734 join_effect_id: String,
735 correlation_id: Uuid,
736 batch_id: Uuid,
737 _error: String,
738 ) -> Result<()> {
739 let key = (join_effect_id, correlation_id, batch_id);
740 if let Some(window) = self.join_windows.lock().get_mut(&key) {
741 if window.status == MemoryJoinStatus::Processing {
742 window.status = MemoryJoinStatus::Open;
743 }
744 }
745 Ok(())
746 }
747
    /// Per-workflow event subscriptions are not implemented by the in-memory
    /// store; this always returns an error.
    async fn subscribe_workflow_events(
        &self,
        _correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = WorkflowEvent> + Send + Unpin>> {
        Err(anyhow!("Subscriptions not supported in memory store"))
    }
755
756 async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<WorkflowStatus> {
757 let has_events = self
759 .events
760 .get(&correlation_id)
761 .map(|q| !q.is_empty())
762 .unwrap_or(false);
763 let state = self
764 .states
765 .get(&correlation_id)
766 .map(|entry| entry.value().0.clone());
767 let pending_effects = 0i64; Ok(WorkflowStatus {
770 correlation_id,
771 state,
772 pending_effects,
773 is_settled: !has_events && pending_effects == 0,
774 last_event: None, })
776 }
777}
778
779#[async_trait]
780impl InsightStore for MemoryStore {
    /// Subscribes to the live insight event feed.
    ///
    /// Wraps a broadcast receiver in a stream; the stream ends when `recv`
    /// returns an error (channel closed, or this subscriber lagged past the
    /// 1000-message buffer).
    async fn subscribe_events(
        &self,
    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
        let mut rx = self.insight_tx.subscribe();
        let stream = async_stream::stream! {
            while let Ok(event) = rx.recv().await {
                yield event;
            }
        };

        Ok(Box::new(Box::pin(stream)))
    }
793
    /// Builds the causal event tree for one workflow.
    ///
    /// Events are sorted by creation time. Roots are events with no parent —
    /// or whose parent lies outside this workflow's history (see
    /// `build_event_nodes`).
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
        let mut events: Vec<_> = self
            .event_history
            .iter()
            .filter(|e| e.value().correlation_id == correlation_id)
            .map(|e| e.value().clone())
            .collect();

        events.sort_by_key(|e| e.created_at);

        // The id set lets orphaned children (parent outside this workflow)
        // be promoted to roots.
        let event_ids: HashSet<Uuid> = events.iter().map(|e| e.event_id).collect();
        let roots = self.build_event_nodes(&events, None, &event_ids, true);

        let state = self
            .states
            .get(&correlation_id)
            .map(|entry| entry.value().0.clone());

        Ok(WorkflowTree {
            correlation_id,
            roots,
            state,
            event_count: events.len(),
            effect_count: self
                .effect_history
                .iter()
                .filter(|e| e.value().correlation_id == correlation_id)
                .count(),
        })
    }
828
829 async fn get_stats(&self) -> Result<InsightStats> {
830 let total_events = self.event_history.len() as i64;
831
832 let mut active_effects = 0i64;
833 let mut completed_effects = 0i64;
834 let mut failed_effects = 0i64;
835
836 for entry in self.effect_history.iter() {
837 match entry.value().status.as_str() {
838 "pending" | "executing" => active_effects += 1,
839 "completed" => completed_effects += 1,
840 "failed" => failed_effects += 1,
841 _ => {}
842 }
843 }
844
845 Ok(InsightStats {
846 total_events,
847 active_effects,
848 completed_effects,
849 failed_effects,
850 })
851 }
852
    /// Returns dispatched events after the given cursor, oldest first.
    ///
    /// `cursor` is an exclusive lower bound on the insight `seq` (`None`
    /// starts from the beginning). At most `limit` events are returned, so
    /// callers page forward by passing the last seen `seq` back in.
    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        let mut events: Vec<_> = self
            .event_history
            .iter()
            .filter_map(|e| {
                let stored = e.value();
                // Skip events at or before the cursor.
                if let Some(cursor_seq) = cursor {
                    if stored.seq <= cursor_seq {
                        return None;
                    }
                }
                // Re-materialize the same shape `publish` broadcast live.
                Some(InsightEvent {
                    seq: stored.seq,
                    stream_type: StreamType::EventDispatched,
                    correlation_id: stored.correlation_id,
                    event_id: Some(stored.event_id),
                    effect_event_id: None,
                    effect_id: None,
                    event_type: Some(stored.event_type.clone()),
                    status: None,
                    error: None,
                    payload: Some(serde_json::json!({
                        "event_type": stored.event_type.clone(),
                        "hops": stored.hops,
                        "batch_id": stored.batch_id,
                        "batch_index": stored.batch_index,
                        "batch_size": stored.batch_size,
                        "payload": stored.payload.clone(),
                    })),
                    created_at: stored.created_at,
                })
            })
            .collect();

        // Ascending seq order, then cap at the page size.
        events.sort_by_key(|e| e.seq);
        events.truncate(limit);

        Ok(events)
    }
899
    /// Lists effect execution logs, optionally filtered by workflow, most
    /// recently active first, capped at `limit`.
    async fn get_effect_logs(
        &self,
        correlation_id: Option<Uuid>,
        limit: usize,
    ) -> Result<Vec<EffectExecutionLog>> {
        let mut logs: Vec<_> = self
            .effect_history
            .iter()
            .filter_map(|entry| {
                let effect = entry.value();
                if let Some(filter) = correlation_id {
                    if effect.correlation_id != filter {
                        return None;
                    }
                }

                // Duration is only known once both a start and completion
                // timestamp exist; clamp negatives from clock skew to zero.
                let started_at = effect.claimed_at.or(effect.last_attempted_at);
                let duration_ms = match (started_at, effect.completed_at) {
                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
                    _ => None,
                };

                Some(EffectExecutionLog {
                    correlation_id: effect.correlation_id,
                    event_id: effect.event_id,
                    effect_id: effect.effect_id.clone(),
                    status: effect.status.clone(),
                    attempts: effect.attempts,
                    event_type: Some(effect.event_type.clone()),
                    result: effect.result.clone(),
                    error: effect.error.clone(),
                    created_at: effect.created_at,
                    execute_at: Some(effect.execute_at),
                    claimed_at: effect.claimed_at,
                    last_attempted_at: effect.last_attempted_at,
                    completed_at: effect.completed_at,
                    duration_ms,
                })
            })
            .collect();

        // Order by most recent activity timestamp, newest first.
        logs.sort_by(|a, b| {
            let a_time = a
                .last_attempted_at
                .or(a.completed_at)
                .or(a.claimed_at)
                .unwrap_or(a.created_at);
            let b_time = b
                .last_attempted_at
                .or(b.completed_at)
                .or(b.claimed_at)
                .unwrap_or(b.created_at);
            b_time.cmp(&a_time)
        });
        logs.truncate(limit);
        Ok(logs)
    }
957
    /// Lists dead-lettered effects, newest failures first, capped at `limit`.
    /// With `unresolved_only`, entries that were already resolved are
    /// skipped.
    async fn get_dead_letters(
        &self,
        unresolved_only: bool,
        limit: usize,
    ) -> Result<Vec<DeadLetterEntry>> {
        let mut rows: Vec<_> = self
            .dead_letter_history
            .iter()
            .filter_map(|entry| {
                let dead = entry.value();
                if unresolved_only && dead.resolved_at.is_some() {
                    return None;
                }

                Some(DeadLetterEntry {
                    correlation_id: dead.correlation_id,
                    event_id: dead.event_id,
                    effect_id: dead.effect_id.clone(),
                    event_type: dead.event_type.clone(),
                    event_payload: dead.event_payload.clone(),
                    error: dead.error.clone(),
                    reason: dead.reason.clone(),
                    attempts: dead.attempts,
                    failed_at: dead.failed_at,
                    resolved_at: dead.resolved_at,
                })
            })
            .collect();

        rows.sort_by(|a, b| b.failed_at.cmp(&a.failed_at));
        rows.truncate(limit);
        Ok(rows)
    }
991
    /// Summarizes workflows that currently have failed effects or unresolved
    /// dead letters, most recently failed first, capped at `limit`.
    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
        let mut workflows: HashMap<Uuid, FailedWorkflow> = HashMap::new();

        // Pass 1: bucket effect history into per-workflow counters.
        for entry in self.effect_history.iter() {
            let effect = entry.value();
            let workflow = workflows
                .entry(effect.correlation_id)
                .or_insert(FailedWorkflow {
                    correlation_id: effect.correlation_id,
                    failed_effects: 0,
                    active_effects: 0,
                    dead_letters: 0,
                    last_failed_at: None,
                    last_error: None,
                });

            match effect.status.as_str() {
                "failed" => {
                    workflow.failed_effects += 1;
                    let at = effect.last_attempted_at.unwrap_or(effect.created_at);
                    // Track the most recent failure and its error message.
                    if workflow
                        .last_failed_at
                        .map(|current| at > current)
                        .unwrap_or(true)
                    {
                        workflow.last_failed_at = Some(at);
                        workflow.last_error = effect.error.clone();
                    }
                }
                "pending" | "executing" => {
                    workflow.active_effects += 1;
                }
                _ => {}
            }
        }

        // Pass 2: fold in unresolved dead letters.
        for entry in self.dead_letter_history.iter() {
            let dead = entry.value();
            if dead.resolved_at.is_some() {
                continue;
            }

            let workflow = workflows
                .entry(dead.correlation_id)
                .or_insert(FailedWorkflow {
                    correlation_id: dead.correlation_id,
                    failed_effects: 0,
                    active_effects: 0,
                    dead_letters: 0,
                    last_failed_at: None,
                    last_error: None,
                });

            workflow.dead_letters += 1;
            if workflow
                .last_failed_at
                .map(|current| dead.failed_at > current)
                .unwrap_or(true)
            {
                workflow.last_failed_at = Some(dead.failed_at);
                workflow.last_error = Some(dead.error.clone());
            }
        }

        // Only report workflows with something actually wrong.
        let mut rows: Vec<_> = workflows
            .into_values()
            .filter(|workflow| workflow.failed_effects > 0 || workflow.dead_letters > 0)
            .collect();

        rows.sort_by(|a, b| b.last_failed_at.cmp(&a.last_failed_at));
        rows.truncate(limit);
        Ok(rows)
    }
1065}
1066
impl MemoryStore {
    /// Recursively assembles the `EventNode` tree used by
    /// `get_workflow_tree`.
    ///
    /// On the root pass, an event qualifies when it has no parent or its
    /// parent id is absent from `event_ids` (orphans are promoted to roots).
    /// On child passes, events are matched by `parent_id`. Each node carries
    /// the effects whose `event_id` matches the node's event.
    fn build_event_nodes(
        &self,
        events: &[StoredEvent],
        parent_id: Option<Uuid>,
        event_ids: &HashSet<Uuid>,
        is_root_pass: bool,
    ) -> Vec<EventNode> {
        events
            .iter()
            .filter(|event| {
                if is_root_pass {
                    // Roots: parentless events, plus events whose parent is
                    // unknown to this workflow's history.
                    event.parent_id.is_none()
                        || event
                            .parent_id
                            .map(|parent| !event_ids.contains(&parent))
                            .unwrap_or(false)
                } else {
                    event.parent_id == parent_id
                }
            })
            .map(|event| {
                // Effects triggered by this event.
                let effects = self
                    .effect_history
                    .iter()
                    .filter(|e| e.value().event_id == event.event_id)
                    .map(|e| {
                        let effect = e.value();
                        EffectNode {
                            effect_id: effect.effect_id.clone(),
                            event_id: effect.event_id,
                            status: effect.status.clone(),
                            result: effect.result.clone(),
                            error: effect.error.clone(),
                            attempts: effect.attempts,
                            created_at: effect.created_at,
                            batch_id: effect.batch_id,
                            batch_index: effect.batch_index,
                            batch_size: effect.batch_size,
                        }
                    })
                    .collect();

                // Recurse to collect this event's children.
                let children =
                    self.build_event_nodes(events, Some(event.event_id), event_ids, false);

                EventNode {
                    event_id: event.event_id,
                    event_type: event.event_type.clone(),
                    payload: event.payload.clone(),
                    created_at: event.created_at,
                    batch_id: event.batch_id,
                    batch_index: event.batch_index,
                    batch_size: event.batch_size,
                    children,
                    effects,
                }
            })
            .collect()
    }
}
1131
#[cfg(test)]
mod tests {
    use super::*;
    use seesaw_core::store::Store;

    /// Published events receive consecutive, unique insight sequence
    /// numbers starting at 1.
    #[tokio::test]
    async fn test_insight_events_have_unique_seq() {
        let store = MemoryStore::new();

        for i in 1..=3 {
            let event = QueuedEvent {
                id: i as i64,
                event_id: Uuid::new_v4(),
                parent_id: None,
                correlation_id: Uuid::new_v4(),
                event_type: format!("Event{}", i),
                payload: serde_json::json!({"n": i}),
                created_at: Utc::now(),
                hops: 0,
                retry_count: 0,
                batch_id: None,
                batch_index: None,
                batch_size: None,
            };
            store.publish(event).await.unwrap();
        }

        let events = store.get_recent_events(None, 10).await.unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].seq, 1);
        assert_eq!(events[1].seq, 2);
        assert_eq!(events[2].seq, 3);
    }

    /// `get_recent_events` pages forward using the cursor as an exclusive
    /// lower bound on `seq`, with no gaps or duplicates across pages.
    #[tokio::test]
    async fn test_cursor_based_filtering() {
        let store = MemoryStore::new();
        let correlation_id = Uuid::new_v4();

        for i in 1..=5 {
            let event = QueuedEvent {
                id: i as i64,
                event_id: Uuid::new_v4(),
                parent_id: None,
                correlation_id,
                event_type: format!("Event{}", i),
                payload: serde_json::json!({"n": i}),
                created_at: Utc::now(),
                hops: 0,
                retry_count: 0,
                batch_id: None,
                batch_index: None,
                batch_size: None,
            };
            store.publish(event).await.unwrap();
        }

        // First page from the beginning.
        let events = store.get_recent_events(None, 2).await.unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 1);
        assert_eq!(events[1].seq, 2);

        // Second page resumes strictly after the last seen seq.
        let next_events = store.get_recent_events(Some(2), 2).await.unwrap();
        assert_eq!(next_events.len(), 2);
        assert_eq!(next_events[0].seq, 3);
        assert_eq!(next_events[1].seq, 4);

        let all_seqs: Vec<i64> = events
            .iter()
            .chain(next_events.iter())
            .map(|e| e.seq)
            .collect();
        assert_eq!(all_seqs, vec![1, 2, 3, 4]);
    }

    /// A cursor beyond every stored seq yields an empty page.
    #[tokio::test]
    async fn test_no_events_before_cursor() {
        let store = MemoryStore::new();

        for i in 1..=3 {
            let event = QueuedEvent {
                id: i as i64,
                event_id: Uuid::new_v4(),
                parent_id: None,
                correlation_id: Uuid::new_v4(),
                event_type: format!("Event{}", i),
                payload: serde_json::json!({"n": i}),
                created_at: Utc::now(),
                hops: 0,
                retry_count: 0,
                batch_id: None,
                batch_index: None,
                batch_size: None,
            };
            store.publish(event).await.unwrap();
        }

        let events = store.get_recent_events(Some(10), 10).await.unwrap();
        assert_eq!(
            events.len(),
            0,
            "Should return no events when cursor is beyond all seq values"
        );
    }

    /// An event whose parent id is unknown to the workflow's history is
    /// promoted to a root of the workflow tree.
    #[tokio::test]
    async fn test_workflow_tree_treats_orphan_parent_as_root() {
        let store = MemoryStore::new();
        let correlation_id = Uuid::new_v4();
        let event_id = Uuid::new_v4();
        let missing_parent = Uuid::new_v4();

        store
            .publish(QueuedEvent {
                id: 1,
                event_id,
                parent_id: Some(missing_parent),
                correlation_id,
                event_type: "OrphanEvent".to_string(),
                payload: serde_json::json!({"ok": true}),
                created_at: Utc::now(),
                hops: 1,
                retry_count: 0,
                batch_id: None,
                batch_index: None,
                batch_size: None,
            })
            .await
            .unwrap();

        let tree = store.get_workflow_tree(correlation_id).await.unwrap();
        assert_eq!(tree.event_count, 1);
        assert_eq!(tree.roots.len(), 1);
        assert_eq!(tree.roots[0].event_id, event_id);
    }

    /// DLQ-ing an effect that occupied a batch slot publishes a synthetic
    /// terminal event carrying the same batch metadata, so the batch join
    /// is not starved.
    #[tokio::test]
    async fn test_dlq_with_batch_metadata_publishes_synthetic_terminal_event() {
        let store = MemoryStore::new();
        let event_id = Uuid::new_v4();
        let correlation_id = Uuid::new_v4();
        let batch_id = Uuid::new_v4();

        store
            .insert_effect_intent(
                event_id,
                "join_effect".to_string(),
                correlation_id,
                "BatchItemResult".to_string(),
                serde_json::json!({ "index": 2, "ok": false }),
                None,
                Some(batch_id),
                Some(2),
                Some(5),
                Utc::now(),
                30,
                1,
                10,
            )
            .await
            .expect("insert should succeed");

        store
            .dlq_effect(
                event_id,
                "join_effect".to_string(),
                "forced failure".to_string(),
                "failed".to_string(),
                1,
            )
            .await
            .expect("dlq should succeed");

        let synthetic = store.poll_next().await.expect("poll should succeed");
        assert!(
            synthetic.is_some(),
            "synthetic terminal event should be published"
        );
        let synthetic = synthetic.unwrap();
        assert_eq!(synthetic.correlation_id, correlation_id);
        assert_eq!(synthetic.event_type, "BatchItemResult");
        assert_eq!(synthetic.batch_id, Some(batch_id));
        assert_eq!(synthetic.batch_index, Some(2));
        assert_eq!(synthetic.batch_size, Some(5));
    }

    /// DLQ-ing an effect with no batch metadata publishes nothing — there
    /// is no join waiting on it.
    #[tokio::test]
    async fn test_dlq_without_batch_metadata_does_not_publish_synthetic_terminal_event() {
        let store = MemoryStore::new();
        let event_id = Uuid::new_v4();
        let correlation_id = Uuid::new_v4();

        store
            .insert_effect_intent(
                event_id,
                "normal_effect".to_string(),
                correlation_id,
                "NormalEvent".to_string(),
                serde_json::json!({ "ok": true }),
                None,
                None,
                None,
                None,
                Utc::now(),
                30,
                1,
                10,
            )
            .await
            .expect("insert should succeed");

        store
            .dlq_effect(
                event_id,
                "normal_effect".to_string(),
                "forced failure".to_string(),
                "failed".to_string(),
                1,
            )
            .await
            .expect("dlq should succeed");

        let next = store.poll_next().await.expect("poll should succeed");
        assert!(next.is_none(), "no synthetic terminal event expected");
    }
}