Skip to main content

seesaw_memory/
lib.rs

1//! In-memory Store implementation for Seesaw
2//!
3//! This is a simple in-memory store suitable for development, testing, and demos.
4//! Not suitable for production use as all data is lost on restart.
5
6use anyhow::{anyhow, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use parking_lot::Mutex;
11use seesaw_core::insight::*;
12use seesaw_core::store::*;
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, HashSet, VecDeque};
15use tokio::sync::broadcast;
16
17use std::sync::atomic::{AtomicI64, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
21/// In-memory store for workflows
22#[derive(Clone)]
23pub struct MemoryStore {
24    /// Event queue (FIFO per correlation_id)
25    events: Arc<DashMap<Uuid, VecDeque<QueuedEvent>>>,
26    /// Global event sequence for IDs
27    event_seq: Arc<AtomicI64>,
28    /// Workflow states
29    states: Arc<DashMap<Uuid, (serde_json::Value, i32)>>,
30    /// Effect executions queue
31    effects: Arc<Mutex<VecDeque<QueuedEffectExecution>>>,
32    /// Completed effects (for idempotency)
33    completed_effects: Arc<DashMap<(Uuid, String), serde_json::Value>>,
34
35    // Insight/observability fields
36    /// Broadcast channel for live events
37    insight_tx: Arc<broadcast::Sender<InsightEvent>>,
38    /// Insight event sequence
39    insight_seq: Arc<AtomicI64>,
40    /// Event history (for tree reconstruction)
41    event_history: Arc<DashMap<Uuid, StoredEvent>>,
42    /// Effect history
43    effect_history: Arc<DashMap<(Uuid, String), StoredEffect>>,
44    /// Dead letter history
45    dead_letter_history: Arc<DashMap<(Uuid, String), StoredDeadLetter>>,
46    /// Durable-ish join windows for same-batch fan-in (in-memory only).
47    join_windows: Arc<Mutex<HashMap<(String, Uuid, Uuid), MemoryJoinWindow>>>,
48}
49
50/// Stored event for history/tree reconstruction
51#[derive(Debug, Clone)]
52struct StoredEvent {
53    seq: i64,
54    event_id: Uuid,
55    parent_id: Option<Uuid>,
56    correlation_id: Uuid,
57    event_type: String,
58    payload: serde_json::Value,
59    hops: i32,
60    retry_count: i32,
61    batch_id: Option<Uuid>,
62    batch_index: Option<i32>,
63    batch_size: Option<i32>,
64    created_at: DateTime<Utc>,
65}
66
67/// Stored effect for history/tree reconstruction
68#[derive(Debug, Clone)]
69struct StoredEffect {
70    effect_id: String,
71    event_id: Uuid,
72    correlation_id: Uuid,
73    event_type: String,
74    event_payload: serde_json::Value,
75    batch_id: Option<Uuid>,
76    batch_index: Option<i32>,
77    batch_size: Option<i32>,
78    status: String,
79    result: Option<serde_json::Value>,
80    error: Option<String>,
81    attempts: i32,
82    created_at: DateTime<Utc>,
83    execute_at: DateTime<Utc>,
84    claimed_at: Option<DateTime<Utc>>,
85    last_attempted_at: Option<DateTime<Utc>>,
86    completed_at: Option<DateTime<Utc>>,
87}
88
/// Lifecycle state of a [`MemoryJoinWindow`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MemoryJoinStatus {
    /// Still collecting entries; the window may be claimed once it is full.
    Open,
    /// A full window has been claimed and handed to a caller.
    Processing,
    /// Finished; further appends to a window in this state are ignored.
    Completed,
}
95
96#[derive(Debug, Clone)]
97struct MemoryJoinWindow {
98    target_count: i32,
99    status: MemoryJoinStatus,
100    source_event_ids: HashSet<Uuid>,
101    entries_by_index: HashMap<i32, JoinEntry>,
102}
103
104/// Stored dead letter entry for insight diagnostics.
105#[derive(Debug, Clone)]
106struct StoredDeadLetter {
107    event_id: Uuid,
108    effect_id: String,
109    correlation_id: Uuid,
110    event_type: String,
111    event_payload: serde_json::Value,
112    error: String,
113    reason: String,
114    attempts: i32,
115    failed_at: DateTime<Utc>,
116    resolved_at: Option<DateTime<Utc>>,
117}
118
119impl MemoryStore {
120    pub fn new() -> Self {
121        let (insight_tx, _) = broadcast::channel(1000);
122        Self {
123            events: Arc::new(DashMap::new()),
124            event_seq: Arc::new(AtomicI64::new(1)),
125            states: Arc::new(DashMap::new()),
126            effects: Arc::new(Mutex::new(VecDeque::new())),
127            completed_effects: Arc::new(DashMap::new()),
128            insight_tx: Arc::new(insight_tx),
129            insight_seq: Arc::new(AtomicI64::new(1)),
130            event_history: Arc::new(DashMap::new()),
131            effect_history: Arc::new(DashMap::new()),
132            dead_letter_history: Arc::new(DashMap::new()),
133            join_windows: Arc::new(Mutex::new(HashMap::new())),
134        }
135    }
136
137    /// Publish insight event to broadcast channel
138    fn publish_insight(&self, event: InsightEvent) {
139        // Ignore send errors (no subscribers is fine)
140        let _ = self.insight_tx.send(event);
141    }
142}
143
144impl Default for MemoryStore {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150#[async_trait]
151impl Store for MemoryStore {
152    async fn publish(&self, event: QueuedEvent) -> Result<()> {
153        if self.event_history.contains_key(&event.event_id) {
154            return Ok(());
155        }
156
157        // Generate sequence number
158        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
159
160        // Store in event history with seq
161        self.event_history.insert(
162            event.event_id,
163            StoredEvent {
164                seq,
165                event_id: event.event_id,
166                parent_id: event.parent_id,
167                correlation_id: event.correlation_id,
168                event_type: event.event_type.clone(),
169                payload: event.payload.clone(),
170                hops: event.hops,
171                retry_count: event.retry_count,
172                batch_id: event.batch_id,
173                batch_index: event.batch_index,
174                batch_size: event.batch_size,
175                created_at: event.created_at,
176            },
177        );
178
179        // Publish insight event
180        self.publish_insight(InsightEvent {
181            seq,
182            stream_type: StreamType::EventDispatched,
183            correlation_id: event.correlation_id,
184            event_id: Some(event.event_id),
185            effect_event_id: None,
186            effect_id: None,
187            event_type: Some(event.event_type.clone()),
188            status: None,
189            error: None,
190            payload: Some(serde_json::json!({
191                "event_type": event.event_type.clone(),
192                "hops": event.hops,
193                "batch_id": event.batch_id,
194                "batch_index": event.batch_index,
195                "batch_size": event.batch_size,
196                "payload": event.payload.clone(),
197            })),
198            created_at: event.created_at,
199        });
200
201        // Add to queue
202        let mut queue = self
203            .events
204            .entry(event.correlation_id)
205            .or_insert_with(VecDeque::new);
206        queue.push_back(event);
207        Ok(())
208    }
209
210    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
211        // Simple round-robin across workflows
212        for mut entry in self.events.iter_mut() {
213            if let Some(event) = entry.value_mut().pop_front() {
214                return Ok(Some(event));
215            }
216        }
217        Ok(None)
218    }
219
220    async fn ack(&self, _id: i64) -> Result<()> {
221        // No-op for in-memory (event already removed in poll_next)
222        Ok(())
223    }
224
225    async fn nack(&self, _id: i64, _retry_after_secs: u64) -> Result<()> {
226        // For simplicity, just drop failed events in memory store
227        Ok(())
228    }
229
230    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
231    where
232        S: for<'de> Deserialize<'de> + Send,
233    {
234        if let Some(entry) = self.states.get(&correlation_id) {
235            let (json, version) = entry.value();
236            let state: S = serde_json::from_value(json.clone())?;
237            Ok(Some((state, *version)))
238        } else {
239            Ok(None)
240        }
241    }
242
243    async fn save_state<S>(
244        &self,
245        correlation_id: Uuid,
246        state: &S,
247        expected_version: i32,
248    ) -> Result<i32>
249    where
250        S: Serialize + Send + Sync,
251    {
252        let json = serde_json::to_value(state)?;
253        let new_version = expected_version + 1;
254
255        // Simple optimistic locking check
256        if let Some(mut entry) = self.states.get_mut(&correlation_id) {
257            let (_, current_version) = entry.value();
258            if *current_version != expected_version {
259                return Err(anyhow!(
260                    "Version mismatch: expected {} but was {}",
261                    expected_version,
262                    current_version
263                ));
264            }
265            *entry.value_mut() = (json, new_version);
266        } else {
267            self.states.insert(correlation_id, (json, new_version));
268        }
269
270        Ok(new_version)
271    }
272
273    async fn insert_effect_intent(
274        &self,
275        event_id: Uuid,
276        effect_id: String,
277        correlation_id: Uuid,
278        event_type: String,
279        event_payload: serde_json::Value,
280        parent_event_id: Option<Uuid>,
281        batch_id: Option<Uuid>,
282        batch_index: Option<i32>,
283        batch_size: Option<i32>,
284        execute_at: DateTime<Utc>,
285        timeout_seconds: i32,
286        max_attempts: i32,
287        priority: i32,
288    ) -> Result<()> {
289        let execution = QueuedEffectExecution {
290            event_id,
291            effect_id: effect_id.clone(),
292            correlation_id,
293            event_type,
294            event_payload,
295            parent_event_id,
296            batch_id,
297            batch_index,
298            batch_size,
299            execute_at,
300            timeout_seconds,
301            max_attempts,
302            priority,
303            attempts: 0,
304        };
305
306        // Store in effect history
307        let now = Utc::now();
308        self.effect_history.insert(
309            (event_id, effect_id.clone()),
310            StoredEffect {
311                effect_id: effect_id.clone(),
312                event_id,
313                correlation_id,
314                event_type: execution.event_type.clone(),
315                event_payload: execution.event_payload.clone(),
316                batch_id: execution.batch_id,
317                batch_index: execution.batch_index,
318                batch_size: execution.batch_size,
319                status: "pending".to_string(),
320                result: None,
321                error: None,
322                attempts: 0,
323                created_at: now,
324                execute_at,
325                claimed_at: None,
326                last_attempted_at: None,
327                completed_at: None,
328            },
329        );
330
331        // Publish insight event
332        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
333        self.publish_insight(InsightEvent {
334            seq,
335            stream_type: StreamType::EffectStarted,
336            correlation_id,
337            event_id: None,
338            effect_event_id: Some(event_id),
339            effect_id: Some(effect_id),
340            event_type: None,
341            status: Some("pending".to_string()),
342            error: None,
343            payload: None,
344            created_at: now,
345        });
346
347        let mut queue = self.effects.lock();
348        queue.push_back(execution);
349        Ok(())
350    }
351
352    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
353        let mut queue = self.effects.lock();
354
355        // Find first effect that's ready to execute
356        let now = Utc::now();
357        if let Some(pos) = queue.iter().position(|e| e.execute_at <= now) {
358            if let Some(next) = queue.remove(pos) {
359                if let Some(mut effect) = self
360                    .effect_history
361                    .get_mut(&(next.event_id, next.effect_id.clone()))
362                {
363                    effect.status = "executing".to_string();
364                    effect.claimed_at = Some(now);
365                    effect.last_attempted_at = Some(now);
366                    effect.attempts = next.attempts + 1;
367                }
368                Ok(Some(next))
369            } else {
370                Ok(None)
371            }
372        } else {
373            Ok(None)
374        }
375    }
376
377    async fn complete_effect(
378        &self,
379        event_id: Uuid,
380        effect_id: String,
381        result: serde_json::Value,
382    ) -> Result<()> {
383        self.completed_effects
384            .insert((event_id, effect_id.clone()), result.clone());
385
386        // Update effect history
387        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
388            entry.status = "completed".to_string();
389            entry.result = Some(result.clone());
390            let completed_at = Utc::now();
391            entry.completed_at = Some(completed_at);
392            entry.last_attempted_at = Some(completed_at);
393
394            // Publish insight event
395            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
396            self.publish_insight(InsightEvent {
397                seq,
398                stream_type: StreamType::EffectCompleted,
399                correlation_id: entry.correlation_id,
400                event_id: None,
401                effect_event_id: Some(event_id),
402                effect_id: Some(effect_id),
403                event_type: None,
404                status: Some("completed".to_string()),
405                error: None,
406                payload: Some(result),
407                created_at: completed_at,
408            });
409        }
410
411        Ok(())
412    }
413
414    async fn complete_effect_with_events(
415        &self,
416        event_id: Uuid,
417        effect_id: String,
418        result: serde_json::Value,
419        emitted_events: Vec<EmittedEvent>,
420    ) -> Result<()> {
421        // Mark effect complete
422        self.complete_effect(event_id, effect_id.clone(), result)
423            .await?;
424
425        let correlation_id = self
426            .effect_history
427            .get(&(event_id, effect_id.clone()))
428            .map(|entry| entry.correlation_id)
429            .or_else(|| {
430                self.event_history
431                    .get(&event_id)
432                    .map(|entry| entry.correlation_id)
433            })
434            .unwrap_or(event_id);
435        let parent_hops = self
436            .event_history
437            .get(&event_id)
438            .map(|entry| entry.hops)
439            .unwrap_or(0);
440
441        // Publish emitted events
442        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
443            let new_event_id = Uuid::new_v5(
444                &NAMESPACE_SEESAW,
445                format!(
446                    "{}-{}-{}-{}",
447                    event_id, effect_id, emitted.event_type, emitted_index
448                )
449                .as_bytes(),
450            );
451
452            let queued = QueuedEvent {
453                id: self.event_seq.fetch_add(1, Ordering::SeqCst),
454                event_id: new_event_id,
455                parent_id: Some(event_id),
456                correlation_id,
457                event_type: emitted.event_type,
458                payload: emitted.payload,
459                hops: parent_hops + 1,
460                retry_count: 0,
461                batch_id: emitted.batch_id,
462                batch_index: emitted.batch_index,
463                batch_size: emitted.batch_size,
464                created_at: Utc::now(),
465            };
466
467            self.publish(queued).await?;
468        }
469
470        Ok(())
471    }
472
473    async fn fail_effect(
474        &self,
475        event_id: Uuid,
476        effect_id: String,
477        error: String,
478        _retry_after_secs: i32,
479    ) -> Result<()> {
480        // Update effect history
481        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
482            let failed_at = Utc::now();
483            entry.status = "failed".to_string();
484            entry.error = Some(error.clone());
485            entry.attempts += 1;
486            entry.last_attempted_at = Some(failed_at);
487
488            // Publish insight event
489            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
490            self.publish_insight(InsightEvent {
491                seq,
492                stream_type: StreamType::EffectFailed,
493                correlation_id: entry.correlation_id,
494                event_id: None,
495                effect_event_id: Some(event_id),
496                effect_id: Some(effect_id),
497                event_type: None,
498                status: Some("failed".to_string()),
499                error: Some(error.clone()),
500                payload: None,
501                created_at: failed_at,
502            });
503        }
504
505        eprintln!("Effect failed: {}", error);
506        Ok(())
507    }
508
509    async fn dlq_effect(
510        &self,
511        event_id: Uuid,
512        effect_id: String,
513        error: String,
514        error_type: String,
515        attempts: i32,
516    ) -> Result<()> {
517        self.dlq_effect_with_events(event_id, effect_id, error, error_type, attempts, Vec::new())
518            .await
519    }
520
521    async fn dlq_effect_with_events(
522        &self,
523        event_id: Uuid,
524        effect_id: String,
525        error: String,
526        error_type: String,
527        attempts: i32,
528        emitted_events: Vec<EmittedEvent>,
529    ) -> Result<()> {
530        let effect_snapshot =
531            self.effect_history
532                .get(&(event_id, effect_id.clone()))
533                .map(|entry| {
534                    (
535                        entry.correlation_id,
536                        entry.event_type.clone(),
537                        entry.event_payload.clone(),
538                        entry.batch_id,
539                        entry.batch_index,
540                        entry.batch_size,
541                    )
542                });
543        let event_snapshot = self.event_history.get(&event_id).map(|event| {
544            (
545                event.correlation_id,
546                event.event_type.clone(),
547                event.payload.clone(),
548                event.batch_id,
549                event.batch_index,
550                event.batch_size,
551                event.hops,
552            )
553        });
554
555        let (correlation_id, event_type, event_payload, batch_id, batch_index, batch_size) =
556            if let Some(snapshot) = effect_snapshot.clone() {
557                (
558                    snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
559                )
560            } else if let Some(snapshot) = event_snapshot.clone() {
561                (
562                    snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
563                )
564            } else {
565                (
566                    event_id,
567                    "unknown".to_string(),
568                    serde_json::Value::Null,
569                    None,
570                    None,
571                    None,
572                )
573            };
574
575        if let Some(mut effect) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
576            effect.status = "failed".to_string();
577            effect.error = Some(error.clone());
578            effect.attempts = attempts;
579            effect.last_attempted_at = Some(Utc::now());
580        }
581
582        self.dead_letter_history.insert(
583            (event_id, effect_id.clone()),
584            StoredDeadLetter {
585                event_id,
586                effect_id: effect_id.clone(),
587                correlation_id,
588                event_type: event_type.clone(),
589                event_payload: event_payload.clone(),
590                error: error.clone(),
591                reason: error_type,
592                attempts,
593                failed_at: Utc::now(),
594                resolved_at: None,
595            },
596        );
597
598        let parent_hops = event_snapshot.map(|snapshot| snapshot.6).unwrap_or(0);
599        if emitted_events.is_empty() {
600            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
601                (batch_id, batch_index, batch_size)
602            {
603                let synthetic_event_id = Uuid::new_v5(
604                    &NAMESPACE_SEESAW,
605                    format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
606                );
607                self.publish(QueuedEvent {
608                    id: self.event_seq.fetch_add(1, Ordering::SeqCst),
609                    event_id: synthetic_event_id,
610                    parent_id: Some(event_id),
611                    correlation_id,
612                    event_type: event_type.clone(),
613                    payload: event_payload.clone(),
614                    hops: parent_hops + 1,
615                    retry_count: 0,
616                    batch_id: Some(batch_id),
617                    batch_index: Some(batch_index),
618                    batch_size: Some(batch_size),
619                    created_at: Utc::now(),
620                })
621                .await?;
622            }
623        } else {
624            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
625                let synthetic_event_id = Uuid::new_v5(
626                    &NAMESPACE_SEESAW,
627                    format!(
628                        "{}-{}-dlq-terminal-{}-{}",
629                        event_id, effect_id, emitted.event_type, emitted_index
630                    )
631                    .as_bytes(),
632                );
633                self.publish(QueuedEvent {
634                    id: self.event_seq.fetch_add(1, Ordering::SeqCst),
635                    event_id: synthetic_event_id,
636                    parent_id: Some(event_id),
637                    correlation_id,
638                    event_type: emitted.event_type,
639                    payload: emitted.payload,
640                    hops: parent_hops + 1,
641                    retry_count: 0,
642                    batch_id: emitted.batch_id.or(batch_id),
643                    batch_index: emitted.batch_index.or(batch_index),
644                    batch_size: emitted.batch_size.or(batch_size),
645                    created_at: Utc::now(),
646                })
647                .await?;
648            }
649        }
650
651        eprintln!(
652            "Effect sent to DLQ: {}:{} - {} (attempts: {})",
653            event_id, effect_id, error, attempts
654        );
655        Ok(())
656    }
657
658    async fn join_same_batch_append_and_maybe_claim(
659        &self,
660        join_effect_id: String,
661        correlation_id: Uuid,
662        source_event_id: Uuid,
663        source_event_type: String,
664        source_payload: serde_json::Value,
665        source_created_at: DateTime<Utc>,
666        batch_id: Uuid,
667        batch_index: i32,
668        batch_size: i32,
669    ) -> Result<Option<Vec<JoinEntry>>> {
670        let key = (join_effect_id.clone(), correlation_id, batch_id);
671        let mut windows = self.join_windows.lock();
672        let window = windows.entry(key).or_insert_with(|| MemoryJoinWindow {
673            target_count: batch_size,
674            status: MemoryJoinStatus::Open,
675            source_event_ids: HashSet::new(),
676            entries_by_index: HashMap::new(),
677        });
678
679        if window.status == MemoryJoinStatus::Completed {
680            return Ok(None);
681        }
682
683        if window.target_count != batch_size {
684            window.target_count = batch_size;
685        }
686
687        let already_seen_source = !window.source_event_ids.insert(source_event_id);
688        if !already_seen_source {
689            window
690                .entries_by_index
691                .entry(batch_index)
692                .or_insert_with(|| JoinEntry {
693                    source_event_id,
694                    event_type: source_event_type,
695                    payload: source_payload,
696                    batch_id,
697                    batch_index,
698                    batch_size,
699                    created_at: source_created_at,
700                });
701        }
702
703        let ready = window.entries_by_index.len() as i32 >= window.target_count;
704        if ready && window.status == MemoryJoinStatus::Open {
705            window.status = MemoryJoinStatus::Processing;
706            let mut ordered = window
707                .entries_by_index
708                .values()
709                .cloned()
710                .collect::<Vec<_>>();
711            ordered.sort_by_key(|entry| entry.batch_index);
712            return Ok(Some(ordered));
713        }
714
715        Ok(None)
716    }
717
718    async fn join_same_batch_complete(
719        &self,
720        join_effect_id: String,
721        correlation_id: Uuid,
722        batch_id: Uuid,
723    ) -> Result<()> {
724        let key = (join_effect_id, correlation_id, batch_id);
725        if let Some(window) = self.join_windows.lock().get_mut(&key) {
726            window.status = MemoryJoinStatus::Completed;
727        }
728        self.join_windows.lock().remove(&key);
729        Ok(())
730    }
731
732    async fn join_same_batch_release(
733        &self,
734        join_effect_id: String,
735        correlation_id: Uuid,
736        batch_id: Uuid,
737        _error: String,
738    ) -> Result<()> {
739        let key = (join_effect_id, correlation_id, batch_id);
740        if let Some(window) = self.join_windows.lock().get_mut(&key) {
741            if window.status == MemoryJoinStatus::Processing {
742                window.status = MemoryJoinStatus::Open;
743            }
744        }
745        Ok(())
746    }
747
748    async fn subscribe_workflow_events(
749        &self,
750        _correlation_id: Uuid,
751    ) -> Result<Box<dyn futures::Stream<Item = WorkflowEvent> + Send + Unpin>> {
752        // In-memory store doesn't support subscriptions
753        Err(anyhow!("Subscriptions not supported in memory store"))
754    }
755
756    async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<WorkflowStatus> {
757        // Check if any events or effects are pending
758        let has_events = self
759            .events
760            .get(&correlation_id)
761            .map(|q| !q.is_empty())
762            .unwrap_or(false);
763        let state = self
764            .states
765            .get(&correlation_id)
766            .map(|entry| entry.value().0.clone());
767        let pending_effects = 0i64; // Simplified
768
769        Ok(WorkflowStatus {
770            correlation_id,
771            state,
772            pending_effects,
773            is_settled: !has_events && pending_effects == 0,
774            last_event: None, // Could track this if needed
775        })
776    }
777}
778
779#[async_trait]
780impl InsightStore for MemoryStore {
781    async fn subscribe_events(
782        &self,
783    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
784        let mut rx = self.insight_tx.subscribe();
785        let stream = async_stream::stream! {
786            while let Ok(event) = rx.recv().await {
787                yield event;
788            }
789        };
790
791        Ok(Box::new(Box::pin(stream)))
792    }
793
794    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
795        // Find all events for this correlation
796        let mut events: Vec<_> = self
797            .event_history
798            .iter()
799            .filter(|e| e.value().correlation_id == correlation_id)
800            .map(|e| e.value().clone())
801            .collect();
802
803        events.sort_by_key(|e| e.created_at);
804
805        // Build tree. Treat events with missing parents as roots so partially
806        // captured workflows still render.
807        let event_ids: HashSet<Uuid> = events.iter().map(|e| e.event_id).collect();
808        let roots = self.build_event_nodes(&events, None, &event_ids, true);
809
810        // Get state
811        let state = self
812            .states
813            .get(&correlation_id)
814            .map(|entry| entry.value().0.clone());
815
816        Ok(WorkflowTree {
817            correlation_id,
818            roots,
819            state,
820            event_count: events.len(),
821            effect_count: self
822                .effect_history
823                .iter()
824                .filter(|e| e.value().correlation_id == correlation_id)
825                .count(),
826        })
827    }
828
829    async fn get_stats(&self) -> Result<InsightStats> {
830        let total_events = self.event_history.len() as i64;
831
832        let mut active_effects = 0i64;
833        let mut completed_effects = 0i64;
834        let mut failed_effects = 0i64;
835
836        for entry in self.effect_history.iter() {
837            match entry.value().status.as_str() {
838                "pending" | "executing" => active_effects += 1,
839                "completed" => completed_effects += 1,
840                "failed" => failed_effects += 1,
841                _ => {}
842            }
843        }
844
845        Ok(InsightStats {
846            total_events,
847            active_effects,
848            completed_effects,
849            failed_effects,
850        })
851    }
852
853    async fn get_recent_events(
854        &self,
855        cursor: Option<i64>,
856        limit: usize,
857    ) -> Result<Vec<InsightEvent>> {
858        // Get events from history with proper seq
859        let mut events: Vec<_> = self
860            .event_history
861            .iter()
862            .filter_map(|e| {
863                let stored = e.value();
864                // Filter by cursor if provided
865                if let Some(cursor_seq) = cursor {
866                    if stored.seq <= cursor_seq {
867                        return None;
868                    }
869                }
870                Some(InsightEvent {
871                    seq: stored.seq,
872                    stream_type: StreamType::EventDispatched,
873                    correlation_id: stored.correlation_id,
874                    event_id: Some(stored.event_id),
875                    effect_event_id: None,
876                    effect_id: None,
877                    event_type: Some(stored.event_type.clone()),
878                    status: None,
879                    error: None,
880                    payload: Some(serde_json::json!({
881                        "event_type": stored.event_type.clone(),
882                        "hops": stored.hops,
883                        "batch_id": stored.batch_id,
884                        "batch_index": stored.batch_index,
885                        "batch_size": stored.batch_size,
886                        "payload": stored.payload.clone(),
887                    })),
888                    created_at: stored.created_at,
889                })
890            })
891            .collect();
892
893        // Sort by seq (oldest first for consistent cursor pagination)
894        events.sort_by_key(|e| e.seq);
895        events.truncate(limit);
896
897        Ok(events)
898    }
899
900    async fn get_effect_logs(
901        &self,
902        correlation_id: Option<Uuid>,
903        limit: usize,
904    ) -> Result<Vec<EffectExecutionLog>> {
905        let mut logs: Vec<_> = self
906            .effect_history
907            .iter()
908            .filter_map(|entry| {
909                let effect = entry.value();
910                if let Some(filter) = correlation_id {
911                    if effect.correlation_id != filter {
912                        return None;
913                    }
914                }
915
916                let started_at = effect.claimed_at.or(effect.last_attempted_at);
917                let duration_ms = match (started_at, effect.completed_at) {
918                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
919                    _ => None,
920                };
921
922                Some(EffectExecutionLog {
923                    correlation_id: effect.correlation_id,
924                    event_id: effect.event_id,
925                    effect_id: effect.effect_id.clone(),
926                    status: effect.status.clone(),
927                    attempts: effect.attempts,
928                    event_type: Some(effect.event_type.clone()),
929                    result: effect.result.clone(),
930                    error: effect.error.clone(),
931                    created_at: effect.created_at,
932                    execute_at: Some(effect.execute_at),
933                    claimed_at: effect.claimed_at,
934                    last_attempted_at: effect.last_attempted_at,
935                    completed_at: effect.completed_at,
936                    duration_ms,
937                })
938            })
939            .collect();
940
941        logs.sort_by(|a, b| {
942            let a_time = a
943                .last_attempted_at
944                .or(a.completed_at)
945                .or(a.claimed_at)
946                .unwrap_or(a.created_at);
947            let b_time = b
948                .last_attempted_at
949                .or(b.completed_at)
950                .or(b.claimed_at)
951                .unwrap_or(b.created_at);
952            b_time.cmp(&a_time)
953        });
954        logs.truncate(limit);
955        Ok(logs)
956    }
957
958    async fn get_dead_letters(
959        &self,
960        unresolved_only: bool,
961        limit: usize,
962    ) -> Result<Vec<DeadLetterEntry>> {
963        let mut rows: Vec<_> = self
964            .dead_letter_history
965            .iter()
966            .filter_map(|entry| {
967                let dead = entry.value();
968                if unresolved_only && dead.resolved_at.is_some() {
969                    return None;
970                }
971
972                Some(DeadLetterEntry {
973                    correlation_id: dead.correlation_id,
974                    event_id: dead.event_id,
975                    effect_id: dead.effect_id.clone(),
976                    event_type: dead.event_type.clone(),
977                    event_payload: dead.event_payload.clone(),
978                    error: dead.error.clone(),
979                    reason: dead.reason.clone(),
980                    attempts: dead.attempts,
981                    failed_at: dead.failed_at,
982                    resolved_at: dead.resolved_at,
983                })
984            })
985            .collect();
986
987        rows.sort_by(|a, b| b.failed_at.cmp(&a.failed_at));
988        rows.truncate(limit);
989        Ok(rows)
990    }
991
992    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
993        let mut workflows: HashMap<Uuid, FailedWorkflow> = HashMap::new();
994
995        for entry in self.effect_history.iter() {
996            let effect = entry.value();
997            let workflow = workflows
998                .entry(effect.correlation_id)
999                .or_insert(FailedWorkflow {
1000                    correlation_id: effect.correlation_id,
1001                    failed_effects: 0,
1002                    active_effects: 0,
1003                    dead_letters: 0,
1004                    last_failed_at: None,
1005                    last_error: None,
1006                });
1007
1008            match effect.status.as_str() {
1009                "failed" => {
1010                    workflow.failed_effects += 1;
1011                    let at = effect.last_attempted_at.unwrap_or(effect.created_at);
1012                    if workflow
1013                        .last_failed_at
1014                        .map(|current| at > current)
1015                        .unwrap_or(true)
1016                    {
1017                        workflow.last_failed_at = Some(at);
1018                        workflow.last_error = effect.error.clone();
1019                    }
1020                }
1021                "pending" | "executing" => {
1022                    workflow.active_effects += 1;
1023                }
1024                _ => {}
1025            }
1026        }
1027
1028        for entry in self.dead_letter_history.iter() {
1029            let dead = entry.value();
1030            if dead.resolved_at.is_some() {
1031                continue;
1032            }
1033
1034            let workflow = workflows
1035                .entry(dead.correlation_id)
1036                .or_insert(FailedWorkflow {
1037                    correlation_id: dead.correlation_id,
1038                    failed_effects: 0,
1039                    active_effects: 0,
1040                    dead_letters: 0,
1041                    last_failed_at: None,
1042                    last_error: None,
1043                });
1044
1045            workflow.dead_letters += 1;
1046            if workflow
1047                .last_failed_at
1048                .map(|current| dead.failed_at > current)
1049                .unwrap_or(true)
1050            {
1051                workflow.last_failed_at = Some(dead.failed_at);
1052                workflow.last_error = Some(dead.error.clone());
1053            }
1054        }
1055
1056        let mut rows: Vec<_> = workflows
1057            .into_values()
1058            .filter(|workflow| workflow.failed_effects > 0 || workflow.dead_letters > 0)
1059            .collect();
1060
1061        rows.sort_by(|a, b| b.last_failed_at.cmp(&a.last_failed_at));
1062        rows.truncate(limit);
1063        Ok(rows)
1064    }
1065}
1066
1067impl MemoryStore {
1068    /// Build event nodes recursively
1069    fn build_event_nodes(
1070        &self,
1071        events: &[StoredEvent],
1072        parent_id: Option<Uuid>,
1073        event_ids: &HashSet<Uuid>,
1074        is_root_pass: bool,
1075    ) -> Vec<EventNode> {
1076        events
1077            .iter()
1078            .filter(|event| {
1079                if is_root_pass {
1080                    event.parent_id.is_none()
1081                        || event
1082                            .parent_id
1083                            .map(|parent| !event_ids.contains(&parent))
1084                            .unwrap_or(false)
1085                } else {
1086                    event.parent_id == parent_id
1087                }
1088            })
1089            .map(|event| {
1090                // Get effects for this event
1091                let effects = self
1092                    .effect_history
1093                    .iter()
1094                    .filter(|e| e.value().event_id == event.event_id)
1095                    .map(|e| {
1096                        let effect = e.value();
1097                        EffectNode {
1098                            effect_id: effect.effect_id.clone(),
1099                            event_id: effect.event_id,
1100                            status: effect.status.clone(),
1101                            result: effect.result.clone(),
1102                            error: effect.error.clone(),
1103                            attempts: effect.attempts,
1104                            created_at: effect.created_at,
1105                            batch_id: effect.batch_id,
1106                            batch_index: effect.batch_index,
1107                            batch_size: effect.batch_size,
1108                        }
1109                    })
1110                    .collect();
1111
1112                // Recursively build children
1113                let children =
1114                    self.build_event_nodes(events, Some(event.event_id), event_ids, false);
1115
1116                EventNode {
1117                    event_id: event.event_id,
1118                    event_type: event.event_type.clone(),
1119                    payload: event.payload.clone(),
1120                    created_at: event.created_at,
1121                    batch_id: event.batch_id,
1122                    batch_index: event.batch_index,
1123                    batch_size: event.batch_size,
1124                    children,
1125                    effects,
1126                }
1127            })
1128            .collect()
1129    }
1130}
1131
#[cfg(test)]
mod tests {
    use super::*;
    use seesaw_core::store::Store;

