// seesaw_memory/lib.rs
1//! In-memory Store implementation for Seesaw
2//!
3//! This is a simple in-memory store suitable for development, testing, and demos.
4//! Not suitable for production use as all data is lost on restart.
5
6use anyhow::{anyhow, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use parking_lot::Mutex;
11use seesaw_core::insight::*;
12use seesaw_core::store::*;
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, HashSet, VecDeque};
15use tokio::sync::broadcast;
16
17use std::sync::atomic::{AtomicI64, Ordering};
18use std::sync::Arc;
19use uuid::Uuid;
20
/// In-memory store for workflows.
///
/// Every collection is wrapped in `Arc`, so `Clone` is cheap and all clones
/// share the same underlying data. Nothing is persisted: contents vanish
/// when the process exits (see module docs).
#[derive(Clone)]
pub struct MemoryStore {
    /// Event queue (FIFO per correlation_id)
    events: Arc<DashMap<Uuid, VecDeque<QueuedEvent>>>,
    /// Global event sequence for IDs
    event_seq: Arc<AtomicI64>,
    /// Workflow states: correlation_id -> (state JSON, optimistic-lock version)
    states: Arc<DashMap<Uuid, (serde_json::Value, i32)>>,
    /// Effect executions queue
    effects: Arc<Mutex<VecDeque<QueuedEffectExecution>>>,
    /// Completed effects (for idempotency), keyed by (event_id, effect_id)
    completed_effects: Arc<DashMap<(Uuid, String), serde_json::Value>>,

    // Insight/observability fields
    /// Broadcast channel for live events
    insight_tx: Arc<broadcast::Sender<InsightEvent>>,
    /// Insight event sequence
    insight_seq: Arc<AtomicI64>,
    /// Event history (for tree reconstruction)
    event_history: Arc<DashMap<Uuid, StoredEvent>>,
    /// Effect history, keyed by (event_id, effect_id)
    effect_history: Arc<DashMap<(Uuid, String), StoredEffect>>,
    /// Dead letter history, keyed by (event_id, effect_id)
    dead_letter_history: Arc<DashMap<(Uuid, String), StoredDeadLetter>>,
    /// Durable-ish join windows for same-batch fan-in (in-memory only),
    /// keyed by (join_effect_id, correlation_id, batch_id).
    join_windows: Arc<Mutex<HashMap<(String, Uuid, Uuid), MemoryJoinWindow>>>,
}
49
/// Stored event for history/tree reconstruction.
#[derive(Debug, Clone)]
struct StoredEvent {
    /// Monotonic insight sequence number assigned at publish time.
    seq: i64,
    event_id: Uuid,
    /// Parent event, if this event was emitted by an effect (None for roots).
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    /// Depth in the event chain; children are stored with parent hops + 1.
    hops: i32,
    // Fan-out batch metadata (all None for non-batched events).
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
65
/// Stored effect for history/tree reconstruction.
#[derive(Debug, Clone)]
struct StoredEffect {
    effect_id: String,
    /// Event that triggered this effect.
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    // Fan-out batch metadata copied from the triggering event.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    /// Lifecycle status string: "pending" -> "executing" -> "completed" | "failed".
    status: String,
    /// Result payload once completed.
    result: Option<serde_json::Value>,
    /// Most recent error message, if any attempt failed.
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    /// Earliest time the effect may be claimed for execution.
    execute_at: DateTime<Utc>,
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
87
/// Lifecycle of a same-batch fan-in join window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MemoryJoinStatus {
    /// Accepting entries; claimable once the target count is reached.
    Open,
    /// Claimed by a worker; released back to Open if that worker fails.
    Processing,
    /// Join finished; further appends are ignored.
    Completed,
}
94
/// Accumulator for one same-batch fan-in window.
#[derive(Debug, Clone)]
struct MemoryJoinWindow {
    /// Number of batch entries required before the window can be claimed.
    target_count: i32,
    status: MemoryJoinStatus,
    /// Source event ids already appended (guards against duplicate deliveries).
    source_event_ids: HashSet<Uuid>,
    /// Entries keyed by batch_index so duplicates collapse and the claimed
    /// output can be returned in index order.
    entries_by_index: HashMap<i32, JoinEntry>,
}
102
/// Stored dead letter entry for insight diagnostics.
#[derive(Debug, Clone)]
struct StoredDeadLetter {
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    /// Final error message that sent the effect to the DLQ.
    error: String,
    /// Error classification (the caller's `error_type`).
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    /// Set when/if the dead letter is later resolved; always None at insert.
    resolved_at: Option<DateTime<Utc>>,
}
117
118impl MemoryStore {
119    pub fn new() -> Self {
120        let (insight_tx, _) = broadcast::channel(1000);
121        Self {
122            events: Arc::new(DashMap::new()),
123            event_seq: Arc::new(AtomicI64::new(1)),
124            states: Arc::new(DashMap::new()),
125            effects: Arc::new(Mutex::new(VecDeque::new())),
126            completed_effects: Arc::new(DashMap::new()),
127            insight_tx: Arc::new(insight_tx),
128            insight_seq: Arc::new(AtomicI64::new(1)),
129            event_history: Arc::new(DashMap::new()),
130            effect_history: Arc::new(DashMap::new()),
131            dead_letter_history: Arc::new(DashMap::new()),
132            join_windows: Arc::new(Mutex::new(HashMap::new())),
133        }
134    }
135
136    /// Publish insight event to broadcast channel
137    fn publish_insight(&self, event: InsightEvent) {
138        // Ignore send errors (no subscribers is fine)
139        let _ = self.insight_tx.send(event);
140    }
141}
142
impl Default for MemoryStore {
    /// Equivalent to [`MemoryStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
148
149#[async_trait]
150impl Store for MemoryStore {
    /// Enqueue `event` into its workflow's FIFO queue, record it in the
    /// event history, and broadcast an `EventDispatched` insight event.
    ///
    /// Idempotent on `event_id`: re-publishing an event already present in
    /// the history is a no-op.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        // Dedup guard: an event_id in history was published before.
        if self.event_history.contains_key(&event.event_id) {
            return Ok(());
        }

        // Generate sequence number
        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);

