Skip to main content

ranvier_runtime/
persistence.rs

1use anyhow::{Result, anyhow};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6use tokio::sync::RwLock;
7
8/// Minimal persisted envelope for Axon execution checkpoints.
9///
10/// M148 baseline contract fields:
11/// - trace
12/// - circuit
13/// - step
14/// - outcome
15/// - timestamp
16/// - payload hash
17#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18pub struct PersistenceEnvelope {
19    pub trace_id: String,
20    pub circuit: String,
21    pub step: u64,
22    pub outcome_kind: String,
23    pub timestamp_ms: u64,
24    pub payload_hash: Option<String>,
25}
26
27/// Final completion state tracked for a persisted trace.
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
29pub enum CompletionState {
30    Success,
31    Fault,
32    Cancelled,
33    Compensated,
34}
35
/// Encodes a [`CompletionState`] as the lowercase text stored in the
/// Postgres `completion` column.
#[cfg(feature = "persistence-postgres")]
fn completion_state_to_wire(state: &CompletionState) -> &'static str {
    // Unit variants only, so matching on the dereferenced value moves nothing.
    match *state {
        CompletionState::Success => "success",
        CompletionState::Fault => "fault",
        CompletionState::Cancelled => "cancelled",
        CompletionState::Compensated => "compensated",
    }
}
45
/// Decodes the text stored in the Postgres `completion` column back into a
/// [`CompletionState`].
///
/// # Errors
///
/// Returns an error for any value other than `success`, `fault`, `cancelled`
/// or `compensated`.
#[cfg(feature = "persistence-postgres")]
fn completion_state_from_wire(value: &str) -> Result<CompletionState> {
    let state = match value {
        "success" => CompletionState::Success,
        "fault" => CompletionState::Fault,
        "cancelled" => CompletionState::Cancelled,
        "compensated" => CompletionState::Compensated,
        other => return Err(anyhow!("unknown completion state value: {}", other)),
    };
    Ok(state)
}
56
57/// Stored trace state returned from [`PersistenceStore::load`].
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59pub struct PersistedTrace {
60    pub trace_id: String,
61    pub circuit: String,
62    pub events: Vec<PersistenceEnvelope>,
63    pub resumed_from_step: Option<u64>,
64    pub completion: Option<CompletionState>,
65}
66
67/// Resume cursor returned from [`PersistenceStore::resume`].
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69pub struct ResumeCursor {
70    pub trace_id: String,
71    pub next_step: u64,
72}
73
/// Optional override for the trace identifier used by persistence hooks.
///
/// Insert into `Bus` when the trace identity has to stay stable across
/// process restarts.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PersistenceTraceId(pub String);

impl PersistenceTraceId {
    /// Wraps any string-like value as a trace identifier.
    pub fn new(value: impl Into<String>) -> Self {
        let id: String = value.into();
        PersistenceTraceId(id)
    }
}
85
/// Switch that decides whether runtime execution invokes `complete` on its own.
///
/// When this resource is not present, the runtime behaves as if it were `true`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PersistenceAutoComplete(pub bool);
91
92/// Runtime context delivered to compensation hooks.
93///
94/// The context is intentionally compact so hooks can map it to idempotent
95/// compensating actions in domain/infrastructure layers.
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub struct CompensationContext {
98    pub trace_id: String,
99    pub circuit: String,
100    pub fault_kind: String,
101    pub fault_step: u64,
102    pub timestamp_ms: u64,
103}
104
/// Switch that decides whether compensation hooks fire automatically on `Fault`.
///
/// When this resource is not present, the runtime behaves as if it were `true`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CompensationAutoTrigger(pub bool);
110
/// Retry policy applied when executing compensation hooks.
///
/// The default is one attempt with zero backoff, i.e. no retry at all.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CompensationRetryPolicy {
    pub max_attempts: u32,
    pub backoff_ms: u64,
}