    /// Builds a parentless, un-batched event numbered `n` with a fresh event id.
    fn numbered_event(n: i64, correlation_id: Uuid) -> QueuedEvent {
        QueuedEvent {
            id: n,
            event_id: Uuid::new_v4(),
            parent_id: None,
            correlation_id,
            event_type: format!("Event{}", n),
            payload: serde_json::json!({"n": n}),
            created_at: Utc::now(),
            hops: 0,
            retry_count: 0,
            batch_id: None,
            batch_index: None,
            batch_size: None,
        }
    }

    /// Publishes events numbered `1..=count` in order.
    ///
    /// With `Some(id)` every event shares that correlation id; with `None`
    /// each event gets its own freshly generated one.
    async fn publish_numbered(store: &MemoryStore, count: i64, shared_correlation: Option<Uuid>) {
        for n in 1..=count {
            let correlation_id = shared_correlation.unwrap_or_else(Uuid::new_v4);
            store
                .publish(numbered_event(n, correlation_id))
                .await
                .unwrap();
        }
    }

    /// Collects the sequence numbers of the given insight events, in order.
    fn seqs_of(events: &[InsightEvent]) -> Vec<i64> {
        events.iter().map(|event| event.seq).collect()
    }

    #[tokio::test]
    async fn test_insight_events_have_unique_seq() {
        let store = MemoryStore::new();

        // Publish 3 events, each under its own correlation id
        publish_numbered(&store, 3, None).await;

        // Get recent events - should have seq 1, 2, 3
        let events = store.get_recent_events(None, 10).await.unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(seqs_of(&events), vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_cursor_based_filtering() {
        let store = MemoryStore::new();
        let correlation_id = Uuid::new_v4();

        // Publish 5 events under one correlation id
        publish_numbered(&store, 5, Some(correlation_id)).await;

        // First page (no cursor)
        let events = store.get_recent_events(None, 2).await.unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(seqs_of(&events), vec![1, 2]);

        // Next page after cursor=2
        let next_events = store.get_recent_events(Some(2), 2).await.unwrap();
        assert_eq!(next_events.len(), 2);
        assert_eq!(seqs_of(&next_events), vec![3, 4]);

        // Verify no duplicates between historical and cursor fetch
        let mut all_seqs = seqs_of(&events);
        all_seqs.extend(seqs_of(&next_events));
        assert_eq!(all_seqs, vec![1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_no_events_before_cursor() {
        let store = MemoryStore::new();

        // Publish 3 events, each under its own correlation id
        publish_numbered(&store, 3, None).await;

        // Request events after cursor=10 (beyond all events)
        let events = store.get_recent_events(Some(10), 10).await.unwrap();
        assert_eq!(
            events.len(),
            0,
            "Should return no events when cursor is beyond all seq values"
        );
    }

    #[tokio::test]
    async fn test_workflow_tree_treats_orphan_parent_as_root() {
        let store = MemoryStore::new();
        let correlation_id = Uuid::new_v4();
        let event_id = Uuid::new_v4();
        let missing_parent = Uuid::new_v4();

        // The parent id refers to an event that was never published.
        let orphan = QueuedEvent {
            id: 1,
            event_id,
            parent_id: Some(missing_parent),
            correlation_id,
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            created_at: Utc::now(),
            hops: 1,
            retry_count: 0,
            batch_id: None,
            batch_index: None,
            batch_size: None,
        };
        store.publish(orphan).await.unwrap();

        let tree = store.get_workflow_tree(correlation_id).await.unwrap();
        assert_eq!(tree.event_count, 1);
        assert_eq!(tree.roots.len(), 1);
        assert_eq!(tree.roots[0].event_id, event_id);
    }

    #[tokio::test]
    async fn test_dlq_with_batch_metadata_publishes_synthetic_terminal_event() {
        let store = MemoryStore::new();
        let event_id = Uuid::new_v4();
        let correlation_id = Uuid::new_v4();
        let batch_id = Uuid::new_v4();
        let effect_id = "join_effect".to_string();

        // Effect intent that carries batch metadata (item 2 of 5).
        store
            .insert_effect_intent(
                event_id,
                effect_id.clone(),
                correlation_id,
                "BatchItemResult".to_string(),
                serde_json::json!({ "index": 2, "ok": false }),
                None,
                Some(batch_id),
                Some(2),
                Some(5),
                Utc::now(),
                30,
                1,
                10,
            )
            .await
            .expect("insert should succeed");

        store
            .dlq_effect(
                event_id,
                effect_id,
                "forced failure".to_string(),
                "failed".to_string(),
                1,
            )
            .await
            .expect("dlq should succeed");

        let polled = store.poll_next().await.expect("poll should succeed");
        assert!(
            polled.is_some(),
            "synthetic terminal event should be published"
        );

        let synthetic = polled.unwrap();
        assert_eq!(synthetic.correlation_id, correlation_id);
        assert_eq!(synthetic.event_type, "BatchItemResult");
        assert_eq!(synthetic.batch_id, Some(batch_id));
        assert_eq!(synthetic.batch_index, Some(2));
        assert_eq!(synthetic.batch_size, Some(5));
    }

    #[tokio::test]
    async fn test_dlq_without_batch_metadata_does_not_publish_synthetic_terminal_event() {
        let store = MemoryStore::new();
        let event_id = Uuid::new_v4();
        let correlation_id = Uuid::new_v4();
        let effect_id = "normal_effect".to_string();

        // Effect intent with no batch metadata at all.
        store
            .insert_effect_intent(
                event_id,
                effect_id.clone(),
                correlation_id,
                "NormalEvent".to_string(),
                serde_json::json!({ "ok": true }),
                None,
                None,
                None,
                None,
                Utc::now(),
                30,
                1,
                10,
            )
            .await
            .expect("insert should succeed");

        store
            .dlq_effect(
                event_id,
                effect_id,
                "forced failure".to_string(),
                "failed".to_string(),
                1,
            )
            .await
            .expect("dlq should succeed");

        let next = store.poll_next().await.expect("poll should succeed");
        assert!(next.is_none(), "no synthetic terminal event expected");
    }
}