        // Store in event history with seq
        self.event_history.insert(
            event.event_id,
            StoredEvent {
                seq,
                event_id: event.event_id,
                parent_id: event.parent_id,
                correlation_id: event.correlation_id,
                event_type: event.event_type.clone(),
                payload: event.payload.clone(),
                hops: event.hops,
                batch_id: event.batch_id,
                batch_index: event.batch_index,
                batch_size: event.batch_size,
                created_at: event.created_at,
            },
        );

        // Publish insight event
        self.publish_insight(InsightEvent {
            seq,
            stream_type: StreamType::EventDispatched,
            correlation_id: event.correlation_id,
            event_id: Some(event.event_id),
            effect_event_id: None,
            effect_id: None,
            event_type: Some(event.event_type.clone()),
            status: None,
            error: None,
            payload: Some(serde_json::json!({
                "event_type": event.event_type.clone(),
                "hops": event.hops,
                "batch_id": event.batch_id,
                "batch_index": event.batch_index,
                "batch_size": event.batch_size,
                "payload": event.payload.clone(),
            })),
            created_at: event.created_at,
        });

        // Add to queue (creating the per-correlation queue on first use)
        let mut queue = self
            .events
            .entry(event.correlation_id)
            .or_insert_with(VecDeque::new);
        queue.push_back(event);
        Ok(())
    }
207
208    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
209        // Simple round-robin across workflows
210        for mut entry in self.events.iter_mut() {
211            if let Some(event) = entry.value_mut().pop_front() {
212                return Ok(Some(event));
213            }
214        }
215        Ok(None)
216    }
217
    /// Acknowledge an event by queue id.
    ///
    /// No-op here: `poll_next` already removed the event from its queue, so
    /// there is nothing left to clean up (unlike a durable store).
    async fn ack(&self, _id: i64) -> Result<()> {
        // No-op for in-memory (event already removed in poll_next)
        Ok(())
    }
222
    /// Negatively acknowledge an event.
    ///
    /// The in-memory store has no redelivery machinery: the event was
    /// already removed by `poll_next` and is simply dropped;
    /// `_retry_after_secs` is ignored.
    async fn nack(&self, _id: i64, _retry_after_secs: u64) -> Result<()> {
        // For simplicity, just drop failed events in memory store
        Ok(())
    }
227
228    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
229    where
230        S: for<'de> Deserialize<'de> + Send,
231    {
232        if let Some(entry) = self.states.get(&correlation_id) {
233            let (json, version) = entry.value();
234            let state: S = serde_json::from_value(json.clone())?;
235            Ok(Some((state, *version)))
236        } else {
237            Ok(None)
238        }
239    }
240
    /// Persist a workflow state snapshot with optimistic concurrency control.
    ///
    /// Returns the new version (`expected_version + 1`) on success, or an
    /// error when `expected_version` does not match the stored version.
    ///
    /// NOTE(review): for a brand-new correlation_id the existence check and
    /// the insert are two separate DashMap operations, so two concurrent
    /// first writes can both succeed with the same version — acceptable for
    /// a dev/test store, but worth confirming.
    async fn save_state<S>(
        &self,
        correlation_id: Uuid,
        state: &S,
        expected_version: i32,
    ) -> Result<i32>
    where
        S: Serialize + Send + Sync,
    {
        let json = serde_json::to_value(state)?;
        let new_version = expected_version + 1;

        // Simple optimistic locking check. The get_mut guard is held across
        // the compare and the write, so the check-and-set is atomic for
        // entries that already exist.
        if let Some(mut entry) = self.states.get_mut(&correlation_id) {
            let (_, current_version) = entry.value();
            if *current_version != expected_version {
                return Err(anyhow!(
                    "Version mismatch: expected {} but was {}",
                    expected_version,
                    current_version
                ));
            }
            *entry.value_mut() = (json, new_version);
        } else {
            self.states.insert(correlation_id, (json, new_version));
        }

        Ok(new_version)
    }
