Skip to main content

seesaw_memory/
lib.rs

1//! In-memory Store implementation for Seesaw
2//!
3//! This is a simple in-memory store suitable for development, testing, and demos.
4//! Not suitable for production use as all data is lost on restart.
5
6use anyhow::{anyhow, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use parking_lot::Mutex;
11use seesaw_core::insight::*;
12use seesaw_core::store::*;
13use serde::{Deserialize, Serialize};
14use std::collections::VecDeque;
15use tokio::sync::broadcast;
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicI64, Ordering};
19use uuid::Uuid;
20
21/// In-memory store for workflows
22#[derive(Clone)]
23pub struct MemoryStore {
24    /// Event queue (FIFO per correlation_id)
25    events: Arc<DashMap<Uuid, VecDeque<QueuedEvent>>>,
26    /// Global event sequence for IDs
27    event_seq: Arc<AtomicI64>,
28    /// Workflow states
29    states: Arc<DashMap<Uuid, (serde_json::Value, i32)>>,
30    /// Effect executions queue
31    effects: Arc<Mutex<VecDeque<QueuedEffectExecution>>>,
32    /// Completed effects (for idempotency)
33    completed_effects: Arc<DashMap<(Uuid, String), serde_json::Value>>,
34
35    // Insight/observability fields
36    /// Broadcast channel for live events
37    insight_tx: Arc<broadcast::Sender<InsightEvent>>,
38    /// Insight event sequence
39    insight_seq: Arc<AtomicI64>,
40    /// Event history (for tree reconstruction)
41    event_history: Arc<DashMap<Uuid, StoredEvent>>,
42    /// Effect history
43    effect_history: Arc<DashMap<(Uuid, String), StoredEffect>>,
44}
45
46/// Stored event for history/tree reconstruction
47#[derive(Debug, Clone)]
48struct StoredEvent {
49    seq: i64,
50    event_id: Uuid,
51    parent_id: Option<Uuid>,
52    correlation_id: Uuid,
53    event_type: String,
54    payload: serde_json::Value,
55    created_at: DateTime<Utc>,
56}
57
58/// Stored effect for history/tree reconstruction
59#[derive(Debug, Clone)]
60struct StoredEffect {
61    effect_id: String,
62    event_id: Uuid,
63    correlation_id: Uuid,
64    status: String,
65    result: Option<serde_json::Value>,
66    error: Option<String>,
67    attempts: i32,
68    created_at: DateTime<Utc>,
69}
70
71impl MemoryStore {
72    pub fn new() -> Self {
73        let (insight_tx, _) = broadcast::channel(1000);
74        Self {
75            events: Arc::new(DashMap::new()),
76            event_seq: Arc::new(AtomicI64::new(1)),
77            states: Arc::new(DashMap::new()),
78            effects: Arc::new(Mutex::new(VecDeque::new())),
79            completed_effects: Arc::new(DashMap::new()),
80            insight_tx: Arc::new(insight_tx),
81            insight_seq: Arc::new(AtomicI64::new(1)),
82            event_history: Arc::new(DashMap::new()),
83            effect_history: Arc::new(DashMap::new()),
84        }
85    }
86
87    /// Publish insight event to broadcast channel
88    fn publish_insight(&self, event: InsightEvent) {
89        // Ignore send errors (no subscribers is fine)
90        let _ = self.insight_tx.send(event);
91    }
92}
93
94impl Default for MemoryStore {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100#[async_trait]
101impl Store for MemoryStore {
102    async fn publish(&self, event: QueuedEvent) -> Result<()> {
103        // Generate sequence number
104        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
105
106        // Store in event history with seq
107        self.event_history.insert(
108            event.event_id,
109            StoredEvent {
110                seq,
111                event_id: event.event_id,
112                parent_id: event.parent_id,
113                correlation_id: event.correlation_id,
114                event_type: event.event_type.clone(),
115                payload: event.payload.clone(),
116                created_at: event.created_at,
117            },
118        );
119
120        // Publish insight event
121        self.publish_insight(InsightEvent {
122            seq,
123            stream_type: StreamType::EventDispatched,
124            correlation_id: event.correlation_id,
125            event_id: Some(event.event_id),
126            effect_event_id: None,
127            effect_id: None,
128            event_type: Some(event.event_type.clone()),
129            status: None,
130            error: None,
131            payload: Some(event.payload.clone()),
132            created_at: event.created_at,
133        });
134
135        // Add to queue
136        let mut queue = self.events.entry(event.correlation_id).or_insert_with(VecDeque::new);
137        queue.push_back(event);
138        Ok(())
139    }
140
141    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
142        // Simple round-robin across workflows
143        for mut entry in self.events.iter_mut() {
144            if let Some(event) = entry.value_mut().pop_front() {
145                return Ok(Some(event));
146            }
147        }
148        Ok(None)
149    }
150
151    async fn ack(&self, _id: i64) -> Result<()> {
152        // No-op for in-memory (event already removed in poll_next)
153        Ok(())
154    }
155
156    async fn nack(&self, _id: i64, _retry_after_secs: u64) -> Result<()> {
157        // For simplicity, just drop failed events in memory store
158        Ok(())
159    }
160
161    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
162    where
163        S: for<'de> Deserialize<'de> + Send,
164    {
165        if let Some(entry) = self.states.get(&correlation_id) {
166            let (json, version) = entry.value();
167            let state: S = serde_json::from_value(json.clone())?;
168            Ok(Some((state, *version)))
169        } else {
170            Ok(None)
171        }
172    }
173
174    async fn save_state<S>(
175        &self,
176        correlation_id: Uuid,
177        state: &S,
178        expected_version: i32,
179    ) -> Result<i32>
180    where
181        S: Serialize + Send + Sync,
182    {
183        let json = serde_json::to_value(state)?;
184        let new_version = expected_version + 1;
185
186        // Simple optimistic locking check
187        if let Some(mut entry) = self.states.get_mut(&correlation_id) {
188            let (_, current_version) = entry.value();
189            if *current_version != expected_version {
190                return Err(anyhow!("Version mismatch: expected {} but was {}", expected_version, current_version));
191            }
192            *entry.value_mut() = (json, new_version);
193        } else {
194            self.states.insert(correlation_id, (json, new_version));
195        }
196
197        Ok(new_version)
198    }
199
200    async fn insert_effect_intent(
201        &self,
202        event_id: Uuid,
203        effect_id: String,
204        correlation_id: Uuid,
205        event_type: String,
206        event_payload: serde_json::Value,
207        parent_event_id: Option<Uuid>,
208        execute_at: DateTime<Utc>,
209        timeout_seconds: i32,
210        max_attempts: i32,
211        priority: i32,
212    ) -> Result<()> {
213        let execution = QueuedEffectExecution {
214            event_id,
215            effect_id: effect_id.clone(),
216            correlation_id,
217            event_type,
218            event_payload,
219            parent_event_id,
220            execute_at,
221            timeout_seconds,
222            max_attempts,
223            priority,
224            attempts: 0,
225        };
226
227        // Store in effect history
228        let now = Utc::now();
229        self.effect_history.insert(
230            (event_id, effect_id.clone()),
231            StoredEffect {
232                effect_id: effect_id.clone(),
233                event_id,
234                correlation_id,
235                status: "pending".to_string(),
236                result: None,
237                error: None,
238                attempts: 0,
239                created_at: now,
240            },
241        );
242
243        // Publish insight event
244        let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
245        self.publish_insight(InsightEvent {
246            seq,
247            stream_type: StreamType::EffectStarted,
248            correlation_id,
249            event_id: None,
250            effect_event_id: Some(event_id),
251            effect_id: Some(effect_id),
252            event_type: None,
253            status: Some("pending".to_string()),
254            error: None,
255            payload: None,
256            created_at: now,
257        });
258
259        let mut queue = self.effects.lock();
260        queue.push_back(execution);
261        Ok(())
262    }
263
264    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
265        let mut queue = self.effects.lock();
266        
267        // Find first effect that's ready to execute
268        let now = Utc::now();
269        if let Some(pos) = queue.iter().position(|e| e.execute_at <= now) {
270            Ok(queue.remove(pos))
271        } else {
272            Ok(None)
273        }
274    }
275
276    async fn complete_effect(
277        &self,
278        event_id: Uuid,
279        effect_id: String,
280        result: serde_json::Value,
281    ) -> Result<()> {
282        self.completed_effects.insert((event_id, effect_id.clone()), result.clone());
283
284        // Update effect history
285        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
286            entry.status = "completed".to_string();
287            entry.result = Some(result.clone());
288
289            // Publish insight event
290            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
291            self.publish_insight(InsightEvent {
292                seq,
293                stream_type: StreamType::EffectCompleted,
294                correlation_id: entry.correlation_id,
295                event_id: None,
296                effect_event_id: Some(event_id),
297                effect_id: Some(effect_id),
298                event_type: None,
299                status: Some("completed".to_string()),
300                error: None,
301                payload: Some(result),
302                created_at: Utc::now(),
303            });
304        }
305
306        Ok(())
307    }
308
309    async fn complete_effect_with_events(
310        &self,
311        event_id: Uuid,
312        effect_id: String,
313        result: serde_json::Value,
314        emitted_events: Vec<EmittedEvent>,
315    ) -> Result<()> {
316        // Mark effect complete
317        self.complete_effect(event_id, effect_id.clone(), result).await?;
318
319        // Publish emitted events
320        for emitted in emitted_events {
321            let new_event_id = Uuid::new_v5(&NAMESPACE_SEESAW, format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes());
322
323            let queued = QueuedEvent {
324                id: self.event_seq.fetch_add(1, Ordering::SeqCst),
325                event_id: new_event_id,
326                parent_id: Some(event_id),
327                correlation_id: event_id, // Simplified: use event_id as correlation for now
328                event_type: emitted.event_type,
329                payload: emitted.payload,
330                hops: 0,
331                created_at: Utc::now(),
332            };
333
334            self.publish(queued).await?;
335        }
336
337        Ok(())
338    }
339
340    async fn fail_effect(
341        &self,
342        event_id: Uuid,
343        effect_id: String,
344        error: String,
345        _retry_after_secs: i32,
346    ) -> Result<()> {
347        // Update effect history
348        if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
349            entry.status = "failed".to_string();
350            entry.error = Some(error.clone());
351            entry.attempts += 1;
352
353            // Publish insight event
354            let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
355            self.publish_insight(InsightEvent {
356                seq,
357                stream_type: StreamType::EffectFailed,
358                correlation_id: entry.correlation_id,
359                event_id: None,
360                effect_event_id: Some(event_id),
361                effect_id: Some(effect_id),
362                event_type: None,
363                status: Some("failed".to_string()),
364                error: Some(error.clone()),
365                payload: None,
366                created_at: Utc::now(),
367            });
368        }
369
370        eprintln!("Effect failed: {}", error);
371        Ok(())
372    }
373
374    async fn dlq_effect(
375        &self,
376        event_id: Uuid,
377        effect_id: String,
378        error: String,
379        _error_type: String,
380        attempts: i32,
381    ) -> Result<()> {
382        eprintln!("Effect sent to DLQ: {}:{} - {} (attempts: {})", event_id, effect_id, error, attempts);
383        Ok(())
384    }
385
386    async fn subscribe_workflow_events(&self, _correlation_id: Uuid) -> Result<Box<dyn futures::Stream<Item = WorkflowEvent> + Send + Unpin>> {
387        // In-memory store doesn't support subscriptions
388        Err(anyhow!("Subscriptions not supported in memory store"))
389    }
390
391    async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<WorkflowStatus> {
392        // Check if any events or effects are pending
393        let has_events = self.events.get(&correlation_id).map(|q| !q.is_empty()).unwrap_or(false);
394        let state = self.states.get(&correlation_id).map(|entry| entry.value().0.clone());
395        let pending_effects = 0i64; // Simplified
396
397        Ok(WorkflowStatus {
398            correlation_id,
399            state,
400            pending_effects,
401            is_settled: !has_events && pending_effects == 0,
402            last_event: None, // Could track this if needed
403        })
404    }
405}
406
407#[async_trait]
408impl InsightStore for MemoryStore {
409    async fn subscribe_events(
410        &self,
411    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
412        let mut rx = self.insight_tx.subscribe();
413        let stream = async_stream::stream! {
414            while let Ok(event) = rx.recv().await {
415                yield event;
416            }
417        };
418
419        Ok(Box::new(Box::pin(stream)))
420    }
421
422    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
423        // Find all events for this correlation
424        let mut events: Vec<_> = self
425            .event_history
426            .iter()
427            .filter(|e| e.value().correlation_id == correlation_id)
428            .map(|e| e.value().clone())
429            .collect();
430
431        events.sort_by_key(|e| e.created_at);
432
433        // Build tree (find root events and recursively build children)
434        let roots = self.build_event_nodes(&events, None);
435
436        // Get state
437        let state = self.states.get(&correlation_id).map(|entry| entry.value().0.clone());
438
439        Ok(WorkflowTree {
440            correlation_id,
441            roots,
442            state,
443            event_count: events.len(),
444            effect_count: self
445                .effect_history
446                .iter()
447                .filter(|e| e.value().correlation_id == correlation_id)
448                .count(),
449        })
450    }
451
452    async fn get_stats(&self) -> Result<InsightStats> {
453        let total_events = self.event_history.len() as i64;
454
455        let mut active_effects = 0i64;
456        let mut completed_effects = 0i64;
457        let mut failed_effects = 0i64;
458
459        for entry in self.effect_history.iter() {
460            match entry.value().status.as_str() {
461                "pending" | "executing" => active_effects += 1,
462                "completed" => completed_effects += 1,
463                "failed" => failed_effects += 1,
464                _ => {}
465            }
466        }
467
468        Ok(InsightStats {
469            total_events,
470            active_effects,
471            completed_effects,
472            failed_effects,
473        })
474    }
475
476    async fn get_recent_events(
477        &self,
478        cursor: Option<i64>,
479        limit: usize,
480    ) -> Result<Vec<InsightEvent>> {
481        // Get events from history with proper seq
482        let mut events: Vec<_> = self
483            .event_history
484            .iter()
485            .filter_map(|e| {
486                let stored = e.value();
487                // Filter by cursor if provided
488                if let Some(cursor_seq) = cursor {
489                    if stored.seq <= cursor_seq {
490                        return None;
491                    }
492                }
493                Some(InsightEvent {
494                    seq: stored.seq,
495                    stream_type: StreamType::EventDispatched,
496                    correlation_id: stored.correlation_id,
497                    event_id: Some(stored.event_id),
498                    effect_event_id: None,
499                    effect_id: None,
500                    event_type: Some(stored.event_type.clone()),
501                    status: None,
502                    error: None,
503                    payload: Some(stored.payload.clone()),
504                    created_at: stored.created_at,
505                })
506            })
507            .collect();
508
509        // Sort by seq (oldest first for consistent cursor pagination)
510        events.sort_by_key(|e| e.seq);
511        events.truncate(limit);
512
513        Ok(events)
514    }
515}
516
517impl MemoryStore {
518    /// Build event nodes recursively
519    fn build_event_nodes(&self, events: &[StoredEvent], parent_id: Option<Uuid>) -> Vec<EventNode> {
520        events
521            .iter()
522            .filter(|e| e.parent_id == parent_id)
523            .map(|event| {
524                // Get effects for this event
525                let effects = self
526                    .effect_history
527                    .iter()
528                    .filter(|e| e.value().event_id == event.event_id)
529                    .map(|e| {
530                        let effect = e.value();
531                        EffectNode {
532                            effect_id: effect.effect_id.clone(),
533                            event_id: effect.event_id,
534                            status: effect.status.clone(),
535                            result: effect.result.clone(),
536                            error: effect.error.clone(),
537                            attempts: effect.attempts,
538                            created_at: effect.created_at,
539                        }
540                    })
541                    .collect();
542
543                // Recursively build children
544                let children = self.build_event_nodes(events, Some(event.event_id));
545
546                EventNode {
547                    event_id: event.event_id,
548                    event_type: event.event_type.clone(),
549                    payload: event.payload.clone(),
550                    created_at: event.created_at,
551                    children,
552                    effects,
553                }
554            })
555            .collect()
556    }
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use seesaw_core::store::Store;

    /// Builds the `n`-th root test event ("Event{n}", payload `{"n": n}`) for the
    /// given workflow.
    fn numbered_event(n: i32, correlation_id: Uuid) -> QueuedEvent {
        QueuedEvent {
            id: i64::from(n),
            event_id: Uuid::new_v4(),
            parent_id: None,
            correlation_id,
            event_type: format!("Event{}", n),
            payload: serde_json::json!({"n": n}),
            created_at: Utc::now(),
            hops: 0,
        }
    }

    /// Publishes events numbered `1..=count`. With `shared` set, all events belong to
    /// that workflow; otherwise each event gets a fresh correlation id.
    async fn publish_numbered(store: &MemoryStore, count: i32, shared: Option<Uuid>) {
        for n in 1..=count {
            let correlation_id = shared.unwrap_or_else(Uuid::new_v4);
            store.publish(numbered_event(n, correlation_id)).await.unwrap();
        }
    }

    /// Collects the `seq` values of `events` in order.
    fn seqs(events: &[InsightEvent]) -> Vec<i64> {
        events.iter().map(|e| e.seq).collect()
    }

    #[tokio::test]
    async fn test_insight_events_have_unique_seq() {
        let store = MemoryStore::new();
        publish_numbered(&store, 3, None).await;

        // Three published events must come back with seq 1, 2, 3.
        let events = store.get_recent_events(None, 10).await.unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(seqs(&events), vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_cursor_based_filtering() {
        let store = MemoryStore::new();
        publish_numbered(&store, 5, Some(Uuid::new_v4())).await;

        // First page without a cursor.
        let events = store.get_recent_events(None, 2).await.unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(seqs(&events), vec![1, 2]);

        // Second page, starting after cursor=2.
        let next_events = store.get_recent_events(Some(2), 2).await.unwrap();
        assert_eq!(next_events.len(), 2);
        assert_eq!(seqs(&next_events), vec![3, 4]);

        // The two pages together must not contain duplicates.
        let mut all_seqs = seqs(&events);
        all_seqs.extend(seqs(&next_events));
        assert_eq!(all_seqs, vec![1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_no_events_before_cursor() {
        let store = MemoryStore::new();
        publish_numbered(&store, 3, None).await;

        // A cursor beyond every recorded seq yields an empty page.
        let events = store.get_recent_events(Some(10), 10).await.unwrap();
        assert_eq!(events.len(), 0, "Should return no events when cursor is beyond all seq values");
    }
}