1use anyhow::{anyhow, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use parking_lot::Mutex;
11use seesaw_core::insight::*;
12use seesaw_core::store::*;
13use serde::{Deserialize, Serialize};
14use std::collections::VecDeque;
15use tokio::sync::broadcast;
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicI64, Ordering};
19use uuid::Uuid;
20
21#[derive(Clone)]
23pub struct MemoryStore {
24 events: Arc<DashMap<Uuid, VecDeque<QueuedEvent>>>,
26 event_seq: Arc<AtomicI64>,
28 states: Arc<DashMap<Uuid, (serde_json::Value, i32)>>,
30 effects: Arc<Mutex<VecDeque<QueuedEffectExecution>>>,
32 completed_effects: Arc<DashMap<(Uuid, String), serde_json::Value>>,
34
35 insight_tx: Arc<broadcast::Sender<InsightEvent>>,
38 insight_seq: Arc<AtomicI64>,
40 event_history: Arc<DashMap<Uuid, StoredEvent>>,
42 effect_history: Arc<DashMap<(Uuid, String), StoredEffect>>,
44}
45
46#[derive(Debug, Clone)]
48struct StoredEvent {
49 seq: i64,
50 event_id: Uuid,
51 parent_id: Option<Uuid>,
52 correlation_id: Uuid,
53 event_type: String,
54 payload: serde_json::Value,
55 created_at: DateTime<Utc>,
56}
57
58#[derive(Debug, Clone)]
60struct StoredEffect {
61 effect_id: String,
62 event_id: Uuid,
63 correlation_id: Uuid,
64 status: String,
65 result: Option<serde_json::Value>,
66 error: Option<String>,
67 attempts: i32,
68 created_at: DateTime<Utc>,
69}
70
71impl MemoryStore {
72 pub fn new() -> Self {
73 let (insight_tx, _) = broadcast::channel(1000);
74 Self {
75 events: Arc::new(DashMap::new()),
76 event_seq: Arc::new(AtomicI64::new(1)),
77 states: Arc::new(DashMap::new()),
78 effects: Arc::new(Mutex::new(VecDeque::new())),
79 completed_effects: Arc::new(DashMap::new()),
80 insight_tx: Arc::new(insight_tx),
81 insight_seq: Arc::new(AtomicI64::new(1)),
82 event_history: Arc::new(DashMap::new()),
83 effect_history: Arc::new(DashMap::new()),
84 }
85 }
86
87 fn publish_insight(&self, event: InsightEvent) {
89 let _ = self.insight_tx.send(event);
91 }
92}
93
94impl Default for MemoryStore {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
100#[async_trait]
101impl Store for MemoryStore {
102 async fn publish(&self, event: QueuedEvent) -> Result<()> {
103 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
105
106 self.event_history.insert(
108 event.event_id,
109 StoredEvent {
110 seq,
111 event_id: event.event_id,
112 parent_id: event.parent_id,
113 correlation_id: event.correlation_id,
114 event_type: event.event_type.clone(),
115 payload: event.payload.clone(),
116 created_at: event.created_at,
117 },
118 );
119
120 self.publish_insight(InsightEvent {
122 seq,
123 stream_type: StreamType::EventDispatched,
124 correlation_id: event.correlation_id,
125 event_id: Some(event.event_id),
126 effect_event_id: None,
127 effect_id: None,
128 event_type: Some(event.event_type.clone()),
129 status: None,
130 error: None,
131 payload: Some(event.payload.clone()),
132 created_at: event.created_at,
133 });
134
135 let mut queue = self.events.entry(event.correlation_id).or_insert_with(VecDeque::new);
137 queue.push_back(event);
138 Ok(())
139 }
140
141 async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
142 for mut entry in self.events.iter_mut() {
144 if let Some(event) = entry.value_mut().pop_front() {
145 return Ok(Some(event));
146 }
147 }
148 Ok(None)
149 }
150
151 async fn ack(&self, _id: i64) -> Result<()> {
152 Ok(())
154 }
155
156 async fn nack(&self, _id: i64, _retry_after_secs: u64) -> Result<()> {
157 Ok(())
159 }
160
161 async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
162 where
163 S: for<'de> Deserialize<'de> + Send,
164 {
165 if let Some(entry) = self.states.get(&correlation_id) {
166 let (json, version) = entry.value();
167 let state: S = serde_json::from_value(json.clone())?;
168 Ok(Some((state, *version)))
169 } else {
170 Ok(None)
171 }
172 }
173
174 async fn save_state<S>(
175 &self,
176 correlation_id: Uuid,
177 state: &S,
178 expected_version: i32,
179 ) -> Result<i32>
180 where
181 S: Serialize + Send + Sync,
182 {
183 let json = serde_json::to_value(state)?;
184 let new_version = expected_version + 1;
185
186 if let Some(mut entry) = self.states.get_mut(&correlation_id) {
188 let (_, current_version) = entry.value();
189 if *current_version != expected_version {
190 return Err(anyhow!("Version mismatch: expected {} but was {}", expected_version, current_version));
191 }
192 *entry.value_mut() = (json, new_version);
193 } else {
194 self.states.insert(correlation_id, (json, new_version));
195 }
196
197 Ok(new_version)
198 }
199
200 async fn insert_effect_intent(
201 &self,
202 event_id: Uuid,
203 effect_id: String,
204 correlation_id: Uuid,
205 event_type: String,
206 event_payload: serde_json::Value,
207 parent_event_id: Option<Uuid>,
208 execute_at: DateTime<Utc>,
209 timeout_seconds: i32,
210 max_attempts: i32,
211 priority: i32,
212 ) -> Result<()> {
213 let execution = QueuedEffectExecution {
214 event_id,
215 effect_id: effect_id.clone(),
216 correlation_id,
217 event_type,
218 event_payload,
219 parent_event_id,
220 execute_at,
221 timeout_seconds,
222 max_attempts,
223 priority,
224 attempts: 0,
225 };
226
227 let now = Utc::now();
229 self.effect_history.insert(
230 (event_id, effect_id.clone()),
231 StoredEffect {
232 effect_id: effect_id.clone(),
233 event_id,
234 correlation_id,
235 status: "pending".to_string(),
236 result: None,
237 error: None,
238 attempts: 0,
239 created_at: now,
240 },
241 );
242
243 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
245 self.publish_insight(InsightEvent {
246 seq,
247 stream_type: StreamType::EffectStarted,
248 correlation_id,
249 event_id: None,
250 effect_event_id: Some(event_id),
251 effect_id: Some(effect_id),
252 event_type: None,
253 status: Some("pending".to_string()),
254 error: None,
255 payload: None,
256 created_at: now,
257 });
258
259 let mut queue = self.effects.lock();
260 queue.push_back(execution);
261 Ok(())
262 }
263
264 async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
265 let mut queue = self.effects.lock();
266
267 let now = Utc::now();
269 if let Some(pos) = queue.iter().position(|e| e.execute_at <= now) {
270 Ok(queue.remove(pos))
271 } else {
272 Ok(None)
273 }
274 }
275
276 async fn complete_effect(
277 &self,
278 event_id: Uuid,
279 effect_id: String,
280 result: serde_json::Value,
281 ) -> Result<()> {
282 self.completed_effects.insert((event_id, effect_id.clone()), result.clone());
283
284 if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
286 entry.status = "completed".to_string();
287 entry.result = Some(result.clone());
288
289 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
291 self.publish_insight(InsightEvent {
292 seq,
293 stream_type: StreamType::EffectCompleted,
294 correlation_id: entry.correlation_id,
295 event_id: None,
296 effect_event_id: Some(event_id),
297 effect_id: Some(effect_id),
298 event_type: None,
299 status: Some("completed".to_string()),
300 error: None,
301 payload: Some(result),
302 created_at: Utc::now(),
303 });
304 }
305
306 Ok(())
307 }
308
309 async fn complete_effect_with_events(
310 &self,
311 event_id: Uuid,
312 effect_id: String,
313 result: serde_json::Value,
314 emitted_events: Vec<EmittedEvent>,
315 ) -> Result<()> {
316 self.complete_effect(event_id, effect_id.clone(), result).await?;
318
319 for emitted in emitted_events {
321 let new_event_id = Uuid::new_v5(&NAMESPACE_SEESAW, format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes());
322
323 let queued = QueuedEvent {
324 id: self.event_seq.fetch_add(1, Ordering::SeqCst),
325 event_id: new_event_id,
326 parent_id: Some(event_id),
327 correlation_id: event_id, event_type: emitted.event_type,
329 payload: emitted.payload,
330 hops: 0,
331 created_at: Utc::now(),
332 };
333
334 self.publish(queued).await?;
335 }
336
337 Ok(())
338 }
339
340 async fn fail_effect(
341 &self,
342 event_id: Uuid,
343 effect_id: String,
344 error: String,
345 _retry_after_secs: i32,
346 ) -> Result<()> {
347 if let Some(mut entry) = self.effect_history.get_mut(&(event_id, effect_id.clone())) {
349 entry.status = "failed".to_string();
350 entry.error = Some(error.clone());
351 entry.attempts += 1;
352
353 let seq = self.insight_seq.fetch_add(1, Ordering::SeqCst);
355 self.publish_insight(InsightEvent {
356 seq,
357 stream_type: StreamType::EffectFailed,
358 correlation_id: entry.correlation_id,
359 event_id: None,
360 effect_event_id: Some(event_id),
361 effect_id: Some(effect_id),
362 event_type: None,
363 status: Some("failed".to_string()),
364 error: Some(error.clone()),
365 payload: None,
366 created_at: Utc::now(),
367 });
368 }
369
370 eprintln!("Effect failed: {}", error);
371 Ok(())
372 }
373
374 async fn dlq_effect(
375 &self,
376 event_id: Uuid,
377 effect_id: String,
378 error: String,
379 _error_type: String,
380 attempts: i32,
381 ) -> Result<()> {
382 eprintln!("Effect sent to DLQ: {}:{} - {} (attempts: {})", event_id, effect_id, error, attempts);
383 Ok(())
384 }
385
386 async fn subscribe_workflow_events(&self, _correlation_id: Uuid) -> Result<Box<dyn futures::Stream<Item = WorkflowEvent> + Send + Unpin>> {
387 Err(anyhow!("Subscriptions not supported in memory store"))
389 }
390
391 async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<WorkflowStatus> {
392 let has_events = self.events.get(&correlation_id).map(|q| !q.is_empty()).unwrap_or(false);
394 let state = self.states.get(&correlation_id).map(|entry| entry.value().0.clone());
395 let pending_effects = 0i64; Ok(WorkflowStatus {
398 correlation_id,
399 state,
400 pending_effects,
401 is_settled: !has_events && pending_effects == 0,
402 last_event: None, })
404 }
405}
406
407#[async_trait]
408impl InsightStore for MemoryStore {
409 async fn subscribe_events(
410 &self,
411 ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
412 let mut rx = self.insight_tx.subscribe();
413 let stream = async_stream::stream! {
414 while let Ok(event) = rx.recv().await {
415 yield event;
416 }
417 };
418
419 Ok(Box::new(Box::pin(stream)))
420 }
421
422 async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
423 let mut events: Vec<_> = self
425 .event_history
426 .iter()
427 .filter(|e| e.value().correlation_id == correlation_id)
428 .map(|e| e.value().clone())
429 .collect();
430
431 events.sort_by_key(|e| e.created_at);
432
433 let roots = self.build_event_nodes(&events, None);
435
436 let state = self.states.get(&correlation_id).map(|entry| entry.value().0.clone());
438
439 Ok(WorkflowTree {
440 correlation_id,
441 roots,
442 state,
443 event_count: events.len(),
444 effect_count: self
445 .effect_history
446 .iter()
447 .filter(|e| e.value().correlation_id == correlation_id)
448 .count(),
449 })
450 }
451
452 async fn get_stats(&self) -> Result<InsightStats> {
453 let total_events = self.event_history.len() as i64;
454
455 let mut active_effects = 0i64;
456 let mut completed_effects = 0i64;
457 let mut failed_effects = 0i64;
458
459 for entry in self.effect_history.iter() {
460 match entry.value().status.as_str() {
461 "pending" | "executing" => active_effects += 1,
462 "completed" => completed_effects += 1,
463 "failed" => failed_effects += 1,
464 _ => {}
465 }
466 }
467
468 Ok(InsightStats {
469 total_events,
470 active_effects,
471 completed_effects,
472 failed_effects,
473 })
474 }
475
476 async fn get_recent_events(
477 &self,
478 cursor: Option<i64>,
479 limit: usize,
480 ) -> Result<Vec<InsightEvent>> {
481 let mut events: Vec<_> = self
483 .event_history
484 .iter()
485 .filter_map(|e| {
486 let stored = e.value();
487 if let Some(cursor_seq) = cursor {
489 if stored.seq <= cursor_seq {
490 return None;
491 }
492 }
493 Some(InsightEvent {
494 seq: stored.seq,
495 stream_type: StreamType::EventDispatched,
496 correlation_id: stored.correlation_id,
497 event_id: Some(stored.event_id),
498 effect_event_id: None,
499 effect_id: None,
500 event_type: Some(stored.event_type.clone()),
501 status: None,
502 error: None,
503 payload: Some(stored.payload.clone()),
504 created_at: stored.created_at,
505 })
506 })
507 .collect();
508
509 events.sort_by_key(|e| e.seq);
511 events.truncate(limit);
512
513 Ok(events)
514 }
515}
516
517impl MemoryStore {
518 fn build_event_nodes(&self, events: &[StoredEvent], parent_id: Option<Uuid>) -> Vec<EventNode> {
520 events
521 .iter()
522 .filter(|e| e.parent_id == parent_id)
523 .map(|event| {
524 let effects = self
526 .effect_history
527 .iter()
528 .filter(|e| e.value().event_id == event.event_id)
529 .map(|e| {
530 let effect = e.value();
531 EffectNode {
532 effect_id: effect.effect_id.clone(),
533 event_id: effect.event_id,
534 status: effect.status.clone(),
535 result: effect.result.clone(),
536 error: effect.error.clone(),
537 attempts: effect.attempts,
538 created_at: effect.created_at,
539 }
540 })
541 .collect();
542
543 let children = self.build_event_nodes(events, Some(event.event_id));
545
546 EventNode {
547 event_id: event.event_id,
548 event_type: event.event_type.clone(),
549 payload: event.payload.clone(),
550 created_at: event.created_at,
551 children,
552 effects,
553 }
554 })
555 .collect()
556 }
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use seesaw_core::store::Store;

    /// Builds a parentless event numbered `n` under `correlation_id`.
    fn numbered_event(n: i64, correlation_id: Uuid) -> QueuedEvent {
        QueuedEvent {
            id: n,
            event_id: Uuid::new_v4(),
            parent_id: None,
            correlation_id,
            event_type: format!("Event{}", n),
            payload: serde_json::json!({"n": n}),
            created_at: Utc::now(),
            hops: 0,
        }
    }

    /// Collects the sequence numbers of `events` in order.
    fn seqs_of(events: &[InsightEvent]) -> Vec<i64> {
        events.iter().map(|event| event.seq).collect()
    }

    /// Three published events receive the sequence numbers 1, 2 and 3.
    #[tokio::test]
    async fn test_insight_events_have_unique_seq() {
        let store = MemoryStore::new();

        for n in 1..=3 {
            store
                .publish(numbered_event(n, Uuid::new_v4()))
                .await
                .unwrap();
        }

        let events = store.get_recent_events(None, 10).await.unwrap();
        assert_eq!(seqs_of(&events), vec![1, 2, 3]);
    }

    /// Paging with a cursor yields consecutive pages with no overlap.
    #[tokio::test]
    async fn test_cursor_based_filtering() {
        let store = MemoryStore::new();
        let correlation_id = Uuid::new_v4();

        for n in 1..=5 {
            store
                .publish(numbered_event(n, correlation_id))
                .await
                .unwrap();
        }

        let first_page = store.get_recent_events(None, 2).await.unwrap();
        assert_eq!(seqs_of(&first_page), vec![1, 2]);

        let second_page = store.get_recent_events(Some(2), 2).await.unwrap();
        assert_eq!(seqs_of(&second_page), vec![3, 4]);

        let mut combined = seqs_of(&first_page);
        combined.extend(seqs_of(&second_page));
        assert_eq!(combined, vec![1, 2, 3, 4]);
    }

    /// A cursor beyond every stored sequence number yields an empty page.
    #[tokio::test]
    async fn test_no_events_before_cursor() {
        let store = MemoryStore::new();

        for n in 1..=3 {
            store
                .publish(numbered_event(n, Uuid::new_v4()))
                .await
                .unwrap();
        }

        let events = store.get_recent_events(Some(10), 10).await.unwrap();
        assert!(
            events.is_empty(),
            "Should return no events when cursor is beyond all seq values"
        );
    }
}