270
    /// Record the intent to execute an effect.
    ///
    /// Snapshots the effect into the history as "pending", broadcasts an
    /// `EffectStarted` insight event, then enqueues the execution so
    /// `poll_next_effect` can claim it once `execute_at` has passed.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        effect_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
    ) -> Result<()> {
        let execution = QueuedEffectExecution {
            event_id,
            effect_id: effect_id.clone(),
            correlation_id,
            event_type,
            event_payload,
            parent_event_id,
            batch_id,
            batch_index,
            batch_size,
            execute_at,
            timeout_seconds,
            max_attempts,
            priority,
            attempts: 0,
        };

        // Store in effect history (status starts at "pending", no attempts yet)
        let now = Utc::now();
        self.effect_history.insert(
            (event_id, effect_id.clone()),
            StoredEffect {
                effect_id: effect_id.clone(),
                event_id,
                correlation_id,
                event_type: execution.event_type.clone(),
                event_payload: execution.event_payload.clone(),
                batch_id: execution.batch_id,
                batch_index: execution.batch_index,
                batch_size: execution.batch_size,
                status: "pending".to_string(),
                result: None,
                error: None,
                attempts: 0,
                created_at: now,
                execute_at,
                claimed_at: None,
                last_attempted_at: None,
                completed_at: None,
            },
        );

        // Publish insight event
        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
        self.publish_insight(InsightEvent {
            seq,
            stream_type: StreamType::EffectStarted,
            correlation_id,
            event_id: None,
            effect_event_id: Some(event_id),
            effect_id: Some(effect_id),
            event_type: None,
            status: Some("pending".to_string()),
            error: None,
            payload: None,
            created_at: now,
        });

        // Finally make the execution claimable.
        let mut queue = self.effects.lock();
        queue.push_back(execution);
        Ok(())
    }
349
350    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
351        let mut queue = self.effects.lock();
352
353        // Find first effect that's ready to execute
354        let now = Utc::now();
355        if let Some(pos) = queue.iter().position(|e| e.execute_at <= now) {
356            if let Some(next) = queue.remove(pos) {
357                if let Some(mut effect) = self
358                    .effect_history
359                    .get_mut(&(next.event_id, next.effect_id.clone()))
360                {
361                    effect.status = "executing".to_string();
362                    effect.claimed_at = Some(now);
363                    effect.last_attempted_at = Some(now);
364                    effect.attempts = next.attempts + 1;
365                }
366                Ok(Some(next))
367            } else {
368                Ok(None)
369            }
370        } else {
371            Ok(None)
372        }
373    }
374
    /// Mark an effect as completed with `result`.
    ///
    /// Stores the result in the `completed_effects` idempotency cache,
    /// updates the history entry, and broadcasts `EffectCompleted`. If the
    /// effect is unknown to the history, only the cache is written and no
    /// insight event is emitted.
    async fn complete_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
    ) -> Result<()> {
        self.completed_effects
            .insert((event_id, effect_id.clone()), result.clone());

        // Update effect history
        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
            entry.status = "completed".to_string();
            entry.result = Some(result.clone());
            let completed_at = Utc::now();
            entry.completed_at = Some(completed_at);
            entry.last_attempted_at = Some(completed_at);

            // Publish insight event
            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
            self.publish_insight(InsightEvent {
                seq,
                stream_type: StreamType::EffectCompleted,
                correlation_id: entry.correlation_id,
                event_id: None,
                effect_event_id: Some(event_id),
                effect_id: Some(effect_id),
                event_type: None,
                status: Some("completed".to_string()),
                error: None,
                payload: Some(result),
                created_at: completed_at,
            });
        }

        Ok(())
    }
411
    /// Complete an effect and publish the events it emitted.
    ///
    /// Child event ids are derived deterministically via UUIDv5 over
    /// (event_id, effect_id, event_type, index), so a retry produces the
    /// same ids and `publish`'s dedup makes the whole operation idempotent.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Mark effect complete
        self.complete_effect(event_id, effect_id.clone(), result)
            .await?;

        // Resolve the correlation id: prefer the effect history, fall back
        // to the event history, and finally to the event id itself.
        let correlation_id = self
            .effect_history
            .get(&(event_id, effect_id.clone()))
            .map(|entry| entry.correlation_id)
            .or_else(|| {
                self.event_history
                    .get(&event_id)
                    .map(|entry| entry.correlation_id)
            })
            .unwrap_or(event_id);
        // Children sit one hop deeper than the triggering event (0 if unknown).
        let parent_hops = self
            .event_history
            .get(&event_id)
            .map(|entry| entry.hops)
            .unwrap_or(0);

        // Publish emitted events
        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            let new_event_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, effect_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            let queued = QueuedEvent {
                id: self.event_seq.fetch_add(1, Ordering::SeqCst),
                event_id: new_event_id,
                parent_id: Some(event_id),
                correlation_id,
                event_type: emitted.event_type,
                payload: emitted.payload,
                hops: parent_hops + 1,
                batch_id: emitted.batch_id,
                batch_index: emitted.batch_index,
                batch_size: emitted.batch_size,
                created_at: Utc::now(),
            };

            self.publish(queued).await?;
        }

        Ok(())
    }
469
    /// Record a failed attempt for an effect.
    ///
    /// Updates the history entry (status "failed", error, attempt count)
    /// and broadcasts `EffectFailed`. NOTE(review): the in-memory store
    /// does not requeue the effect (`_retry_after_secs` is ignored), so a
    /// failed effect is effectively dropped from execution — confirm that
    /// retries are intentionally unsupported here.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        _retry_after_secs: i32,
    ) -> Result<()> {
        // Update effect history
        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
            let failed_at = Utc::now();
            entry.status = "failed".to_string();
            entry.error = Some(error.clone());
            entry.attempts += 1;
            entry.last_attempted_at = Some(failed_at);

            // Publish insight event
            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
            self.publish_insight(InsightEvent {
                seq,
                stream_type: StreamType::EffectFailed,
                correlation_id: entry.correlation_id,
                event_id: None,
                effect_event_id: Some(event_id),
                effect_id: Some(effect_id),
                event_type: None,
                status: Some("failed".to_string()),
                error: Some(error.clone()),
                payload: None,
                created_at: failed_at,
            });
        }

        eprintln!("Effect failed: {}", error);
        Ok(())
    }