impl Default for CompensationRetryPolicy {
    /// A single attempt and no backoff.
    fn default() -> Self {
        CompensationRetryPolicy {
            backoff_ms: 0,
            max_attempts: 1,
        }
    }
}
128
129/// Compensation hook contract for irreversible side effects.
130#[deprecated(
131    since = "0.7.0",
132    note = "Experimental compensation contract (M148); API may change before stabilization."
133)]
134#[async_trait]
135pub trait CompensationHook: Send + Sync {
136    async fn compensate(&self, context: CompensationContext) -> Result<()>;
137}
138
139/// Bus-insertable compensation hook handle used by runtime execution hooks.
140#[derive(Clone)]
141pub struct CompensationHandle {
142    inner: Arc<dyn CompensationHook>,
143}
144
145impl std::fmt::Debug for CompensationHandle {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct("CompensationHandle").finish_non_exhaustive()
148    }
149}
150
151impl CompensationHandle {
152    /// Create a handle from a concrete compensation hook implementation.
153    pub fn from_hook<H>(hook: H) -> Self
154    where
155        H: CompensationHook + 'static,
156    {
157        Self {
158            inner: Arc::new(hook),
159        }
160    }
161
162    /// Create a handle from an existing trait-object Arc.
163    pub fn from_arc(hook: Arc<dyn CompensationHook>) -> Self {
164        Self { inner: hook }
165    }
166
167    /// Access the shared compensation hook.
168    pub fn hook(&self) -> Arc<dyn CompensationHook> {
169        self.inner.clone()
170    }
171}
172
173/// Idempotency store contract for compensation execution deduplication.
174#[deprecated(
175    since = "0.7.0",
176    note = "Experimental compensation idempotency contract (M148); API may change before stabilization."
177)]
178#[async_trait]
179pub trait CompensationIdempotencyStore: Send + Sync {
180    async fn was_compensated(&self, key: &str) -> Result<bool>;
181    async fn mark_compensated(&self, key: &str) -> Result<()>;
182}
183
184/// Bus-insertable idempotency handle for compensation hooks.
185#[derive(Clone)]
186pub struct CompensationIdempotencyHandle {
187    inner: Arc<dyn CompensationIdempotencyStore>,
188}
189
190impl std::fmt::Debug for CompensationIdempotencyHandle {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        f.debug_struct("CompensationIdempotencyHandle")
193            .finish_non_exhaustive()
194    }
195}
196
197impl CompensationIdempotencyHandle {
198    pub fn from_store<S>(store: S) -> Self
199    where
200        S: CompensationIdempotencyStore + 'static,
201    {
202        Self {
203            inner: Arc::new(store),
204        }
205    }
206
207    pub fn from_arc(store: Arc<dyn CompensationIdempotencyStore>) -> Self {
208        Self { inner: store }
209    }
210
211    pub fn store(&self) -> Arc<dyn CompensationIdempotencyStore> {
212        self.inner.clone()
213    }
214}
215
216/// In-memory idempotency store for compensation deduplication.
217#[derive(Debug, Default, Clone)]
218pub struct InMemoryCompensationIdempotencyStore {
219    keys: Arc<RwLock<HashSet<String>>>,
220}
221
222impl InMemoryCompensationIdempotencyStore {
223    pub fn new() -> Self {
224        Self::default()
225    }
226}
227
228#[async_trait]
229impl CompensationIdempotencyStore for InMemoryCompensationIdempotencyStore {
230    async fn was_compensated(&self, key: &str) -> Result<bool> {
231        let guard = self.keys.read().await;
232        Ok(guard.contains(key))
233    }
234
235    async fn mark_compensated(&self, key: &str) -> Result<()> {
236        let mut guard = self.keys.write().await;
237        guard.insert(key.to_string());
238        Ok(())
239    }
240}
241
/// PostgreSQL-backed compensation idempotency store.
///
/// Keys live in a single table named `<prefix>_compensation_idempotency`.
#[cfg(feature = "persistence-postgres")]
#[derive(Debug, Clone)]
pub struct PostgresCompensationIdempotencyStore {
    pool: sqlx::Pool<sqlx::Postgres>,
    table: String,
}

#[cfg(feature = "persistence-postgres")]
impl PostgresCompensationIdempotencyStore {
    /// Creates a store that uses the default `ranvier_persistence` table prefix.
    pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
        Self::with_table_prefix(pool, "ranvier_persistence")
    }

    /// Creates a store whose table name is derived from `prefix`.
    pub fn with_table_prefix(pool: sqlx::Pool<sqlx::Postgres>, prefix: impl Into<String>) -> Self {
        let table = format!("{}_compensation_idempotency", prefix.into());
        Self { pool, table }
    }

    /// Creates the idempotency table if it does not already exist.
    pub async fn ensure_schema(&self) -> Result<()> {
        let ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (
                idempotency_key TEXT PRIMARY KEY,
                created_at_ms BIGINT NOT NULL
            )",
            self.table
        );
        sqlx::query(&ddl).execute(&self.pool).await?;
        Ok(())
    }

    /// Deletes rows whose `created_at_ms` is strictly below `cutoff_ms` (unix epoch ms).
    ///
    /// Returns the number of deleted rows.
    pub async fn purge_older_than_ms(&self, cutoff_ms: i64) -> Result<u64> {
        let query = format!(
            "DELETE FROM {}
             WHERE created_at_ms < $1",
            self.table
        );
        let outcome = sqlx::query(&query)
            .bind(cutoff_ms)
            .execute(&self.pool)
            .await?;
        Ok(outcome.rows_affected())
    }
}
293
#[cfg(feature = "persistence-postgres")]
#[async_trait]
impl CompensationIdempotencyStore for PostgresCompensationIdempotencyStore {
    /// Looks the key up; any matching row means it was already compensated.
    async fn was_compensated(&self, key: &str) -> Result<bool> {
        let query = format!(
            "SELECT 1
             FROM {}
             WHERE idempotency_key = $1
             LIMIT 1",
            self.table
        );
        let hit = sqlx::query_scalar::<_, i32>(&query)
            .bind(key)
            .fetch_optional(&self.pool)
            .await?;
        Ok(hit.is_some())
    }

    /// Inserts the key stamped with the current wall-clock time in ms.
    /// A key that is already present is left untouched (`ON CONFLICT DO NOTHING`).
    async fn mark_compensated(&self, key: &str) -> Result<()> {
        let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
        let now_ms = i64::try_from(since_epoch.as_millis())?;
        let query = format!(
            "INSERT INTO {} (idempotency_key, created_at_ms)
             VALUES ($1, $2)
             ON CONFLICT (idempotency_key) DO NOTHING",
            self.table
        );
        sqlx::query(&query)
            .bind(key)
            .bind(now_ms)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}
330
/// Redis-backed compensation idempotency store.
///
/// Each compensated key is stored as `<key_prefix>:<idempotency_key>`,
/// optionally with a TTL in seconds.
#[cfg(feature = "persistence-redis")]
#[derive(Clone)]
pub struct RedisCompensationIdempotencyStore {
    manager: redis::aio::ConnectionManager,
    key_prefix: String,
    ttl_seconds: Option<u64>,
}

#[cfg(feature = "persistence-redis")]
impl std::fmt::Debug for RedisCompensationIdempotencyStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The connection manager is omitted; only the configuration is shown.
        f.debug_struct("RedisCompensationIdempotencyStore")
            .field("key_prefix", &self.key_prefix)
            .field("ttl_seconds", &self.ttl_seconds)
            .finish_non_exhaustive()
    }
}

