1use anyhow::{anyhow, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use parking_lot::Mutex;
11use seesaw_core::insight::*;
12use seesaw_core::store::*;
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, HashSet, VecDeque};
15use tokio::sync::broadcast;
16
17use std::sync::atomic::{AtomicI64, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
21#[derive(Clone)]
23pub struct MemoryStore {
24 events: Arc<DashMap<Uuid, VecDeque<QueuedEvent>>>,
26 event_seq: Arc<AtomicI64>,
28 states: Arc<DashMap<Uuid, (serde_json::Value, i32)>>,
30 effects: Arc<Mutex<VecDeque<QueuedEffectExecution>>>,
32 completed_effects: Arc<DashMap<(Uuid, String), serde_json::Value>>,
34
35 insight_tx: Arc<broadcast::Sender<InsightEvent>>,
38 insight_seq: Arc<AtomicI64>,
40 event_history: Arc<DashMap<Uuid, StoredEvent>>,
42 effect_history: Arc<DashMap<(Uuid, String), StoredEffect>>,
44 dead_letter_history: Arc<DashMap<(Uuid, String), StoredDeadLetter>>,
46 join_windows: Arc<Mutex<HashMap<(String, Uuid, Uuid), MemoryJoinWindow>>>,
48}
49
50#[derive(Debug, Clone)]
52struct StoredEvent {
53 seq: i64,
54 event_id: Uuid,
55 parent_id: Option<Uuid>,
56 correlation_id: Uuid,
57 event_type: String,
58 payload: serde_json::Value,
59 hops: i32,
60 batch_id: Option<Uuid>,
61 batch_index: Option<i32>,
62 batch_size: Option<i32>,
63 created_at: DateTime<Utc>,
64}
65
66#[derive(Debug, Clone)]
68struct StoredEffect {
69 effect_id: String,
70 event_id: Uuid,
71 correlation_id: Uuid,
72 event_type: String,
73 event_payload: serde_json::Value,
74 batch_id: Option<Uuid>,
75 batch_index: Option<i32>,
76 batch_size: Option<i32>,
77 status: String,
78 result: Option<serde_json::Value>,
79 error: Option<String>,
80 attempts: i32,
81 created_at: DateTime<Utc>,
82 execute_at: DateTime<Utc>,
83 claimed_at: Option<DateTime<Utc>>,
84 last_attempted_at: Option<DateTime<Utc>>,
85 completed_at: Option<DateTime<Utc>>,
86}
87
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89enum MemoryJoinStatus {
90 Open,
91 Processing,
92 Completed,
93}
94
95#[derive(Debug, Clone)]
96struct MemoryJoinWindow {
97 target_count: i32,
98 status: MemoryJoinStatus,
99 source_event_ids: HashSet<Uuid>,
100 entries_by_index: HashMap<i32, JoinEntry>,
101}
102
103#[derive(Debug, Clone)]
105struct StoredDeadLetter {
106 event_id: Uuid,
107 effect_id: String,
108 correlation_id: Uuid,
109 event_type: String,
110 event_payload: serde_json::Value,
111 error: String,
112 reason: String,
113 attempts: i32,
114 failed_at: DateTime<Utc>,
115 resolved_at: Option<DateTime<Utc>>,
116}
117
118impl MemoryStore {
119 pub fn new() -> Self {
120 let (insight_tx, _) = broadcast::channel(1000);
121 Self {
122 events: Arc::new(DashMap::new()),
123 event_seq: Arc::new(AtomicI64::new(1)),
124 states: Arc::new(DashMap::new()),
125 effects: Arc::new(Mutex::new(VecDeque::new())),
126 completed_effects: Arc::new(DashMap::new()),
127 insight_tx: Arc::new(insight_tx),
128 insight_seq: Arc::new(AtomicI64::new(1)),
129 event_history: Arc::new(DashMap::new()),
130 effect_history: Arc::new(DashMap::new()),
131 dead_letter_history: Arc::new(DashMap::new()),
132 join_windows: Arc::new(Mutex::new(HashMap::new())),
133 }
134 }
135
136 fn publish_insight(&self, event: InsightEvent) {
138 let _ = self.insight_tx.send(event);
140 }
141}
142
143impl Default for MemoryStore {
144 fn default() -> Self {
145 Self::new()
146 }
147}
148
149#[async_trait]
150impl Store for MemoryStore {
151 async fn publish(&self, event: QueuedEvent) -> Result<()> {
152 if self.event_history.contains_key(&event.event_id) {
153 return Ok(());
154 }
155
156 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
158
159 self.event_history.insert(
161 event.event_id,
162 StoredEvent {
163 seq,
164 event_id: event.event_id,
165 parent_id: event.parent_id,
166 correlation_id: event.correlation_id,
167 event_type: event.event_type.clone(),
168 payload: event.payload.clone(),
169 hops: event.hops,
170 batch_id: event.batch_id,
171 batch_index: event.batch_index,
172 batch_size: event.batch_size,
173 created_at: event.created_at,
174 },
175 );
176
177 self.publish_insight(InsightEvent {
179 seq,
180 stream_type: StreamType::EventDispatched,
181 correlation_id: event.correlation_id,
182 event_id: Some(event.event_id),
183 effect_event_id: None,
184 effect_id: None,
185 event_type: Some(event.event_type.clone()),
186 status: None,
187 error: None,
188 payload: Some(serde_json::json!({
189 "event_type": event.event_type.clone(),
190 "hops": event.hops,
191 "batch_id": event.batch_id,
192 "batch_index": event.batch_index,
193 "batch_size": event.batch_size,
194 "payload": event.payload.clone(),
195 })),
196 created_at: event.created_at,
197 });
198
199 let mut queue = self
201 .events
202 .entry(event.correlation_id)
203 .or_insert_with(VecDeque::new);
204 queue.push_back(event);
205 Ok(())
206 }
207
208 async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
209 for mut entry in self.events.iter_mut() {
211 if let Some(event) = entry.value_mut().pop_front() {
212 return Ok(Some(event));
213 }
214 }
215 Ok(None)
216 }
217
218 async fn ack(&self, _id: i64) -> Result<()> {
219 Ok(())
221 }
222
223 async fn nack(&self, _id: i64, _retry_after_secs: u64) -> Result<()> {
224 Ok(())
226 }
227
228 async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
229 where
230 S: for<'de> Deserialize<'de> + Send,
231 {
232 if let Some(entry) = self.states.get(&correlation_id) {
233 let (json, version) = entry.value();
234 let state: S = serde_json::from_value(json.clone())?;
235 Ok(Some((state, *version)))
236 } else {
237 Ok(None)
238 }
239 }
240
241 async fn save_state<S>(
242 &self,
243 correlation_id: Uuid,
244 state: &S,
245 expected_version: i32,
246 ) -> Result<i32>
247 where
248 S: Serialize + Send + Sync,
249 {
250 let json = serde_json::to_value(state)?;
251 let new_version = expected_version + 1;
252
253 if let Some(mut entry) = self.states.get_mut(&correlation_id) {
255 let (_, current_version) = entry.value();
256 if *current_version != expected_version {
257 return Err(anyhow!(
258 "Version mismatch: expected {} but was {}",
259 expected_version,
260 current_version
261 ));
262 }
263 *entry.value_mut() = (json, new_version);
264 } else {
265 self.states.insert(correlation_id, (json, new_version));
266 }
267
268 Ok(new_version)
269 }
270
271 async fn insert_effect_intent(
272 &self,
273 event_id: Uuid,
274 effect_id: String,
275 correlation_id: Uuid,
276 event_type: String,
277 event_payload: serde_json::Value,
278 parent_event_id: Option<Uuid>,
279 batch_id: Option<Uuid>,
280 batch_index: Option<i32>,
281 batch_size: Option<i32>,
282 execute_at: DateTime<Utc>,
283 timeout_seconds: i32,
284 max_attempts: i32,
285 priority: i32,
286 ) -> Result<()> {
287 let execution = QueuedEffectExecution {
288 event_id,
289 effect_id: effect_id.clone(),
290 correlation_id,
291 event_type,
292 event_payload,
293 parent_event_id,
294 batch_id,
295 batch_index,
296 batch_size,
297 execute_at,
298 timeout_seconds,
299 max_attempts,
300 priority,
301 attempts: 0,
302 };
303
304 let now = Utc::now();
306 self.effect_history.insert(
307 (event_id, effect_id.clone()),
308 StoredEffect {
309 effect_id: effect_id.clone(),
310 event_id,
311 correlation_id,
312 event_type: execution.event_type.clone(),
313 event_payload: execution.event_payload.clone(),
314 batch_id: execution.batch_id,
315 batch_index: execution.batch_index,
316 batch_size: execution.batch_size,
317 status: "pending".to_string(),
318 result: None,
319 error: None,
320 attempts: 0,
321 created_at: now,
322 execute_at,
323 claimed_at: None,
324 last_attempted_at: None,
325 completed_at: None,
326 },
327 );
328
329 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
331 self.publish_insight(InsightEvent {
332 seq,
333 stream_type: StreamType::EffectStarted,
334 correlation_id,
335 event_id: None,
336 effect_event_id: Some(event_id),
337 effect_id: Some(effect_id),
338 event_type: None,
339 status: Some("pending".to_string()),
340 error: None,
341 payload: None,
342 created_at: now,
343 });
344
345 let mut queue = self.effects.lock();
346 queue.push_back(execution);
347 Ok(())
348 }
349
350 async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
351 let mut queue = self.effects.lock();
352
353 let now = Utc::now();
355 if let Some(pos) = queue.iter().position(|e| e.execute_at <= now) {
356 if let Some(next) = queue.remove(pos) {
357 if let Some(mut effect) = self
358 .effect_history
359 .get_mut(&(next.event_id, next.effect_id.clone()))
360 {
361 effect.status = "executing".to_string();
362 effect.claimed_at = Some(now);
363 effect.last_attempted_at = Some(now);
364 effect.attempts = next.attempts + 1;
365 }
366 Ok(Some(next))
367 } else {
368 Ok(None)
369 }
370 } else {
371 Ok(None)
372 }
373 }
374
375 async fn complete_effect(
376 &self,
377 event_id: Uuid,
378 effect_id: String,
379 result: serde_json::Value,
380 ) -> Result<()> {
381 self.completed_effects
382 .insert((event_id, effect_id.clone()), result.clone());
383
384 if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
386 entry.status = "completed".to_string();
387 entry.result = Some(result.clone());
388 let completed_at = Utc::now();
389 entry.completed_at = Some(completed_at);
390 entry.last_attempted_at = Some(completed_at);
391
392 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
394 self.publish_insight(InsightEvent {
395 seq,
396 stream_type: StreamType::EffectCompleted,
397 correlation_id: entry.correlation_id,
398 event_id: None,
399 effect_event_id: Some(event_id),
400 effect_id: Some(effect_id),
401 event_type: None,
402 status: Some("completed".to_string()),
403 error: None,
404 payload: Some(result),
405 created_at: completed_at,
406 });
407 }
408
409 Ok(())
410 }
411
412 async fn complete_effect_with_events(
413 &self,
414 event_id: Uuid,
415 effect_id: String,
416 result: serde_json::Value,
417 emitted_events: Vec<EmittedEvent>,
418 ) -> Result<()> {
419 self.complete_effect(event_id, effect_id.clone(), result)
421 .await?;
422
423 let correlation_id = self
424 .effect_history
425 .get(&(event_id, effect_id.clone()))
426 .map(|entry| entry.correlation_id)
427 .or_else(|| {
428 self.event_history
429 .get(&event_id)
430 .map(|entry| entry.correlation_id)
431 })
432 .unwrap_or(event_id);
433 let parent_hops = self
434 .event_history
435 .get(&event_id)
436 .map(|entry| entry.hops)
437 .unwrap_or(0);
438
439 for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
441 let new_event_id = Uuid::new_v5(
442 &NAMESPACE_SEESAW,
443 format!(
444 "{}-{}-{}-{}",
445 event_id, effect_id, emitted.event_type, emitted_index
446 )
447 .as_bytes(),
448 );
449
450 let queued = QueuedEvent {
451 id: self.event_seq.fetch_add(1, Ordering::SeqCst),
452 event_id: new_event_id,
453 parent_id: Some(event_id),
454 correlation_id,
455 event_type: emitted.event_type,
456 payload: emitted.payload,
457 hops: parent_hops + 1,
458 batch_id: emitted.batch_id,
459 batch_index: emitted.batch_index,
460 batch_size: emitted.batch_size,
461 created_at: Utc::now(),
462 };
463
464 self.publish(queued).await?;
465 }
466
467 Ok(())
468 }
469
470 async fn fail_effect(
471 &self,
472 event_id: Uuid,
473 effect_id: String,
474 error: String,
475 _retry_after_secs: i32,
476 ) -> Result<()> {
477 if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
479 let failed_at = Utc::now();
480 entry.status = "failed".to_string();
481 entry.error = Some(error.clone());
482 entry.attempts += 1;
483 entry.last_attempted_at = Some(failed_at);
484
485 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
487 self.publish_insight(InsightEvent {
488 seq,
489 stream_type: StreamType::EffectFailed,
490 correlation_id: entry.correlation_id,
491 event_id: None,
492 effect_event_id: Some(event_id),
493 effect_id: Some(effect_id),
494 event_type: None,
495 status: Some("failed".to_string()),
496 error: Some(error.clone()),
497 payload: None,
498 created_at: failed_at,
499 });
500 }
501
502 eprintln!("Effect failed: {}", error);
503 Ok(())
504 }
505
506 async fn dlq_effect(
507 &self,
508 event_id: Uuid,
509 effect_id: String,
510 error: String,
511 error_type: String,
512 attempts: i32,
513 ) -> Result<()> {
514 self.dlq_effect_with_events(event_id, effect_id, error, error_type, attempts, Vec::new())
515 .await
516 }
517
518 async fn dlq_effect_with_events(
519 &self,
520 event_id: Uuid,
521 effect_id: String,
522 error: String,
523 error_type: String,
524 attempts: i32,
525 emitted_events: Vec<EmittedEvent>,
526 ) -> Result<()> {
527 let effect_snapshot =
528 self.effect_history
529 .get(&(event_id, effect_id.clone()))
530 .map(|entry| {
531 (
532 entry.correlation_id,
533 entry.event_type.clone(),
534 entry.event_payload.clone(),
535 entry.batch_id,
536 entry.batch_index,
537 entry.batch_size,
538 )
539 });
540 let event_snapshot = self.event_history.get(&event_id).map(|event| {
541 (
542 event.correlation_id,
543 event.event_type.clone(),
544 event.payload.clone(),
545 event.batch_id,
546 event.batch_index,
547 event.batch_size,
548 event.hops,
549 )
550 });
551
552 let (correlation_id, event_type, event_payload, batch_id, batch_index, batch_size) =
553 if let Some(snapshot) = effect_snapshot.clone() {
554 (
555 snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
556 )
557 } else if let Some(snapshot) = event_snapshot.clone() {
558 (
559 snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
560 )
561 } else {
562 (
563 event_id,
564 "unknown".to_string(),
565 serde_json::Value::Null,
566 None,
567 None,
568 None,
569 )
570 };
571
572 if let Some(mut effect) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
573 effect.status = "failed".to_string();
574 effect.error = Some(error.clone());
575 effect.attempts = attempts;
576 effect.last_attempted_at = Some(Utc::now());
577 }
578
579 self.dead_letter_history.insert(
580 (event_id, effect_id.clone()),
581 StoredDeadLetter {
582 event_id,
583 effect_id: effect_id.clone(),
584 correlation_id,
585 event_type: event_type.clone(),
586 event_payload: event_payload.clone(),
587 error: error.clone(),
588 reason: error_type,
589 attempts,
590 failed_at: Utc::now(),
591 resolved_at: None,
592 },
593 );
594
595 let parent_hops = event_snapshot.map(|snapshot| snapshot.6).unwrap_or(0);
596 if emitted_events.is_empty() {
597 if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
598 (batch_id, batch_index, batch_size)
599 {
600 let synthetic_event_id = Uuid::new_v5(
601 &NAMESPACE_SEESAW,
602 format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
603 );
604 self.publish(QueuedEvent {
605 id: self.event_seq.fetch_add(1, Ordering::SeqCst),
606 event_id: synthetic_event_id,
607 parent_id: Some(event_id),
608 correlation_id,
609 event_type: event_type.clone(),
610 payload: event_payload.clone(),
611 hops: parent_hops + 1,
612 batch_id: Some(batch_id),
613 batch_index: Some(batch_index),
614 batch_size: Some(batch_size),
615 created_at: Utc::now(),
616 })
617 .await?;
618 }
619 } else {
620 for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
621 let synthetic_event_id = Uuid::new_v5(
622 &NAMESPACE_SEESAW,
623 format!(
624 "{}-{}-dlq-terminal-{}-{}",
625 event_id, effect_id, emitted.event_type, emitted_index
626 )
627 .as_bytes(),
628 );
629 self.publish(QueuedEvent {
630 id: self.event_seq.fetch_add(1, Ordering::SeqCst),
631 event_id: synthetic_event_id,
632 parent_id: Some(event_id),
633 correlation_id,
634 event_type: emitted.event_type,
635 payload: emitted.payload,
636 hops: parent_hops + 1,
637 batch_id: emitted.batch_id.or(batch_id),
638 batch_index: emitted.batch_index.or(batch_index),
639 batch_size: emitted.batch_size.or(batch_size),
640 created_at: Utc::now(),
641 })
642 .await?;
643 }
644 }
645
646 eprintln!(
647 "Effect sent to DLQ: {}:{} - {} (attempts: {})",
648 event_id, effect_id, error, attempts
649 );
650 Ok(())
651 }
652
653 async fn join_same_batch_append_and_maybe_claim(
654 &self,
655 join_effect_id: String,
656 correlation_id: Uuid,
657 source_event_id: Uuid,
658 source_event_type: String,
659 source_payload: serde_json::Value,
660 source_created_at: DateTime<Utc>,
661 batch_id: Uuid,
662 batch_index: i32,
663 batch_size: i32,
664 ) -> Result<Option<Vec<JoinEntry>>> {
665 let key = (join_effect_id.clone(), correlation_id, batch_id);
666 let mut windows = self.join_windows.lock();
667 let window = windows.entry(key).or_insert_with(|| MemoryJoinWindow {
668 target_count: batch_size,
669 status: MemoryJoinStatus::Open,
670 source_event_ids: HashSet::new(),
671 entries_by_index: HashMap::new(),
672 });
673
674 if window.status == MemoryJoinStatus::Completed {
675 return Ok(None);
676 }
677
678 if window.target_count != batch_size {
679 window.target_count = batch_size;
680 }
681
682 let already_seen_source = !window.source_event_ids.insert(source_event_id);
683 if !already_seen_source {
684 window
685 .entries_by_index
686 .entry(batch_index)
687 .or_insert_with(|| JoinEntry {
688 source_event_id,
689 event_type: source_event_type,
690 payload: source_payload,
691 batch_id,
692 batch_index,
693 batch_size,
694 created_at: source_created_at,
695 });
696 }
697
698 let ready = window.entries_by_index.len() as i32 >= window.target_count;
699 if ready && window.status == MemoryJoinStatus::Open {
700 window.status = MemoryJoinStatus::Processing;
701 let mut ordered = window
702 .entries_by_index
703 .values()
704 .cloned()
705 .collect::<Vec<_>>();
706 ordered.sort_by_key(|entry| entry.batch_index);
707 return Ok(Some(ordered));
708 }
709
710 Ok(None)
711 }
712
713 async fn join_same_batch_complete(
714 &self,
715 join_effect_id: String,
716 correlation_id: Uuid,
717 batch_id: Uuid,
718 ) -> Result<()> {
719 let key = (join_effect_id, correlation_id, batch_id);
720 if let Some(window) = self.join_windows.lock().get_mut(&key) {
721 window.status = MemoryJoinStatus::Completed;
722 }
723 self.join_windows.lock().remove(&key);
724 Ok(())
725 }
726
727 async fn join_same_batch_release(
728 &self,
729 join_effect_id: String,
730 correlation_id: Uuid,
731 batch_id: Uuid,
732 _error: String,
733 ) -> Result<()> {
734 let key = (join_effect_id, correlation_id, batch_id);
735 if let Some(window) = self.join_windows.lock().get_mut(&key) {
736 if window.status == MemoryJoinStatus::Processing {
737 window.status = MemoryJoinStatus::Open;
738 }
739 }
740 Ok(())
741 }
742
743 async fn subscribe_workflow_events(
744 &self,
745 _correlation_id: Uuid,
746 ) -> Result<Box<dyn futures::Stream<Item = WorkflowEvent> + Send + Unpin>> {
747 Err(anyhow!("Subscriptions not supported in memory store"))
749 }
750
751 async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<WorkflowStatus> {
752 let has_events = self
754 .events
755 .get(&correlation_id)
756 .map(|q| !q.is_empty())
757 .unwrap_or(false);
758 let state = self
759 .states
760 .get(&correlation_id)
761 .map(|entry| entry.value().0.clone());
762 let pending_effects = 0i64; Ok(WorkflowStatus {
765 correlation_id,
766 state,
767 pending_effects,
768 is_settled: !has_events && pending_effects == 0,
769 last_event: None, })
771 }
772}
773
774#[async_trait]
775impl InsightStore for MemoryStore {
776 async fn subscribe_events(
777 &self,
778 ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
779 let mut rx = self.insight_tx.subscribe();
780 let stream = async_stream::stream! {
781 while let Ok(event) = rx.recv().await {
782 yield event;
783 }
784 };
785
786 Ok(Box::new(Box::pin(stream)))
787 }
788
789 async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
790 let mut events: Vec<_> = self
792 .event_history
793 .iter()
794 .filter(|e| e.value().correlation_id == correlation_id)
795 .map(|e| e.value().clone())
796 .collect();
797
798 events.sort_by_key(|e| e.created_at);
799
800 let event_ids: HashSet<Uuid> = events.iter().map(|e| e.event_id).collect();
803 let roots = self.build_event_nodes(&events, None, &event_ids, true);
804
805 let state = self
807 .states
808 .get(&correlation_id)
809 .map(|entry| entry.value().0.clone());
810
811 Ok(WorkflowTree {
812 correlation_id,
813 roots,
814 state,
815 event_count: events.len(),
816 effect_count: self
817 .effect_history
818 .iter()
819 .filter(|e| e.value().correlation_id == correlation_id)
820 .count(),
821 })
822 }
823
824 async fn get_stats(&self) -> Result<InsightStats> {
825 let total_events = self.event_history.len() as i64;
826
827 let mut active_effects = 0i64;
828 let mut completed_effects = 0i64;
829 let mut failed_effects = 0i64;
830
831 for entry in self.effect_history.iter() {
832 match entry.value().status.as_str() {
833 "pending" | "executing" => active_effects += 1,
834 "completed" => completed_effects += 1,
835 "failed" => failed_effects += 1,
836 _ => {}
837 }
838 }
839
840 Ok(InsightStats {
841 total_events,
842 active_effects,
843 completed_effects,
844 failed_effects,
845 })
846 }
847
848 async fn get_recent_events(
849 &self,
850 cursor: Option<i64>,
851 limit: usize,
852 ) -> Result<Vec<InsightEvent>> {
853 let mut events: Vec<_> = self
855 .event_history
856 .iter()
857 .filter_map(|e| {
858 let stored = e.value();
859 if let Some(cursor_seq) = cursor {
861 if stored.seq <= cursor_seq {
862 return None;
863 }
864 }
865 Some(InsightEvent {
866 seq: stored.seq,
867 stream_type: StreamType::EventDispatched,
868 correlation_id: stored.correlation_id,
869 event_id: Some(stored.event_id),
870 effect_event_id: None,
871 effect_id: None,
872 event_type: Some(stored.event_type.clone()),
873 status: None,
874 error: None,
875 payload: Some(serde_json::json!({
876 "event_type": stored.event_type.clone(),
877 "hops": stored.hops,
878 "batch_id": stored.batch_id,
879 "batch_index": stored.batch_index,
880 "batch_size": stored.batch_size,
881 "payload": stored.payload.clone(),
882 })),
883 created_at: stored.created_at,
884 })
885 })
886 .collect();
887
888 events.sort_by_key(|e| e.seq);
890 events.truncate(limit);
891
892 Ok(events)
893 }
894
895 async fn get_effect_logs(
896 &self,
897 correlation_id: Option<Uuid>,
898 limit: usize,
899 ) -> Result<Vec<EffectExecutionLog>> {
900 let mut logs: Vec<_> = self
901 .effect_history
902 .iter()
903 .filter_map(|entry| {
904 let effect = entry.value();
905 if let Some(filter) = correlation_id {
906 if effect.correlation_id != filter {
907 return None;
908 }
909 }
910
911 let started_at = effect.claimed_at.or(effect.last_attempted_at);
912 let duration_ms = match (started_at, effect.completed_at) {
913 (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
914 _ => None,
915 };
916
917 Some(EffectExecutionLog {
918 correlation_id: effect.correlation_id,
919 event_id: effect.event_id,
920 effect_id: effect.effect_id.clone(),
921 status: effect.status.clone(),
922 attempts: effect.attempts,
923 event_type: Some(effect.event_type.clone()),
924 result: effect.result.clone(),
925 error: effect.error.clone(),
926 created_at: effect.created_at,
927 execute_at: Some(effect.execute_at),
928 claimed_at: effect.claimed_at,
929 last_attempted_at: effect.last_attempted_at,
930 completed_at: effect.completed_at,
931 duration_ms,
932 })
933 })
934 .collect();
935
936 logs.sort_by(|a, b| {
937 let a_time = a
938 .last_attempted_at
939 .or(a.completed_at)
940 .or(a.claimed_at)
941 .unwrap_or(a.created_at);
942 let b_time = b
943 .last_attempted_at
944 .or(b.completed_at)
945 .or(b.claimed_at)
946 .unwrap_or(b.created_at);
947 b_time.cmp(&a_time)
948 });
949 logs.truncate(limit);
950 Ok(logs)
951 }
952
953 async fn get_dead_letters(
954 &self,
955 unresolved_only: bool,
956 limit: usize,
957 ) -> Result<Vec<DeadLetterEntry>> {
958 let mut rows: Vec<_> = self
959 .dead_letter_history
960 .iter()
961 .filter_map(|entry| {
962 let dead = entry.value();
963 if unresolved_only && dead.resolved_at.is_some() {
964 return None;
965 }
966
967 Some(DeadLetterEntry {
968 correlation_id: dead.correlation_id,
969 event_id: dead.event_id,
970 effect_id: dead.effect_id.clone(),
971 event_type: dead.event_type.clone(),
972 event_payload: dead.event_payload.clone(),
973 error: dead.error.clone(),
974 reason: dead.reason.clone(),
975 attempts: dead.attempts,
976 failed_at: dead.failed_at,
977 resolved_at: dead.resolved_at,
978 })
979 })
980 .collect();
981
982 rows.sort_by(|a, b| b.failed_at.cmp(&a.failed_at));
983 rows.truncate(limit);
984 Ok(rows)
985 }
986
987 async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
988 let mut workflows: HashMap<Uuid, FailedWorkflow> = HashMap::new();
989
990 for entry in self.effect_history.iter() {
991 let effect = entry.value();
992 let workflow = workflows
993 .entry(effect.correlation_id)
994 .or_insert(FailedWorkflow {
995 correlation_id: effect.correlation_id,
996 failed_effects: 0,
997 active_effects: 0,
998 dead_letters: 0,
999 last_failed_at: None,
1000 last_error: None,
1001 });
1002
1003 match effect.status.as_str() {
1004 "failed" => {
1005 workflow.failed_effects += 1;
1006 let at = effect.last_attempted_at.unwrap_or(effect.created_at);
1007 if workflow
1008 .last_failed_at
1009 .map(|current| at > current)
1010 .unwrap_or(true)
1011 {
1012 workflow.last_failed_at = Some(at);
1013 workflow.last_error = effect.error.clone();
1014 }
1015 }
1016 "pending" | "executing" => {
1017 workflow.active_effects += 1;
1018 }
1019 _ => {}
1020 }
1021 }
1022
1023 for entry in self.dead_letter_history.iter() {
1024 let dead = entry.value();
1025 if dead.resolved_at.is_some() {
1026 continue;
1027 }
1028
1029 let workflow = workflows
1030 .entry(dead.correlation_id)
1031 .or_insert(FailedWorkflow {
1032 correlation_id: dead.correlation_id,
1033 failed_effects: 0,
1034 active_effects: 0,
1035 dead_letters: 0,
1036 last_failed_at: None,
1037 last_error: None,
1038 });
1039
1040 workflow.dead_letters += 1;
1041 if workflow
1042 .last_failed_at
1043 .map(|current| dead.failed_at > current)
1044 .unwrap_or(true)
1045 {
1046 workflow.last_failed_at = Some(dead.failed_at);
1047 workflow.last_error = Some(dead.error.clone());
1048 }
1049 }
1050
1051 let mut rows: Vec<_> = workflows
1052 .into_values()
1053 .filter(|workflow| workflow.failed_effects > 0 || workflow.dead_letters > 0)
1054 .collect();
1055
1056 rows.sort_by(|a, b| b.last_failed_at.cmp(&a.last_failed_at));
1057 rows.truncate(limit);
1058 Ok(rows)
1059 }
1060}
1061
1062impl MemoryStore {
1063 fn build_event_nodes(
1065 &self,
1066 events: &[StoredEvent],
1067 parent_id: Option<Uuid>,
1068 event_ids: &HashSet<Uuid>,
1069 is_root_pass: bool,
1070 ) -> Vec<EventNode> {
1071 events
1072 .iter()
1073 .filter(|event| {
1074 if is_root_pass {
1075 event.parent_id.is_none()
1076 || event
1077 .parent_id
1078 .map(|parent| !event_ids.contains(&parent))
1079 .unwrap_or(false)
1080 } else {
1081 event.parent_id == parent_id
1082 }
1083 })
1084 .map(|event| {
1085 let effects = self
1087 .effect_history
1088 .iter()
1089 .filter(|e| e.value().event_id == event.event_id)
1090 .map(|e| {
1091 let effect = e.value();
1092 EffectNode {
1093 effect_id: effect.effect_id.clone(),
1094 event_id: effect.event_id,
1095 status: effect.status.clone(),
1096 result: effect.result.clone(),
1097 error: effect.error.clone(),
1098 attempts: effect.attempts,
1099 created_at: effect.created_at,
1100 batch_id: effect.batch_id,
1101 batch_index: effect.batch_index,
1102 batch_size: effect.batch_size,
1103 }
1104 })
1105 .collect();
1106
1107 let children =
1109 self.build_event_nodes(events, Some(event.event_id), event_ids, false);
1110
1111 EventNode {
1112 event_id: event.event_id,
1113 event_type: event.event_type.clone(),
1114 payload: event.payload.clone(),
1115 created_at: event.created_at,
1116 batch_id: event.batch_id,
1117 batch_index: event.batch_index,
1118 batch_size: event.batch_size,
1119 children,
1120 effects,
1121 }
1122 })
1123 .collect()
1124 }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129 use super::*;
1130 use seesaw_core::store::Store;
1131
1132 #[tokio::test]
1133 async fn test_insight_events_have_unique_seq() {
1134 let store = MemoryStore::new();
1135
1136 for i in 1..=3 {
1138 let event = QueuedEvent {
1139 id: i as i64,
1140 event_id: Uuid::new_v4(),
1141 parent_id: None,
1142 correlation_id: Uuid::new_v4(),
1143 event_type: format!("Event{}", i),
1144 payload: serde_json::json!({"n": i}),
1145 created_at: Utc::now(),
1146 hops: 0,
1147 batch_id: None,
1148 batch_index: None,
1149 batch_size: None,
1150 };
1151 store.publish(event).await.unwrap();
1152 }
1153
1154 let events = store.get_recent_events(None, 10).await.unwrap();
1156 assert_eq!(events.len(), 3);
1157 assert_eq!(events[0].seq, 1);
1158 assert_eq!(events[1].seq, 2);
1159 assert_eq!(events[2].seq, 3);
1160 }
1161
1162 #[tokio::test]
1163 async fn test_cursor_based_filtering() {
1164 let store = MemoryStore::new();
1165 let correlation_id = Uuid::new_v4();
1166
1167 for i in 1..=5 {
1169 let event = QueuedEvent {
1170 id: i as i64,
1171 event_id: Uuid::new_v4(),
1172 parent_id: None,
1173 correlation_id,
1174 event_type: format!("Event{}", i),
1175 payload: serde_json::json!({"n": i}),
1176 created_at: Utc::now(),
1177 hops: 0,
1178 batch_id: None,
1179 batch_index: None,
1180 batch_size: None,
1181 };
1182 store.publish(event).await.unwrap();
1183 }
1184
1185 let events = store.get_recent_events(None, 2).await.unwrap();
1187 assert_eq!(events.len(), 2);
1188 assert_eq!(events[0].seq, 1);
1189 assert_eq!(events[1].seq, 2);
1190
1191 let next_events = store.get_recent_events(Some(2), 2).await.unwrap();
1193 assert_eq!(next_events.len(), 2);
1194 assert_eq!(next_events[0].seq, 3);
1195 assert_eq!(next_events[1].seq, 4);
1196
1197 let all_seqs: Vec<i64> = events
1199 .iter()
1200 .chain(next_events.iter())
1201 .map(|e| e.seq)
1202 .collect();
1203 assert_eq!(all_seqs, vec![1, 2, 3, 4]);
1204 }
1205
1206 #[tokio::test]
1207 async fn test_no_events_before_cursor() {
1208 let store = MemoryStore::new();
1209
1210 for i in 1..=3 {
1212 let event = QueuedEvent {
1213 id: i as i64,
1214 event_id: Uuid::new_v4(),
1215 parent_id: None,
1216 correlation_id: Uuid::new_v4(),
1217 event_type: format!("Event{}", i),
1218 payload: serde_json::json!({"n": i}),
1219 created_at: Utc::now(),
1220 hops: 0,
1221 batch_id: None,
1222 batch_index: None,
1223 batch_size: None,
1224 };
1225 store.publish(event).await.unwrap();
1226 }
1227
1228 let events = store.get_recent_events(Some(10), 10).await.unwrap();
1230 assert_eq!(
1231 events.len(),
1232 0,
1233 "Should return no events when cursor is beyond all seq values"
1234 );
1235 }
1236
1237 #[tokio::test]
1238 async fn test_workflow_tree_treats_orphan_parent_as_root() {
1239 let store = MemoryStore::new();
1240 let correlation_id = Uuid::new_v4();
1241 let event_id = Uuid::new_v4();
1242 let missing_parent = Uuid::new_v4();
1243
1244 store
1245 .publish(QueuedEvent {
1246 id: 1,
1247 event_id,
1248 parent_id: Some(missing_parent),
1249 correlation_id,
1250 event_type: "OrphanEvent".to_string(),
1251 payload: serde_json::json!({"ok": true}),
1252 created_at: Utc::now(),
1253 hops: 1,
1254 batch_id: None,
1255 batch_index: None,
1256 batch_size: None,
1257 })
1258 .await
1259 .unwrap();
1260
1261 let tree = store.get_workflow_tree(correlation_id).await.unwrap();
1262 assert_eq!(tree.event_count, 1);
1263 assert_eq!(tree.roots.len(), 1);
1264 assert_eq!(tree.roots[0].event_id, event_id);
1265 }
1266
1267 #[tokio::test]
1268 async fn test_dlq_with_batch_metadata_publishes_synthetic_terminal_event() {
1269 let store = MemoryStore::new();
1270 let event_id = Uuid::new_v4();
1271 let correlation_id = Uuid::new_v4();
1272 let batch_id = Uuid::new_v4();
1273
1274 store
1275 .insert_effect_intent(
1276 event_id,
1277 "join_effect".to_string(),
1278 correlation_id,
1279 "BatchItemResult".to_string(),
1280 serde_json::json!({ "index": 2, "ok": false }),
1281 None,
1282 Some(batch_id),
1283 Some(2),
1284 Some(5),
1285 Utc::now(),
1286 30,
1287 1,
1288 10,
1289 )
1290 .await
1291 .expect("insert should succeed");
1292
1293 store
1294 .dlq_effect(
1295 event_id,
1296 "join_effect".to_string(),
1297 "forced failure".to_string(),
1298 "failed".to_string(),
1299 1,
1300 )
1301 .await
1302 .expect("dlq should succeed");
1303
1304 let synthetic = store.poll_next().await.expect("poll should succeed");
1305 assert!(
1306 synthetic.is_some(),
1307 "synthetic terminal event should be published"
1308 );
1309 let synthetic = synthetic.unwrap();
1310 assert_eq!(synthetic.correlation_id, correlation_id);
1311 assert_eq!(synthetic.event_type, "BatchItemResult");
1312 assert_eq!(synthetic.batch_id, Some(batch_id));
1313 assert_eq!(synthetic.batch_index, Some(2));
1314 assert_eq!(synthetic.batch_size, Some(5));
1315 }
1316
1317 #[tokio::test]
1318 async fn test_dlq_without_batch_metadata_does_not_publish_synthetic_terminal_event() {
1319 let store = MemoryStore::new();
1320 let event_id = Uuid::new_v4();
1321 let correlation_id = Uuid::new_v4();
1322
1323 store
1324 .insert_effect_intent(
1325 event_id,
1326 "normal_effect".to_string(),
1327 correlation_id,
1328 "NormalEvent".to_string(),
1329 serde_json::json!({ "ok": true }),
1330 None,
1331 None,
1332 None,
1333 None,
1334 Utc::now(),
1335 30,
1336 1,
1337 10,
1338 )
1339 .await
1340 .expect("insert should succeed");
1341
1342 store
1343 .dlq_effect(
1344 event_id,
1345 "normal_effect".to_string(),
1346 "forced failure".to_string(),
1347 "failed".to_string(),
1348 1,
1349 )
1350 .await
1351 .expect("dlq should succeed");
1352
1353 let next = store.poll_next().await.expect("poll should succeed");
1354 assert!(next.is_none(), "no synthetic terminal event expected");
1355 }
1356}