505
    /// Send an effect to the dead-letter queue with no emitted events.
    ///
    /// Thin wrapper over [`Self::dlq_effect_with_events`] with an empty
    /// emitted-events list.
    async fn dlq_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        error_type: String,
        attempts: i32,
    ) -> Result<()> {
        self.dlq_effect_with_events(event_id, effect_id, error, error_type, attempts, Vec::new())
            .await
    }
517
    /// Dead-letter an effect after its retries are exhausted, optionally
    /// publishing terminal events so downstream joins are not starved.
    ///
    /// Records the failure in the effect history and the dead-letter
    /// history, then: if the caller supplied `emitted_events`, publishes
    /// them as children of the failed event; otherwise, if the effect was
    /// part of a batch, publishes a single synthetic terminal event so the
    /// batch's fan-in can still complete. Synthetic ids are deterministic
    /// (UUIDv5), so repeated DLQ calls dedup via `publish`.
    async fn dlq_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        error_type: String,
        attempts: i32,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Snapshot metadata from the effect history (preferred) and the
        // event history (fallback) before mutating anything.
        let effect_snapshot =
            self.effect_history
                .get(&(event_id, effect_id.clone()))
                .map(|entry| {
                    (
                        entry.correlation_id,
                        entry.event_type.clone(),
                        entry.event_payload.clone(),
                        entry.batch_id,
                        entry.batch_index,
                        entry.batch_size,
                    )
                });
        let event_snapshot = self.event_history.get(&event_id).map(|event| {
            (
                event.correlation_id,
                event.event_type.clone(),
                event.payload.clone(),
                event.batch_id,
                event.batch_index,
                event.batch_size,
                event.hops,
            )
        });

        // If neither history knows the effect/event, fall back to
        // placeholder metadata (correlation id defaults to the event id).
        let (correlation_id, event_type, event_payload, batch_id, batch_index, batch_size) =
            if let Some(snapshot) = effect_snapshot.clone() {
                (
                    snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
                )
            } else if let Some(snapshot) = event_snapshot.clone() {
                (
                    snapshot.0, snapshot.1, snapshot.2, snapshot.3, snapshot.4, snapshot.5,
                )
            } else {
                (
                    event_id,
                    "unknown".to_string(),
                    serde_json::Value::Null,
                    None,
                    None,
                    None,
                )
            };

        // Mark the effect terminally failed in the history.
        if let Some(mut effect) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
            effect.status = "failed".to_string();
            effect.error = Some(error.clone());
            effect.attempts = attempts;
            effect.last_attempted_at = Some(Utc::now());
        }

        self.dead_letter_history.insert(
            (event_id, effect_id.clone()),
            StoredDeadLetter {
                event_id,
                effect_id: effect_id.clone(),
                correlation_id,
                event_type: event_type.clone(),
                event_payload: event_payload.clone(),
                error: error.clone(),
                reason: error_type,
                attempts,
                failed_at: Utc::now(),
                resolved_at: None,
            },
        );

        let parent_hops = event_snapshot.map(|snapshot| snapshot.6).unwrap_or(0);
        if emitted_events.is_empty() {
            // No explicit terminal events: for batched effects, re-publish
            // the original payload as a synthetic terminal event so the
            // batch fan-in still sees this slot.
            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                (batch_id, batch_index, batch_size)
            {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
                );
                self.publish(QueuedEvent {
                    id: self.event_seq.fetch_add(1, Ordering::SeqCst),
                    event_id: synthetic_event_id,
                    parent_id: Some(event_id),
                    correlation_id,
                    event_type: event_type.clone(),
                    payload: event_payload.clone(),
                    hops: parent_hops + 1,
                    batch_id: Some(batch_id),
                    batch_index: Some(batch_index),
                    batch_size: Some(batch_size),
                    created_at: Utc::now(),
                })
                .await?;
            }
        } else {
            // Publish caller-supplied terminal events, inheriting the failed
            // event's batch metadata where the emitted event has none.
            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!(
                        "{}-{}-dlq-terminal-{}-{}",
                        event_id, effect_id, emitted.event_type, emitted_index
                    )
                    .as_bytes(),
                );
                self.publish(QueuedEvent {
                    id: self.event_seq.fetch_add(1, Ordering::SeqCst),
                    event_id: synthetic_event_id,
                    parent_id: Some(event_id),
                    correlation_id,
                    event_type: emitted.event_type,
                    payload: emitted.payload,
                    hops: parent_hops + 1,
                    batch_id: emitted.batch_id.or(batch_id),
                    batch_index: emitted.batch_index.or(batch_index),
                    batch_size: emitted.batch_size.or(batch_size),
                    created_at: Utc::now(),
                })
                .await?;
            }
        }

        eprintln!(
            "Effect sent to DLQ: {}:{} - {} (attempts: {})",
            event_id, effect_id, error, attempts
        );
        Ok(())
    }