#[cfg(feature = "persistence-redis")]
impl RedisCompensationIdempotencyStore {
    /// Connects to the Redis URL and uses the default key prefix with no TTL.
    pub async fn connect(url: &str) -> Result<Self> {
        let client = redis::Client::open(url)?;
        let manager = redis::aio::ConnectionManager::new(client).await?;
        Ok(Self::with_prefix(manager, "ranvier:compensation:idempotency"))
    }

    /// Builds a store with a custom key prefix and no TTL.
    pub fn with_prefix(
        manager: redis::aio::ConnectionManager,
        key_prefix: impl Into<String>,
    ) -> Self {
        Self {
            manager,
            key_prefix: key_prefix.into(),
            ttl_seconds: None,
        }
    }

    /// Builds a store with a custom key prefix whose markers expire after `ttl_seconds`.
    pub fn with_prefix_and_ttl(
        manager: redis::aio::ConnectionManager,
        key_prefix: impl Into<String>,
        ttl_seconds: u64,
    ) -> Self {
        Self {
            ttl_seconds: Some(ttl_seconds),
            ..Self::with_prefix(manager, key_prefix)
        }
    }

    /// Full Redis key for an idempotency key.
    fn key(&self, idempotency_key: &str) -> String {
        format!("{}:{}", self.key_prefix, idempotency_key)
    }
}
389
#[cfg(feature = "persistence-redis")]
#[async_trait]
impl CompensationIdempotencyStore for RedisCompensationIdempotencyStore {
    /// Returns `true` when the marker key for `key` exists.
    async fn was_compensated(&self, key: &str) -> Result<bool> {
        use redis::AsyncCommands;
        let mut conn = self.manager.clone();
        let exists: bool = conn.exists(self.key(key)).await?;
        Ok(exists)
    }

    /// Records `key` as compensated; an existing marker is left untouched (`NX`).
    ///
    /// When a TTL is configured, the marker and its expiry are written by one
    /// atomic `SET key 1 NX EX ttl`. The previous `SET NX` + `EXPIRE` pair could
    /// fail between the two commands. That left a marker with no TTL, and retries
    /// never repaired it because they saw the key already present.
    ///
    /// # Errors
    ///
    /// Redis errors, or a TTL larger than `i64::MAX`.
    async fn mark_compensated(&self, key: &str) -> Result<()> {
        use redis::AsyncCommands;
        let mut conn = self.manager.clone();
        let redis_key = self.key(key);
        match self.ttl_seconds {
            None => {
                let _: bool = conn.set_nx(&redis_key, "1").await?;
            }
            // A zero TTL expires immediately. The old `SET NX` + `EXPIRE 0` sequence
            // also left nothing behind, and Redis rejects `SET ... EX 0`.
            Some(0) => {}
            Some(ttl_seconds) => {
                // Same upper-bound check as before (the TTL was sent as i64).
                let ttl_i64 = i64::try_from(ttl_seconds)?;
                // Reply is "OK" or nil (key already present); either way is success.
                let _: () = redis::cmd("SET")
                    .arg(&redis_key)
                    .arg("1")
                    .arg("NX")
                    .arg("EX")
                    .arg(ttl_i64)
                    .query_async(&mut conn)
                    .await?;
            }
        }
        Ok(())
    }
}
414
415/// Persistence abstraction draft for long-running workflow recovery.
416///
417/// This is intentionally minimal and marked experimental while M148 is active.
418#[deprecated(
419    since = "0.7.0",
420    note = "Experimental persistence contract (M148); API may change before stabilization."
421)]
422#[async_trait]
423pub trait PersistenceStore: Send + Sync {
424    async fn append(&self, envelope: PersistenceEnvelope) -> Result<()>;
425    async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>>;
426    async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor>;
427    async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()>;
428}
429
430/// Bus-insertable persistence handle used by runtime execution hooks.
431#[derive(Clone)]
432pub struct PersistenceHandle {
433    inner: Arc<dyn PersistenceStore>,
434}
435
436impl std::fmt::Debug for PersistenceHandle {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        f.debug_struct("PersistenceHandle").finish_non_exhaustive()
439    }
440}
441
442impl PersistenceHandle {
443    /// Create a handle from a concrete store implementation.
444    pub fn from_store<S>(store: S) -> Self
445    where
446        S: PersistenceStore + 'static,
447    {
448        Self {
449            inner: Arc::new(store),
450        }
451    }
452
453    /// Create a handle from an existing trait-object Arc.
454    pub fn from_arc(store: Arc<dyn PersistenceStore>) -> Self {
455        Self { inner: store }
456    }
457
458    /// Access the shared persistence store.
459    pub fn store(&self) -> Arc<dyn PersistenceStore> {
460        self.inner.clone()
461    }
462}
463
464/// In-memory reference adapter for local testing and contract validation.
465#[derive(Debug, Default, Clone)]
466pub struct InMemoryPersistenceStore {
467    inner: Arc<RwLock<HashMap<String, PersistedTrace>>>,
468}
469
470impl InMemoryPersistenceStore {
471    pub fn new() -> Self {
472        Self::default()
473    }
474}
475
476#[async_trait]
477impl PersistenceStore for InMemoryPersistenceStore {
478    async fn append(&self, envelope: PersistenceEnvelope) -> Result<()> {
479        let mut guard = self.inner.write().await;
480        let entry = guard
481            .entry(envelope.trace_id.clone())
482            .or_insert_with(|| PersistedTrace {
483                trace_id: envelope.trace_id.clone(),
484                circuit: envelope.circuit.clone(),
485                events: Vec::new(),
486                resumed_from_step: None,
487                completion: None,
488            });
489
490        if entry.circuit != envelope.circuit {
491            return Err(anyhow!(
492                "trace_id {} already exists for circuit {}, got {}",
493                envelope.trace_id,
494                entry.circuit,
495                envelope.circuit
496            ));
497        }
498        if entry.completion.is_some() {
499            return Err(anyhow!(
500                "trace_id {} is already completed and cannot accept new events",
501                envelope.trace_id
502            ));
503        }
504        entry.events.push(envelope);
505        entry.events.sort_by_key(|e| e.step);
506        Ok(())
507    }
508
509    async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>> {
510        let guard = self.inner.read().await;
511        Ok(guard.get(trace_id).cloned())
512    }
513
514    async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor> {
515        let mut guard = self.inner.write().await;
516        let trace = guard
517            .get_mut(trace_id)
518            .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
519        trace.resumed_from_step = Some(resume_from_step);
520        Ok(ResumeCursor {
521            trace_id: trace_id.to_string(),
522            next_step: resume_from_step.saturating_add(1),
523        })
524    }
525
526    async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()> {
527        let mut guard = self.inner.write().await;
528        let trace = guard
529            .get_mut(trace_id)
530            .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
531        trace.completion = Some(completion);
532        Ok(())
533    }
534}
535
/// PostgreSQL-backed persistence store.
///
/// Uses two tables: `events_table` (one row per checkpoint) and `state_table`
/// (one row per trace).
#[cfg(feature = "persistence-postgres")]
#[derive(Debug, Clone)]
pub struct PostgresPersistenceStore {
    pool: sqlx::Pool<sqlx::Postgres>,
    events_table: String,
    state_table: String,
}

/// Row shape read from the events table; field names match its columns.
#[cfg(feature = "persistence-postgres")]
#[derive(sqlx::FromRow)]
struct PostgresEventRow {
    trace_id: String,
    circuit: String,
    step: i64,
    outcome_kind: String,
    timestamp_ms: i64,
    payload_hash: Option<String>,
}

/// Row shape read from the state table; field names match its columns.
#[cfg(feature = "persistence-postgres")]
#[derive(sqlx::FromRow)]
struct PostgresStateRow {
    trace_id: String,
    circuit: String,
    resumed_from_step: Option<i64>,
    completion: Option<String>,
}
563
#[cfg(feature = "persistence-postgres")]
impl PostgresPersistenceStore {
    /// Creates a PostgreSQL-backed store with the default `ranvier_persistence` prefix.
    ///
    /// This is an alpha adapter intended for M148 validation.
    pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
        Self::with_table_prefix(pool, "ranvier_persistence")
    }

    /// Creates a store whose tables are `<prefix>_events` and `<prefix>_state`.
    pub fn with_table_prefix(pool: sqlx::Pool<sqlx::Postgres>, prefix: impl Into<String>) -> Self {
        let prefix: String = prefix.into();
        let events_table = format!("{}_events", prefix);
        let state_table = format!("{}_state", prefix);
        Self {
            pool,
            events_table,
            state_table,
        }
    }

    /// Creates the state and events tables if they do not already exist.
    pub async fn ensure_schema(&self) -> Result<()> {
        let create_state = format!(
            "CREATE TABLE IF NOT EXISTS {} (
                trace_id TEXT PRIMARY KEY,
                circuit TEXT NOT NULL,
                resumed_from_step BIGINT NULL,
                completion TEXT NULL
            )",
            self.state_table
        );
        let create_events = format!(
            "CREATE TABLE IF NOT EXISTS {} (
                trace_id TEXT NOT NULL,
                circuit TEXT NOT NULL,
                step BIGINT NOT NULL,
                outcome_kind TEXT NOT NULL,
                timestamp_ms BIGINT NOT NULL,
                payload_hash TEXT NULL,
                PRIMARY KEY (trace_id, step)
            )",
            self.events_table
        );
        // Executed in order: state table first, then events.
        for ddl in [&create_state, &create_events] {
            sqlx::query(ddl).execute(&self.pool).await?;
        }
        Ok(())
    }
}
612
#[cfg(feature = "persistence-postgres")]
#[async_trait]
impl PersistenceStore for PostgresPersistenceStore {
    /// Appends one checkpoint event, creating the trace's state row on first use.
    ///
    /// # Errors
    ///
    /// Fails when the trace id is bound to a different circuit, when the trace
    /// is already completed, when `step`/`timestamp_ms` exceed `i64::MAX`, or on
    /// any database error. That includes a primary-key conflict on
    /// `(trace_id, step)` when the same step is appended twice.
    ///
    /// NOTE(review): the state check and the event insert are separate
    /// statements on the pool (no transaction), so a concurrent `complete`
    /// can still land between them.
    async fn append(&self, envelope: PersistenceEnvelope) -> Result<()> {
        // Create the state row when absent; an existing row is left untouched.
        let insert_state = format!(
            "INSERT INTO {} (trace_id, circuit, resumed_from_step, completion)
             VALUES ($1, $2, NULL, NULL)
             ON CONFLICT (trace_id) DO NOTHING",
            self.state_table
        );
        sqlx::query(&insert_state)
            .bind(&envelope.trace_id)
            .bind(&envelope.circuit)
            .execute(&self.pool)
            .await?;

        // Read the owning circuit and the completion marker in one round trip.
        let read_state = format!(
            "SELECT circuit, completion FROM {} WHERE trace_id = $1",
            self.state_table
        );
        let state: Option<(String, Option<String>)> = sqlx::query_as(&read_state)
            .bind(&envelope.trace_id)
            .fetch_optional(&self.pool)
            .await?;
        // A missing row means it vanished after the insert above. Report that
        // directly instead of misreporting it as a circuit mismatch.
        let Some((existing_circuit, completion)) = state else {
            return Err(anyhow!(
                "trace_id {} state row is missing after insert",
                envelope.trace_id
            ));
        };
        // Same message format as the in-memory and Redis adapters.
        if existing_circuit != envelope.circuit {
            return Err(anyhow!(
                "trace_id {} already exists for circuit {}, got {}",
                envelope.trace_id,
                existing_circuit,
                envelope.circuit
            ));
        }
        if completion.is_some() {
            return Err(anyhow!(
                "trace_id {} is already completed and cannot accept new events",
                envelope.trace_id
            ));
        }

        // BIGINT columns are signed; reject values that would not fit.
        let step_i64 = i64::try_from(envelope.step)?;
        let ts_i64 = i64::try_from(envelope.timestamp_ms)?;
        let insert_event = format!(
            "INSERT INTO {} (trace_id, circuit, step, outcome_kind, timestamp_ms, payload_hash)
             VALUES ($1, $2, $3, $4, $5, $6)",
            self.events_table
        );
        sqlx::query(&insert_event)
            .bind(&envelope.trace_id)
            .bind(&envelope.circuit)
            .bind(step_i64)
            .bind(&envelope.outcome_kind)
            .bind(ts_i64)
            .bind(&envelope.payload_hash)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Loads the state row plus all events ordered by ascending step.
    ///
    /// Returns `Ok(None)` when no state row exists for `trace_id`.
    ///
    /// # Errors
    ///
    /// Database errors, negative stored integers, or an unknown completion value.
    async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>> {
        let state_query = format!(
            "SELECT trace_id, circuit, resumed_from_step, completion
             FROM {}
             WHERE trace_id = $1",
            self.state_table
        );
        let Some(state): Option<PostgresStateRow> = sqlx::query_as(&state_query)
            .bind(trace_id)
            .fetch_optional(&self.pool)
            .await?
        else {
            return Ok(None);
        };

        let events_query = format!(
            "SELECT trace_id, circuit, step, outcome_kind, timestamp_ms, payload_hash
             FROM {}
             WHERE trace_id = $1
             ORDER BY step ASC",
            self.events_table
        );
        let rows: Vec<PostgresEventRow> = sqlx::query_as(&events_query)
            .bind(trace_id)
            .fetch_all(&self.pool)
            .await?;

        // Convert signed BIGINT columns back to u64; fail on negative values.
        let events = rows
            .into_iter()
            .map(|row| -> Result<PersistenceEnvelope> {
                Ok(PersistenceEnvelope {
                    trace_id: row.trace_id,
                    circuit: row.circuit,
                    step: u64::try_from(row.step)?,
                    outcome_kind: row.outcome_kind,
                    timestamp_ms: u64::try_from(row.timestamp_ms)?,
                    payload_hash: row.payload_hash,
                })
            })
            .collect::<Result<Vec<_>>>()?;

        let completion = state
            .completion
            .as_deref()
            .map(completion_state_from_wire)
            .transpose()?;

        Ok(Some(PersistedTrace {
            trace_id: state.trace_id,
            circuit: state.circuit,
            events,
            resumed_from_step: state.resumed_from_step.map(u64::try_from).transpose()?,
            completion,
        }))
    }

    /// Stores `resume_from_step` on the state row and returns the next-step cursor.
    ///
    /// # Errors
    ///
    /// Fails when the trace has no state row or the step exceeds `i64::MAX`.
    async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor> {
        let query = format!(
            "UPDATE {}
             SET resumed_from_step = $2
             WHERE trace_id = $1",
            self.state_table
        );
        let rows = sqlx::query(&query)
            .bind(trace_id)
            .bind(i64::try_from(resume_from_step)?)
            .execute(&self.pool)
            .await?
            .rows_affected();
        if rows == 0 {
            return Err(anyhow!("trace_id {} not found", trace_id));
        }
        Ok(ResumeCursor {
            trace_id: trace_id.to_string(),
            next_step: resume_from_step.saturating_add(1),
        })
    }

    /// Stores the completion state (overwriting any previous value).
    ///
    /// # Errors
    ///
    /// Fails when the trace has no state row.
    async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()> {
        let query = format!(
            "UPDATE {}
             SET completion = $2
             WHERE trace_id = $1",
            self.state_table
        );
        let rows = sqlx::query(&query)
            .bind(trace_id)
            .bind(completion_state_to_wire(&completion))
            .execute(&self.pool)
            .await?
            .rows_affected();
        if rows == 0 {
            return Err(anyhow!("trace_id {} not found", trace_id));
        }
        Ok(())
    }
}
772
/// Redis-backed persistence store.
///
/// Each trace is stored as one JSON document under `<key_prefix>:<trace_id>`.
#[cfg(feature = "persistence-redis")]
#[derive(Clone)]
pub struct RedisPersistenceStore {
    manager: redis::aio::ConnectionManager,
    key_prefix: String,
}

#[cfg(feature = "persistence-redis")]
impl std::fmt::Debug for RedisPersistenceStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The connection manager is omitted; only the key prefix is shown.
        f.debug_struct("RedisPersistenceStore")
            .field("key_prefix", &self.key_prefix)
            .finish_non_exhaustive()
    }
}