652
    /// Append one batch member to a fan-in join window and, if the window
    /// just became full, atomically claim it.
    ///
    /// Returns `Some(entries)` (ordered by `batch_index`) exactly when this
    /// call transitions the window from `Open` to `Processing`; otherwise
    /// `None` (window already completed, not yet full, or already claimed
    /// by another caller). Duplicate deliveries are idempotent: repeats of
    /// the same `source_event_id` or `batch_index` are ignored. The whole
    /// operation runs under a single `join_windows` lock.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let key = (join_effect_id.clone(), correlation_id, batch_id);
        let mut windows = self.join_windows.lock();
        let window = windows.entry(key).or_insert_with(|| MemoryJoinWindow {
            target_count: batch_size,
            status: MemoryJoinStatus::Open,
            source_event_ids: HashSet::new(),
            entries_by_index: HashMap::new(),
        });

        // Completed windows swallow late/duplicate deliveries.
        if window.status == MemoryJoinStatus::Completed {
            return Ok(None);
        }

        // Trust the latest batch_size in case earlier deliveries disagreed.
        if window.target_count != batch_size {
            window.target_count = batch_size;
        }

        // Record the entry only on first sight of this source event; the
        // per-index entry map additionally collapses index collisions.
        let already_seen_source = !window.source_event_ids.insert(source_event_id);
        if !already_seen_source {
            window
                .entries_by_index
                .entry(batch_index)
                .or_insert_with(|| JoinEntry {
                    source_event_id,
                    event_type: source_event_type,
                    payload: source_payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at: source_created_at,
                });
        }

        // Claim: only the call that fills the window while it is Open gets
        // the ordered entries back.
        let ready = window.entries_by_index.len() as i32 >= window.target_count;
        if ready && window.status == MemoryJoinStatus::Open {
            window.status = MemoryJoinStatus::Processing;
            let mut ordered = window
                .entries_by_index
                .values()
                .cloned()
                .collect::<Vec<_>>();
            ordered.sort_by_key(|entry| entry.batch_index);
            return Ok(Some(ordered));
        }

        Ok(None)
    }
712
713    async fn join_same_batch_complete(
714        &self,
715        join_effect_id: String,
716        correlation_id: Uuid,
717        batch_id: Uuid,
718    ) -> Result<()> {
719        let key = (join_effect_id, correlation_id, batch_id);
720        if let Some(window) = self.join_windows.lock().get_mut(&key) {
721            window.status = MemoryJoinStatus::Completed;
722        }
723        self.join_windows.lock().remove(&key);
724        Ok(())
725    }
726
727    async fn join_same_batch_release(
728        &self,
729        join_effect_id: String,
730        correlation_id: Uuid,
731        batch_id: Uuid,
732        _error: String,
733    ) -> Result<()> {
734        let key = (join_effect_id, correlation_id, batch_id);
735        if let Some(window) = self.join_windows.lock().get_mut(&key) {
736            if window.status == MemoryJoinStatus::Processing {
737                window.status = MemoryJoinStatus::Open;
738            }
739        }
740        Ok(())
741    }
742
    /// Per-workflow event subscriptions are not implemented for the
    /// in-memory store; this always returns an error.
    async fn subscribe_workflow_events(
        &self,
        _correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = WorkflowEvent> + Send + Unpin>> {
        // In-memory store doesn't support subscriptions
        Err(anyhow!("Subscriptions not supported in memory store"))
    }
750
751    async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<WorkflowStatus> {
752        // Check if any events or effects are pending
753        let has_events = self
754            .events
755            .get(&correlation_id)
756            .map(|q| !q.is_empty())
757            .unwrap_or(false);
758        let state = self
759            .states
760            .get(&correlation_id)
761            .map(|entry| entry.value().0.clone());
762        let pending_effects = 0i64; // Simplified
763
764        Ok(WorkflowStatus {
765            correlation_id,
766            state,
767            pending_effects,
768            is_settled: !has_events && pending_effects == 0,
769            last_event: None, // Could track this if needed
770        })
771    }
772}
773
774#[async_trait]
775impl InsightStore for MemoryStore {
776    async fn subscribe_events(
777        &self,
778    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
779        let mut rx = self.insight_tx.subscribe();
780        let stream = async_stream::stream! {
781            while let Ok(event) = rx.recv().await {
782                yield event;
783            }
784        };
785
786        Ok(Box::new(Box::pin(stream)))
787    }
788
    /// Reconstruct the event tree for one workflow from the event history.
    ///
    /// Events are gathered by correlation id, sorted by creation time, and
    /// assembled via `build_event_nodes` (defined elsewhere in this file);
    /// events whose parent was never captured are treated as roots so
    /// partially captured workflows still render.
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
        // Find all events for this correlation
        let mut events: Vec<_> = self
            .event_history
            .iter()
            .filter(|e| e.value().correlation_id == correlation_id)
            .map(|e| e.value().clone())
            .collect();

        events.sort_by_key(|e| e.created_at);

        // Build tree. Treat events with missing parents as roots so partially
        // captured workflows still render.
        let event_ids: HashSet<Uuid> = events.iter().map(|e| e.event_id).collect();
        let roots = self.build_event_nodes(&events, None, &event_ids, true);

        // Get state
        let state = self
            .states
            .get(&correlation_id)
            .map(|entry| entry.value().0.clone());

        Ok(WorkflowTree {
            correlation_id,
            roots,
            state,
            event_count: events.len(),
            effect_count: self
                .effect_history
                .iter()
                .filter(|e| e.value().correlation_id == correlation_id)
                .count(),
        })
    }