#[cfg(feature = "persistence-redis")]
impl RedisPersistenceStore {
    /// Connects to a Redis URL and uses the `ranvier:persistence` key prefix.
    ///
    /// Example: `redis://127.0.0.1:6379`
    pub async fn connect(url: &str) -> Result<Self> {
        let client = redis::Client::open(url)?;
        let manager = redis::aio::ConnectionManager::new(client).await?;
        Ok(Self::with_prefix(manager, "ranvier:persistence"))
    }

    /// Builds a store over an existing connection manager with a custom key prefix.
    pub fn with_prefix(
        manager: redis::aio::ConnectionManager,
        key_prefix: impl Into<String>,
    ) -> Self {
        let key_prefix = key_prefix.into();
        Self { manager, key_prefix }
    }

    /// Full Redis key for a trace.
    fn key(&self, trace_id: &str) -> String {
        format!("{}:{}", self.key_prefix, trace_id)
    }

    /// Serializes the whole trace to JSON and overwrites its key with `SET`.
    async fn write_trace(&self, trace: &PersistedTrace) -> Result<()> {
        use redis::AsyncCommands;
        let payload = serde_json::to_string(trace)?;
        let mut conn = self.manager.clone();
        conn.set::<_, _, ()>(self.key(&trace.trace_id), payload)
            .await?;
        Ok(())
    }
}
826
// NOTE(review): every mutating method below is a read-modify-write
// (`load` = GET, then `write_trace` = SET) with no WATCH/transaction, so
// concurrent writers to the same trace can overwrite each other's changes.
#[cfg(feature = "persistence-redis")]
#[async_trait]
impl PersistenceStore for RedisPersistenceStore {
    /// Appends an event by rewriting the trace document, creating it on first use.
    ///
    /// Rejects events for a trace bound to another circuit or already completed.
    async fn append(&self, envelope: PersistenceEnvelope) -> Result<()> {
        let existing = self.load(&envelope.trace_id).await?;
        let mut trace = match existing {
            Some(found) => found,
            None => PersistedTrace {
                trace_id: envelope.trace_id.clone(),
                circuit: envelope.circuit.clone(),
                events: Vec::new(),
                resumed_from_step: None,
                completion: None,
            },
        };

        if trace.circuit != envelope.circuit {
            return Err(anyhow!(
                "trace_id {} already exists for circuit {}, got {}",
                envelope.trace_id,
                trace.circuit,
                envelope.circuit
            ));
        }
        if trace.completion.is_some() {
            return Err(anyhow!(
                "trace_id {} is already completed and cannot accept new events",
                envelope.trace_id
            ));
        }

        trace.events.push(envelope);
        // Stable sort: events with equal steps keep their arrival order.
        trace.events.sort_by_key(|event| event.step);
        self.write_trace(&trace).await
    }