823
824    async fn get_stats(&self) -> Result<InsightStats> {
825        let total_events = self.event_history.len() as i64;
826
827        let mut active_effects = 0i64;
828        let mut completed_effects = 0i64;
829        let mut failed_effects = 0i64;
830
831        for entry in self.effect_history.iter() {
832            match entry.value().status.as_str() {
833                "pending" | "executing" => active_effects += 1,
834                "completed" => completed_effects += 1,
835                "failed" => failed_effects += 1,
836                _ => {}
837            }
838        }
839
840        Ok(InsightStats {
841            total_events,
842            active_effects,
843            completed_effects,
844            failed_effects,
845        })
846    }
847
    /// Page through historical `EventDispatched` insight events.
    ///
    /// Only events with `seq` strictly greater than `cursor` are returned,
    /// sorted ascending by `seq` and truncated to `limit` — i.e. forward,
    /// oldest-first cursor pagination. NOTE(review): despite the name,
    /// without a cursor this returns the OLDEST `limit` events rather than
    /// the most recent — confirm this is the intended contract.
    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        // Get events from history with proper seq, rebuilding the same
        // InsightEvent shape that `publish` broadcasts live.
        let mut events: Vec<_> = self
            .event_history
            .iter()
            .filter_map(|e| {
                let stored = e.value();
                // Filter by cursor if provided
                if let Some(cursor_seq) = cursor {
                    if stored.seq <= cursor_seq {
                        return None;
                    }
                }
                Some(InsightEvent {
                    seq: stored.seq,
                    stream_type: StreamType::EventDispatched,
                    correlation_id: stored.correlation_id,
                    event_id: Some(stored.event_id),
                    effect_event_id: None,
                    effect_id: None,
                    event_type: Some(stored.event_type.clone()),
                    status: None,
                    error: None,
                    payload: Some(serde_json::json!({
                        "event_type": stored.event_type.clone(),
                        "hops": stored.hops,
                        "batch_id": stored.batch_id,
                        "batch_index": stored.batch_index,
                        "batch_size": stored.batch_size,
                        "payload": stored.payload.clone(),
                    })),
                    created_at: stored.created_at,
                })
            })
            .collect();

        // Sort by seq (oldest first for consistent cursor pagination)
        events.sort_by_key(|e| e.seq);
        events.truncate(limit);

        Ok(events)
    }