    /// Reads and deserializes the trace document; `None` when the key is absent.
    async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>> {
        use redis::AsyncCommands;
        let mut conn = self.manager.clone();
        let payload: Option<String> = conn.get(self.key(trace_id)).await?;
        match payload {
            Some(raw) => Ok(Some(serde_json::from_str::<PersistedTrace>(&raw)?)),
            None => Ok(None),
        }
    }

    /// Records the resume step; the cursor points at the following step.
    async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor> {
        let Some(mut trace) = self.load(trace_id).await? else {
            return Err(anyhow!("trace_id {} not found", trace_id));
        };
        trace.resumed_from_step = Some(resume_from_step);
        self.write_trace(&trace).await?;
        Ok(ResumeCursor {
            trace_id: trace_id.to_string(),
            next_step: resume_from_step.saturating_add(1),
        })
    }

    /// Stores the completion state, replacing any previous one.
    async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()> {
        let Some(mut trace) = self.load(trace_id).await? else {
            return Err(anyhow!("trace_id {} not found", trace_id));
        };
        trace.completion = Some(completion);
        self.write_trace(&trace).await
    }
}
897
898#[cfg(test)]
899mod tests {
900    use super::*;
901    #[cfg(any(feature = "persistence-postgres", feature = "persistence-redis"))]
902    use uuid::Uuid;
903
904    fn envelope(step: u64, outcome_kind: &str) -> PersistenceEnvelope {
905        PersistenceEnvelope {
906            trace_id: "trace-1".to_string(),
907            circuit: "OrderCircuit".to_string(),
908            step,
909            outcome_kind: outcome_kind.to_string(),
910            timestamp_ms: 1_700_000_000_000 + step,
911            payload_hash: Some(format!("hash-{}", step)),
912        }
913    }
914
915    #[tokio::test]
916    async fn append_and_load_roundtrip() {
917        let store = InMemoryPersistenceStore::new();
918        store.append(envelope(1, "Next")).await.unwrap();
919        store.append(envelope(2, "Branch")).await.unwrap();
920
921        let loaded = store.load("trace-1").await.unwrap().unwrap();
922        assert_eq!(loaded.trace_id, "trace-1");
923        assert_eq!(loaded.circuit, "OrderCircuit");
924        assert_eq!(loaded.events.len(), 2);
925        assert_eq!(loaded.events[0].step, 1);
926        assert_eq!(loaded.events[1].outcome_kind, "Branch");
927        assert_eq!(loaded.completion, None);
928    }
929
930    #[tokio::test]
931    async fn resume_records_cursor() {
932        let store = InMemoryPersistenceStore::new();
933        store.append(envelope(3, "Fault")).await.unwrap();
934
935        let cursor = store.resume("trace-1", 3).await.unwrap();
936        assert_eq!(
937            cursor,
938            ResumeCursor {
939                trace_id: "trace-1".to_string(),
940                next_step: 4
941            }
942        );
943
944        let loaded = store.load("trace-1").await.unwrap().unwrap();
945        assert_eq!(loaded.resumed_from_step, Some(3));
946    }
947
948    #[tokio::test]
949    async fn complete_marks_trace_and_blocks_append() {
950        let store = InMemoryPersistenceStore::new();
951        store.append(envelope(1, "Next")).await.unwrap();
952        store
953            .complete("trace-1", CompletionState::Success)
954            .await
955            .unwrap();
956
957        let loaded = store.load("trace-1").await.unwrap().unwrap();
958        assert_eq!(loaded.completion, Some(CompletionState::Success));
959
960        let err = store.append(envelope(2, "Next")).await.unwrap_err();
961        assert!(
962            err.to_string()
963                .contains("is already completed and cannot accept new events")
964        );
965    }
966
967    #[tokio::test]
968    async fn append_rejects_cross_circuit_trace_reuse() {
969        let store = InMemoryPersistenceStore::new();
970        store.append(envelope(1, "Next")).await.unwrap();
971
972        let mut invalid = envelope(2, "Next");
973        invalid.circuit = "AnotherCircuit".to_string();
974        let err = store.append(invalid).await.unwrap_err();
975        assert!(
976            err.to_string()
977                .contains("already exists for circuit OrderCircuit")
978        );
979    }
980
981    #[tokio::test]
982    async fn in_memory_compensation_idempotency_roundtrip() {
983        let store = InMemoryCompensationIdempotencyStore::new();
984        let key = "trace-a:OrderFlow:Fault";
985
986        assert!(!store.was_compensated(key).await.unwrap());
987        store.mark_compensated(key).await.unwrap();
988        assert!(store.was_compensated(key).await.unwrap());
989    }
990
991    #[cfg(feature = "persistence-postgres")]
992    #[tokio::test]
993    async fn postgres_store_roundtrip_when_configured() {
994        let url = match std::env::var("RANVIER_PERSISTENCE_POSTGRES_URL") {
995            Ok(value) => value,
996            Err(_) => return,
997        };
998
999        let pool = sqlx::postgres::PgPoolOptions::new()
1000            .max_connections(5)
1001            .connect(&url)
1002            .await
1003            .unwrap();
1004        let table_prefix = format!("ranvier_persistence_test_{}", Uuid::new_v4().simple());
1005        let store = PostgresPersistenceStore::with_table_prefix(pool.clone(), table_prefix.clone());
1006        store.ensure_schema().await.unwrap();
1007
1008        let trace_id = format!("trace-{}", Uuid::new_v4().simple());
1009        let circuit = "PgCircuit".to_string();
1010
1011        let mut first = envelope(1, "Next");
1012        first.trace_id = trace_id.clone();
1013        first.circuit = circuit.clone();
1014        store.append(first).await.unwrap();
1015
1016        let mut second = envelope(2, "Branch");
1017        second.trace_id = trace_id.clone();
1018        second.circuit = circuit.clone();
1019        store.append(second).await.unwrap();
1020
1021        let cursor = store.resume(&trace_id, 2).await.unwrap();
1022        assert_eq!(cursor.next_step, 3);
1023
1024        store
1025            .complete(&trace_id, CompletionState::Compensated)
1026            .await
1027            .unwrap();
1028
1029        let loaded = store.load(&trace_id).await.unwrap().unwrap();
1030        assert_eq!(loaded.trace_id, trace_id);
1031        assert_eq!(loaded.circuit, circuit);
1032        assert_eq!(loaded.events.len(), 2);
1033        assert_eq!(loaded.resumed_from_step, Some(2));
1034        assert_eq!(loaded.completion, Some(CompletionState::Compensated));
1035
1036        let drop_events = format!("DROP TABLE IF EXISTS {}", store.events_table);
1037        let drop_state = format!("DROP TABLE IF EXISTS {}", store.state_table);
1038        sqlx::query(&drop_events).execute(&pool).await.unwrap();
1039        sqlx::query(&drop_state).execute(&pool).await.unwrap();
1040    }
1041
1042    #[cfg(feature = "persistence-redis")]
1043    #[tokio::test]
1044    async fn redis_store_roundtrip_when_configured() {
1045        let url = match std::env::var("RANVIER_PERSISTENCE_REDIS_URL") {
1046            Ok(value) => value,
1047            Err(_) => return,
1048        };
1049
1050        let base = RedisPersistenceStore::connect(&url).await.unwrap();
1051        let prefix = format!("ranvier:persistence:test:{}", Uuid::new_v4().simple());
1052        let store = RedisPersistenceStore::with_prefix(base.manager.clone(), prefix);
1053
1054        let trace_id = format!("trace-{}", Uuid::new_v4().simple());
1055        let circuit = "RedisCircuit".to_string();
1056
1057        let mut first = envelope(1, "Next");
1058        first.trace_id = trace_id.clone();
1059        first.circuit = circuit.clone();
1060        store.append(first).await.unwrap();
1061
1062        let mut second = envelope(2, "Fault");
1063        second.trace_id = trace_id.clone();
1064        second.circuit = circuit.clone();
1065        store.append(second).await.unwrap();
1066
1067        let cursor = store.resume(&trace_id, 2).await.unwrap();
1068        assert_eq!(cursor.next_step, 3);
1069
1070        store
1071            .complete(&trace_id, CompletionState::Fault)
1072            .await
1073            .unwrap();
1074
1075        let loaded = store.load(&trace_id).await.unwrap().unwrap();
1076        assert_eq!(loaded.trace_id, trace_id);
1077        assert_eq!(loaded.circuit, circuit);
1078        assert_eq!(loaded.events.len(), 2);
1079        assert_eq!(loaded.resumed_from_step, Some(2));
1080        assert_eq!(loaded.completion, Some(CompletionState::Fault));
1081
1082        use redis::AsyncCommands;
1083        let key = store.key(&trace_id);
1084        let mut conn = store.manager.clone();
1085        let _: () = conn.del(key).await.unwrap();
1086    }
1087
1088    #[cfg(feature = "persistence-postgres")]
1089    #[tokio::test]
1090    async fn postgres_compensation_idempotency_roundtrip_when_configured() {
1091        let url = match std::env::var("RANVIER_PERSISTENCE_POSTGRES_URL") {
1092            Ok(value) => value,
1093            Err(_) => return,
1094        };
1095
1096        let pool = sqlx::postgres::PgPoolOptions::new()
1097            .max_connections(5)
1098            .connect(&url)
1099            .await
1100            .unwrap();
1101        let table_prefix = format!(
1102            "ranvier_compensation_idempotency_test_{}",
1103            Uuid::new_v4().simple()
1104        );
1105        let store =
1106            PostgresCompensationIdempotencyStore::with_table_prefix(pool.clone(), &table_prefix);
1107        store.ensure_schema().await.unwrap();
1108
1109        let key = format!("trace-{}:OrderFlow:Fault", Uuid::new_v4().simple());
1110        assert!(!store.was_compensated(&key).await.unwrap());
1111        store.mark_compensated(&key).await.unwrap();
1112        assert!(store.was_compensated(&key).await.unwrap());
1113        store.mark_compensated(&key).await.unwrap();
1114        assert!(store.was_compensated(&key).await.unwrap());
1115
1116        let drop_table = format!("DROP TABLE IF EXISTS {}", store.table);
1117        sqlx::query(&drop_table).execute(&pool).await.unwrap();
1118    }
1119
1120    #[cfg(feature = "persistence-postgres")]
1121    #[tokio::test]
1122    async fn postgres_compensation_idempotency_purge_when_configured() {
1123        let url = match std::env::var("RANVIER_PERSISTENCE_POSTGRES_URL") {
1124            Ok(value) => value,
1125            Err(_) => return,
1126        };
1127
1128        let pool = sqlx::postgres::PgPoolOptions::new()
1129            .max_connections(5)
1130            .connect(&url)
1131            .await
1132            .unwrap();
1133        let table_prefix = format!(
1134            "ranvier_compensation_idempotency_purge_test_{}",
1135            Uuid::new_v4().simple()
1136        );
1137        let store =
1138            PostgresCompensationIdempotencyStore::with_table_prefix(pool.clone(), &table_prefix);
1139        store.ensure_schema().await.unwrap();
1140
1141        let stale_key = format!("stale-{}", Uuid::new_v4().simple());
1142        let fresh_key = format!("fresh-{}", Uuid::new_v4().simple());
1143        store.mark_compensated(&stale_key).await.unwrap();
1144        store.mark_compensated(&fresh_key).await.unwrap();
1145
1146        let force_stale_query = format!(
1147            "UPDATE {}
1148             SET created_at_ms = 0
1149             WHERE idempotency_key = $1",
1150            store.table
1151        );
1152        sqlx::query(&force_stale_query)
1153            .bind(&stale_key)
1154            .execute(&pool)
1155            .await
1156            .unwrap();
1157
1158        let purged = store.purge_older_than_ms(1).await.unwrap();
1159        assert!(purged >= 1);
1160        assert!(!store.was_compensated(&stale_key).await.unwrap());
1161        assert!(store.was_compensated(&fresh_key).await.unwrap());
1162
1163        let drop_table = format!("DROP TABLE IF EXISTS {}", store.table);
1164        sqlx::query(&drop_table).execute(&pool).await.unwrap();
1165    }
1166
1167    #[cfg(feature = "persistence-redis")]
1168    #[tokio::test]
1169    async fn redis_compensation_idempotency_roundtrip_when_configured() {
1170        let url = match std::env::var("RANVIER_PERSISTENCE_REDIS_URL") {
1171            Ok(value) => value,
1172            Err(_) => return,
1173        };
1174
1175        let base = RedisCompensationIdempotencyStore::connect(&url)
1176            .await
1177            .unwrap();
1178        let prefix = format!(
1179            "ranvier:compensation:idempotency:test:{}",
1180            Uuid::new_v4().simple()
1181        );
1182        let store = RedisCompensationIdempotencyStore::with_prefix(base.manager.clone(), prefix);
1183        let key = format!("trace-{}:OrderFlow:Fault", Uuid::new_v4().simple());
1184
1185        assert!(!store.was_compensated(&key).await.unwrap());
1186        store.mark_compensated(&key).await.unwrap();
1187        assert!(store.was_compensated(&key).await.unwrap());
1188        store.mark_compensated(&key).await.unwrap();
1189        assert!(store.was_compensated(&key).await.unwrap());
1190
1191        use redis::AsyncCommands;
1192        let mut conn = store.manager.clone();
1193        let _: () = conn.del(store.key(&key)).await.unwrap();
1194    }
1195
1196    #[cfg(feature = "persistence-redis")]
1197    #[tokio::test]
1198    async fn redis_compensation_idempotency_ttl_when_configured() {
1199        let url = match std::env::var("RANVIER_PERSISTENCE_REDIS_URL") {
1200            Ok(value) => value,
1201            Err(_) => return,
1202        };
1203
1204        let base = RedisCompensationIdempotencyStore::connect(&url)
1205            .await
1206            .unwrap();
1207        let prefix = format!(
1208            "ranvier:compensation:idempotency:ttl:test:{}",
1209            Uuid::new_v4().simple()
1210        );
1211        let store =
1212            RedisCompensationIdempotencyStore::with_prefix_and_ttl(base.manager.clone(), prefix, 1);
1213        let key = format!("ttl-{}", Uuid::new_v4().simple());
1214
1215        assert!(!store.was_compensated(&key).await.unwrap());
1216        store.mark_compensated(&key).await.unwrap();
1217        assert!(store.was_compensated(&key).await.unwrap());
1218
1219        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1220        assert!(!store.was_compensated(&key).await.unwrap());
1221    }
1222}