894
895    async fn get_effect_logs(
896        &self,
897        correlation_id: Option<Uuid>,
898        limit: usize,
899    ) -> Result<Vec<EffectExecutionLog>> {
900        let mut logs: Vec<_> = self
901            .effect_history
902            .iter()
903            .filter_map(|entry| {
904                let effect = entry.value();
905                if let Some(filter) = correlation_id {
906                    if effect.correlation_id != filter {
907                        return None;
908                    }
909                }
910
911                let started_at = effect.claimed_at.or(effect.last_attempted_at);
912                let duration_ms = match (started_at, effect.completed_at) {
913                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
914                    _ => None,
915                };
916
917                Some(EffectExecutionLog {
918                    correlation_id: effect.correlation_id,
919                    event_id: effect.event_id,
920                    effect_id: effect.effect_id.clone(),
921                    status: effect.status.clone(),
922                    attempts: effect.attempts,
923                    event_type: Some(effect.event_type.clone()),
924                    result: effect.result.clone(),
925                    error: effect.error.clone(),
926                    created_at: effect.created_at,
927                    execute_at: Some(effect.execute_at),
928                    claimed_at: effect.claimed_at,
929                    last_attempted_at: effect.last_attempted_at,
930                    completed_at: effect.completed_at,
931                    duration_ms,
932                })
933            })
934            .collect();
935
936        logs.sort_by(|a, b| {
937            let a_time = a
938                .last_attempted_at
939                .or(a.completed_at)
940                .or(a.claimed_at)
941                .unwrap_or(a.created_at);
942            let b_time = b
943                .last_attempted_at
944                .or(b.completed_at)
945                .or(b.claimed_at)
946                .unwrap_or(b.created_at);
947            b_time.cmp(&a_time)
948        });
949        logs.truncate(limit);
950        Ok(logs)
951    }
952
953    async fn get_dead_letters(
954        &self,
955        unresolved_only: bool,
956        limit: usize,
957    ) -> Result<Vec<DeadLetterEntry>> {
958        let mut rows: Vec<_> = self
959            .dead_letter_history
960            .iter()
961            .filter_map(|entry| {
962                let dead = entry.value();
963                if unresolved_only && dead.resolved_at.is_some() {
964                    return None;
965                }
966
967                Some(DeadLetterEntry {
968                    correlation_id: dead.correlation_id,
969                    event_id: dead.event_id,
970                    effect_id: dead.effect_id.clone(),
971                    event_type: dead.event_type.clone(),
972                    event_payload: dead.event_payload.clone(),
973                    error: dead.error.clone(),
974                    reason: dead.reason.clone(),
975                    attempts: dead.attempts,
976                    failed_at: dead.failed_at,
977                    resolved_at: dead.resolved_at,
978                })
979            })
980            .collect();
981
982        rows.sort_by(|a, b| b.failed_at.cmp(&a.failed_at));
983        rows.truncate(limit);
984        Ok(rows)
985    }
986
987    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
988        let mut workflows: HashMap<Uuid, FailedWorkflow> = HashMap::new();
989
990        for entry in self.effect_history.iter() {
991            let effect = entry.value();
992            let workflow = workflows
993                .entry(effect.correlation_id)
994                .or_insert(FailedWorkflow {
995                    correlation_id: effect.correlation_id,
996                    failed_effects: 0,
997                    active_effects: 0,
998                    dead_letters: 0,
999                    last_failed_at: None,
1000                    last_error: None,
1001                });
1002
1003            match effect.status.as_str() {
1004                "failed" => {
1005                    workflow.failed_effects += 1;
1006                    let at = effect.last_attempted_at.unwrap_or(effect.created_at);
1007                    if workflow
1008                        .last_failed_at
1009                        .map(|current| at > current)
1010                        .unwrap_or(true)
1011                    {
1012                        workflow.last_failed_at = Some(at);
1013                        workflow.last_error = effect.error.clone();
1014                    }
1015                }
1016                "pending" | "executing" => {
1017                    workflow.active_effects += 1;
1018                }
1019                _ => {}
1020            }
1021        }
1022
1023        for entry in self.dead_letter_history.iter() {
1024            let dead = entry.value();
1025            if dead.resolved_at.is_some() {
1026                continue;
1027            }
1028
1029            let workflow = workflows
1030                .entry(dead.correlation_id)
1031                .or_insert(FailedWorkflow {
1032                    correlation_id: dead.correlation_id,
1033                    failed_effects: 0,
1034                    active_effects: 0,
1035                    dead_letters: 0,
1036                    last_failed_at: None,
1037                    last_error: None,
1038                });
1039
1040            workflow.dead_letters += 1;
1041            if workflow
1042                .last_failed_at
1043                .map(|current| dead.failed_at > current)
1044                .unwrap_or(true)
1045            {
1046                workflow.last_failed_at = Some(dead.failed_at);
1047                workflow.last_error = Some(dead.error.clone());
1048            }
1049        }
1050
1051        let mut rows: Vec<_> = workflows
1052            .into_values()
1053            .filter(|workflow| workflow.failed_effects > 0 || workflow.dead_letters > 0)
1054            .collect();
1055
1056        rows.sort_by(|a, b| b.last_failed_at.cmp(&a.last_failed_at));
1057        rows.truncate(limit);
1058        Ok(rows)
1059    }
1060}
1061
1062impl MemoryStore {
1063    /// Build event nodes recursively
1064    fn build_event_nodes(
1065        &self,
1066        events: &[StoredEvent],
1067        parent_id: Option<Uuid>,
1068        event_ids: &HashSet<Uuid>,
1069        is_root_pass: bool,
1070    ) -> Vec<EventNode> {
1071        events
1072            .iter()
1073            .filter(|event| {
1074                if is_root_pass {
1075                    event.parent_id.is_none()
1076                        || event
1077                            .parent_id
1078                            .map(|parent| !event_ids.contains(&parent))
1079                            .unwrap_or(false)
1080                } else {
1081                    event.parent_id == parent_id
1082                }
1083            })
1084            .map(|event| {
1085                // Get effects for this event
1086                let effects = self
1087                    .effect_history
1088                    .iter()
1089                    .filter(|e| e.value().event_id == event.event_id)
1090                    .map(|e| {
1091                        let effect = e.value();
1092                        EffectNode {
1093                            effect_id: effect.effect_id.clone(),
1094                            event_id: effect.event_id,
1095                            status: effect.status.clone(),
1096                            result: effect.result.clone(),
1097                            error: effect.error.clone(),
1098                            attempts: effect.attempts,
1099                            created_at: effect.created_at,
1100                            batch_id: effect.batch_id,
1101                            batch_index: effect.batch_index,
1102                            batch_size: effect.batch_size,
1103                        }
1104                    })
1105                    .collect();
1106
1107                // Recursively build children
1108                let children =
1109                    self.build_event_nodes(events, Some(event.event_id), event_ids, false);
1110
1111                EventNode {
1112                    event_id: event.event_id,
1113                    event_type: event.event_type.clone(),
1114                    payload: event.payload.clone(),
1115                    created_at: event.created_at,
1116                    batch_id: event.batch_id,
1117                    batch_index: event.batch_index,
1118                    batch_size: event.batch_size,
1119                    children,
1120                    effects,
1121                }
1122            })
1123            .collect()
1124    }
1125}
1126
#[cfg(test)]
mod tests {
    use super::*;
    use seesaw_core::store::Store;

    // Build a minimal root-level event (no parent, no batch metadata).
    // Replaces an identical 13-field literal previously repeated in three tests.
    fn simple_event(id: i64, correlation_id: Uuid) -> QueuedEvent {
        QueuedEvent {
            id,
            event_id: Uuid::new_v4(),
            parent_id: None,
            correlation_id,
            event_type: format!("Event{}", id),
            payload: serde_json::json!({"n": id}),
            created_at: Utc::now(),
            hops: 0,
            batch_id: None,
            batch_index: None,
            batch_size: None,
        }
    }

    #[tokio::test]
    async fn test_insight_events_have_unique_seq() {
        let store = MemoryStore::new();

        // Publish 3 events
        for i in 1..=3 {
            store
                .publish(simple_event(i, Uuid::new_v4()))
                .await
                .unwrap();
        }

        // Get recent events - should have seq 1, 2, 3
        let events = store.get_recent_events(None, 10).await.unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].seq, 1);
        assert_eq!(events[1].seq, 2);
        assert_eq!(events[2].seq, 3);
    }

    #[tokio::test]
    async fn test_cursor_based_filtering() {
        let store = MemoryStore::new();
        let correlation_id = Uuid::new_v4();

        // Publish 5 events
        for i in 1..=5 {
            store
                .publish(simple_event(i, correlation_id))
                .await
                .unwrap();
        }

        // Get first 2 events (no cursor)
        let events = store.get_recent_events(None, 2).await.unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 1);
        assert_eq!(events[1].seq, 2);

        // Get next events after cursor=2
        let next_events = store.get_recent_events(Some(2), 2).await.unwrap();
        assert_eq!(next_events.len(), 2);
        assert_eq!(next_events[0].seq, 3);
        assert_eq!(next_events[1].seq, 4);

        // Verify no duplicates between historical and cursor fetch
        let all_seqs: Vec<i64> = events
            .iter()
            .chain(next_events.iter())
            .map(|e| e.seq)
            .collect();
        assert_eq!(all_seqs, vec![1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_no_events_before_cursor() {
        let store = MemoryStore::new();

        // Publish 3 events
        for i in 1..=3 {
            store
                .publish(simple_event(i, Uuid::new_v4()))
                .await
                .unwrap();
        }

        // Request events after cursor=10 (beyond all events)
        let events = store.get_recent_events(Some(10), 10).await.unwrap();
        assert_eq!(
            events.len(),
            0,
            "Should return no events when cursor is beyond all seq values"
        );
    }

    #[tokio::test]
    async fn test_workflow_tree_treats_orphan_parent_as_root() {
        let store = MemoryStore::new();
        let correlation_id = Uuid::new_v4();
        let event_id = Uuid::new_v4();
        let missing_parent = Uuid::new_v4();

        // Event references a parent the store has never seen; the tree builder
        // should promote it to a root rather than drop it.
        store
            .publish(QueuedEvent {
                id: 1,
                event_id,
                parent_id: Some(missing_parent),
                correlation_id,
                event_type: "OrphanEvent".to_string(),
                payload: serde_json::json!({"ok": true}),
                created_at: Utc::now(),
                hops: 1,
                batch_id: None,
                batch_index: None,
                batch_size: None,
            })
            .await
            .unwrap();

        let tree = store.get_workflow_tree(correlation_id).await.unwrap();
        assert_eq!(tree.event_count, 1);
        assert_eq!(tree.roots.len(), 1);
        assert_eq!(tree.roots[0].event_id, event_id);
    }

    #[tokio::test]
    async fn test_dlq_with_batch_metadata_publishes_synthetic_terminal_event() {
        let store = MemoryStore::new();
        let event_id = Uuid::new_v4();
        let correlation_id = Uuid::new_v4();
        let batch_id = Uuid::new_v4();

        store
            .insert_effect_intent(
                event_id,
                "join_effect".to_string(),
                correlation_id,
                "BatchItemResult".to_string(),
                serde_json::json!({ "index": 2, "ok": false }),
                None,
                Some(batch_id),
                Some(2),
                Some(5),
                Utc::now(),
                30,
                1,
                10,
            )
            .await
            .expect("insert should succeed");

        store
            .dlq_effect(
                event_id,
                "join_effect".to_string(),
                "forced failure".to_string(),
                "failed".to_string(),
                1,
            )
            .await
            .expect("dlq should succeed");

        // A batch member that dead-letters must still emit a terminal event so
        // fan-in joins can complete.
        let synthetic = store.poll_next().await.expect("poll should succeed");
        assert!(
            synthetic.is_some(),
            "synthetic terminal event should be published"
        );
        let synthetic = synthetic.unwrap();
        assert_eq!(synthetic.correlation_id, correlation_id);
        assert_eq!(synthetic.event_type, "BatchItemResult");
        assert_eq!(synthetic.batch_id, Some(batch_id));
        assert_eq!(synthetic.batch_index, Some(2));
        assert_eq!(synthetic.batch_size, Some(5));
    }

    #[tokio::test]
    async fn test_dlq_without_batch_metadata_does_not_publish_synthetic_terminal_event() {
        let store = MemoryStore::new();
        let event_id = Uuid::new_v4();
        let correlation_id = Uuid::new_v4();

        store
            .insert_effect_intent(
                event_id,
                "normal_effect".to_string(),
                correlation_id,
                "NormalEvent".to_string(),
                serde_json::json!({ "ok": true }),
                None,
                None,
                None,
                None,
                Utc::now(),
                30,
                1,
                10,
            )
            .await
            .expect("insert should succeed");

        store
            .dlq_effect(
                event_id,
                "normal_effect".to_string(),
                "forced failure".to_string(),
                "failed".to_string(),
                1,
            )
            .await
            .expect("dlq should succeed");

        // Non-batch effects have no join waiting on them, so no synthetic
        // terminal event should appear in the queue.
        let next = store.poll_next().await.expect("poll should succeed");
        assert!(next.is_none(), "no synthetic terminal event expected");
    }
}