1use anyhow::{Result, anyhow};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6use tokio::sync::RwLock;
7
/// A single persisted event belonging to one trace.
///
/// Envelopes are handed to [`PersistenceStore::append`]; the stores in this
/// file group them by `trace_id` and keep them ordered by `step`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PersistenceEnvelope {
    /// Identifier of the trace this event belongs to.
    pub trace_id: String,
    /// Name of the circuit that produced the event. The stores in this file
    /// reject an event whose circuit differs from the one already recorded
    /// for the trace.
    pub circuit: String,
    /// Step number of the event; stored events are ordered by this value.
    pub step: u64,
    /// Free-form label describing the step outcome.
    // NOTE(review): the tests use values such as "Next", "Branch" and "Fault";
    // the full vocabulary is defined by callers — confirm.
    pub outcome_kind: String,
    /// Event timestamp in milliseconds.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm with producers.
    pub timestamp_ms: u64,
    /// Optional hash of the event payload; how it is computed is up to the caller.
    pub payload_hash: Option<String>,
}
26
/// Terminal state recorded for a trace via [`PersistenceStore::complete`].
///
/// Once a trace carries a completion state, the stores in this file refuse
/// further appends to it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompletionState {
    /// The trace finished successfully.
    Success,
    /// The trace ended with a fault.
    Fault,
    /// The trace was cancelled.
    Cancelled,
    /// The trace was compensated.
    // NOTE(review): presumably set after a compensation hook ran — confirm with callers.
    Compensated,
}
35
/// Maps a [`CompletionState`] to the lowercase string stored in the Postgres
/// `completion` column.
#[cfg(feature = "persistence-postgres")]
fn completion_state_to_wire(state: &CompletionState) -> &'static str {
    use CompletionState::{Cancelled, Compensated, Fault, Success};

    // Exhaustive on purpose: adding a variant must force a wire name here.
    match *state {
        Success => "success",
        Fault => "fault",
        Cancelled => "cancelled",
        Compensated => "compensated",
    }
}
45
/// Parses the string stored in the Postgres `completion` column back into a
/// [`CompletionState`].
///
/// # Errors
///
/// Returns an error naming the offending value when it is not one of the
/// four known wire strings.
#[cfg(feature = "persistence-postgres")]
fn completion_state_from_wire(value: &str) -> Result<CompletionState> {
    let parsed = match value {
        "success" => CompletionState::Success,
        "fault" => CompletionState::Fault,
        "cancelled" => CompletionState::Cancelled,
        "compensated" => CompletionState::Compensated,
        other => return Err(anyhow!("unknown completion state value: {}", other)),
    };
    Ok(parsed)
}
56
/// Everything a store holds for one trace: its events plus resume and
/// completion bookkeeping. Returned by [`PersistenceStore::load`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PersistedTrace {
    /// Identifier of the trace.
    pub trace_id: String,
    /// Circuit the trace was first recorded for.
    pub circuit: String,
    /// Events appended so far; the stores in this file keep them ordered by step.
    pub events: Vec<PersistenceEnvelope>,
    /// Step passed to the most recent [`PersistenceStore::resume`] call, if any.
    pub resumed_from_step: Option<u64>,
    /// Completion state set by [`PersistenceStore::complete`], if any.
    pub completion: Option<CompletionState>,
}
66
/// Result of [`PersistenceStore::resume`]: where execution should continue.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ResumeCursor {
    /// Identifier of the resumed trace.
    pub trace_id: String,
    /// Step to run next. The stores in this file set it to the requested
    /// resume step plus one (saturating).
    pub next_step: u64,
}
73
/// Newtype wrapper around a trace identifier string.
// NOTE(review): presumably used to carry an explicit trace id to the runtime;
// no consumer is visible in this file — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PersistenceTraceId(pub String);

impl PersistenceTraceId {
    /// Wraps anything convertible into a `String` as a trace id.
    pub fn new(value: impl Into<String>) -> Self {
        let id: String = value.into();
        PersistenceTraceId(id)
    }
}
85
/// Boolean flag wrapped in a newtype.
// NOTE(review): presumably toggles automatic completion of a trace once a run
// finishes; the consumer is not visible in this file — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PersistenceAutoComplete(pub bool);
91
/// Information handed to [`CompensationHook::compensate`] about the fault
/// being compensated.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompensationContext {
    /// Identifier of the faulted trace.
    pub trace_id: String,
    /// Circuit the trace belongs to.
    pub circuit: String,
    /// Label describing the kind of fault; the vocabulary is defined by the caller.
    pub fault_kind: String,
    /// Step at which the fault occurred.
    pub fault_step: u64,
    /// Timestamp in milliseconds.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm with producers.
    pub timestamp_ms: u64,
}
104
/// Boolean flag wrapped in a newtype.
// NOTE(review): presumably toggles automatic invocation of the compensation
// hook on fault; the consumer is not visible in this file — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompensationAutoTrigger(pub bool);
110
/// Retry settings for compensation.
// NOTE(review): how attempts and backoff are applied is decided by the caller
// of the hook, which is not visible in this file — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompensationRetryPolicy {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Backoff in milliseconds.
    pub backoff_ms: u64,
}

impl Default for CompensationRetryPolicy {
    /// A single attempt with no backoff.
    fn default() -> Self {
        let max_attempts = 1;
        let backoff_ms = 0;
        CompensationRetryPolicy {
            max_attempts,
            backoff_ms,
        }
    }
}
128
/// Callback invoked to compensate a faulted trace.
///
/// Marked deprecated only to flag the contract as experimental (see the
/// attribute note); it is not scheduled for removal by this attribute alone.
#[deprecated(
    since = "0.7.0",
    note = "Experimental compensation contract (M148); API may change before stabilization."
)]
#[async_trait]
pub trait CompensationHook: Send + Sync {
    /// Runs the compensation for the fault described by `context`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when compensation fails.
    async fn compensate(&self, context: CompensationContext) -> Result<()>;
}
138
139#[derive(Clone)]
141pub struct CompensationHandle {
142 inner: Arc<dyn CompensationHook>,
143}
144
145impl std::fmt::Debug for CompensationHandle {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct("CompensationHandle").finish_non_exhaustive()
148 }
149}
150
151impl CompensationHandle {
152 pub fn from_hook<H>(hook: H) -> Self
154 where
155 H: CompensationHook + 'static,
156 {
157 Self {
158 inner: Arc::new(hook),
159 }
160 }
161
162 pub fn from_arc(hook: Arc<dyn CompensationHook>) -> Self {
164 Self { inner: hook }
165 }
166
167 pub fn hook(&self) -> Arc<dyn CompensationHook> {
169 self.inner.clone()
170 }
171}
172
/// Storage for "already compensated" markers, keyed by an opaque string.
///
/// Marked deprecated only to flag the contract as experimental (see the
/// attribute note).
// NOTE(review): the tests use keys shaped like "<trace>:<circuit>:<fault>";
// the exact key format is chosen by callers — confirm.
#[deprecated(
    since = "0.7.0",
    note = "Experimental compensation idempotency contract (M148); API may change before stabilization."
)]
#[async_trait]
pub trait CompensationIdempotencyStore: Send + Sync {
    /// Returns `true` when `key` has previously been marked as compensated.
    async fn was_compensated(&self, key: &str) -> Result<bool>;
    /// Records `key` as compensated. The implementations in this file treat
    /// marking an already present key as a no-op.
    async fn mark_compensated(&self, key: &str) -> Result<()>;
}
183
184#[derive(Clone)]
186pub struct CompensationIdempotencyHandle {
187 inner: Arc<dyn CompensationIdempotencyStore>,
188}
189
190impl std::fmt::Debug for CompensationIdempotencyHandle {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 f.debug_struct("CompensationIdempotencyHandle")
193 .finish_non_exhaustive()
194 }
195}
196
197impl CompensationIdempotencyHandle {
198 pub fn from_store<S>(store: S) -> Self
199 where
200 S: CompensationIdempotencyStore + 'static,
201 {
202 Self {
203 inner: Arc::new(store),
204 }
205 }
206
207 pub fn from_arc(store: Arc<dyn CompensationIdempotencyStore>) -> Self {
208 Self { inner: store }
209 }
210
211 pub fn store(&self) -> Arc<dyn CompensationIdempotencyStore> {
212 self.inner.clone()
213 }
214}
215
216#[derive(Debug, Default, Clone)]
218pub struct InMemoryCompensationIdempotencyStore {
219 keys: Arc<RwLock<HashSet<String>>>,
220}
221
222impl InMemoryCompensationIdempotencyStore {
223 pub fn new() -> Self {
224 Self::default()
225 }
226}
227
228#[async_trait]
229impl CompensationIdempotencyStore for InMemoryCompensationIdempotencyStore {
230 async fn was_compensated(&self, key: &str) -> Result<bool> {
231 let guard = self.keys.read().await;
232 Ok(guard.contains(key))
233 }
234
235 async fn mark_compensated(&self, key: &str) -> Result<()> {
236 let mut guard = self.keys.write().await;
237 guard.insert(key.to_string());
238 Ok(())
239 }
240}
241
/// Postgres-backed [`CompensationIdempotencyStore`] that keeps one row per
/// idempotency key.
#[cfg(feature = "persistence-postgres")]
#[derive(Debug, Clone)]
pub struct PostgresCompensationIdempotencyStore {
    pool: sqlx::Pool<sqlx::Postgres>,
    table: String,
}

#[cfg(feature = "persistence-postgres")]
impl PostgresCompensationIdempotencyStore {
    /// Creates a store using the default `ranvier_persistence` table prefix.
    pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
        Self::with_table_prefix(pool, "ranvier_persistence")
    }

    /// Creates a store whose table is named `<prefix>_compensation_idempotency`.
    ///
    /// The prefix is interpolated into SQL text verbatim, so it must come
    /// from trusted configuration and form a valid identifier.
    pub fn with_table_prefix(pool: sqlx::Pool<sqlx::Postgres>, prefix: impl Into<String>) -> Self {
        let table = format!("{}_compensation_idempotency", prefix.into());
        Self { pool, table }
    }

    /// Creates the idempotency table when it does not exist yet.
    pub async fn ensure_schema(&self) -> Result<()> {
        let ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (
                idempotency_key TEXT PRIMARY KEY,
                created_at_ms BIGINT NOT NULL
            )",
            self.table
        );
        sqlx::query(&ddl).execute(&self.pool).await?;
        Ok(())
    }

    /// Deletes every marker whose `created_at_ms` is strictly below
    /// `cutoff_ms` and returns the number of rows removed.
    pub async fn purge_older_than_ms(&self, cutoff_ms: i64) -> Result<u64> {
        let sql = format!(
            "DELETE FROM {}
             WHERE created_at_ms < $1",
            self.table
        );
        let outcome = sqlx::query(&sql)
            .bind(cutoff_ms)
            .execute(&self.pool)
            .await?;
        Ok(outcome.rows_affected())
    }
}
293
#[cfg(feature = "persistence-postgres")]
#[async_trait]
impl CompensationIdempotencyStore for PostgresCompensationIdempotencyStore {
    /// Returns `true` when a row for `key` exists.
    async fn was_compensated(&self, key: &str) -> Result<bool> {
        let sql = format!(
            "SELECT 1
             FROM {}
             WHERE idempotency_key = $1
             LIMIT 1",
            self.table
        );
        let hit = sqlx::query_scalar::<_, i32>(&sql)
            .bind(key)
            .fetch_optional(&self.pool)
            .await?;
        Ok(hit.is_some())
    }

    /// Inserts a marker row stamped with the current wall-clock time in
    /// milliseconds; an existing row for the same key is left untouched.
    async fn mark_compensated(&self, key: &str) -> Result<()> {
        let elapsed = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
        // `as_millis` yields a `u128`; the column is a signed 64-bit integer.
        let created_at_ms = i64::try_from(elapsed.as_millis())?;

        let sql = format!(
            "INSERT INTO {} (idempotency_key, created_at_ms)
             VALUES ($1, $2)
             ON CONFLICT (idempotency_key) DO NOTHING",
            self.table
        );
        sqlx::query(&sql)
            .bind(key)
            .bind(created_at_ms)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}
330
/// Redis-backed [`CompensationIdempotencyStore`] that keeps one key per
/// marker, optionally with a time-to-live.
#[cfg(feature = "persistence-redis")]
#[derive(Clone)]
pub struct RedisCompensationIdempotencyStore {
    manager: redis::aio::ConnectionManager,
    key_prefix: String,
    ttl_seconds: Option<u64>,
}

#[cfg(feature = "persistence-redis")]
impl std::fmt::Debug for RedisCompensationIdempotencyStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The connection manager is left out of the output.
        let mut repr = f.debug_struct("RedisCompensationIdempotencyStore");
        repr.field("key_prefix", &self.key_prefix);
        repr.field("ttl_seconds", &self.ttl_seconds);
        repr.finish_non_exhaustive()
    }
}

#[cfg(feature = "persistence-redis")]
impl RedisCompensationIdempotencyStore {
    /// Connects to `url` and uses the default key prefix
    /// `ranvier:compensation:idempotency` with no TTL.
    pub async fn connect(url: &str) -> Result<Self> {
        let client = redis::Client::open(url)?;
        let manager = redis::aio::ConnectionManager::new(client).await?;
        Ok(Self::with_prefix(
            manager,
            "ranvier:compensation:idempotency",
        ))
    }

    /// Builds a store on an existing connection manager with a custom key
    /// prefix and no TTL.
    pub fn with_prefix(
        manager: redis::aio::ConnectionManager,
        key_prefix: impl Into<String>,
    ) -> Self {
        let key_prefix = key_prefix.into();
        Self {
            manager,
            key_prefix,
            ttl_seconds: None,
        }
    }

    /// Builds a store on an existing connection manager with a custom key
    /// prefix whose markers expire after `ttl_seconds`.
    pub fn with_prefix_and_ttl(
        manager: redis::aio::ConnectionManager,
        key_prefix: impl Into<String>,
        ttl_seconds: u64,
    ) -> Self {
        Self {
            ttl_seconds: Some(ttl_seconds),
            ..Self::with_prefix(manager, key_prefix)
        }
    }

    /// Full Redis key for a marker: `<prefix>:<idempotency_key>`.
    fn key(&self, idempotency_key: &str) -> String {
        [self.key_prefix.as_str(), idempotency_key].join(":")
    }
}
389
#[cfg(feature = "persistence-redis")]
#[async_trait]
impl CompensationIdempotencyStore for RedisCompensationIdempotencyStore {
    /// Returns `true` when the marker key for `key` exists in Redis.
    async fn was_compensated(&self, key: &str) -> Result<bool> {
        use redis::AsyncCommands;
        let mut conn = self.manager.clone();
        let exists: bool = conn.exists(self.key(key)).await?;
        Ok(exists)
    }

    /// Records `key` as compensated, leaving an existing marker untouched.
    ///
    /// The marker and its TTL are written by a single `SET ... NX [EX ttl]`
    /// command. A separate `SETNX` followed by `EXPIRE` could leave a marker
    /// without any expiry if the second command never ran (crash, dropped
    /// connection), making the key permanent.
    ///
    /// # Errors
    ///
    /// Returns an error when the Redis command fails.
    async fn mark_compensated(&self, key: &str) -> Result<()> {
        let redis_key = self.key(key);

        let mut command = redis::cmd("SET");
        command.arg(&redis_key).arg("1").arg("NX");

        if let Some(ttl_seconds) = self.ttl_seconds {
            if ttl_seconds == 0 {
                // Redis rejects `EX 0`. A zero TTL previously meant the marker
                // was deleted right after being written, so skipping the write
                // leaves Redis in the same end state.
                return Ok(());
            }
            command.arg("EX").arg(ttl_seconds);
        }

        let mut conn = self.manager.clone();
        // The reply is `OK` when the marker was created and nil when it
        // already existed; both mean the key is marked.
        let _reply: redis::Value = command.query_async(&mut conn).await?;
        Ok(())
    }
}
414
/// Storage contract for trace events and their resume/completion state.
///
/// Marked deprecated only to flag the contract as experimental (see the
/// attribute note).
#[deprecated(
    since = "0.7.0",
    note = "Experimental persistence contract (M148); API may change before stabilization."
)]
#[async_trait]
pub trait PersistenceStore: Send + Sync {
    /// Appends one event to the trace named by `envelope.trace_id`. The
    /// implementations in this file create the trace on first append and
    /// reject events for a different circuit or for a completed trace.
    async fn append(&self, envelope: PersistenceEnvelope) -> Result<()>;
    /// Loads the full trace, or `None` when the trace id is unknown.
    async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>>;
    /// Records `resume_from_step` on the trace and returns the cursor to
    /// continue from. The implementations in this file fail when the trace
    /// id is unknown.
    async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor>;
    /// Sets the completion state of the trace. The implementations in this
    /// file fail when the trace id is unknown.
    async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()>;
}
429
430#[derive(Clone)]
432pub struct PersistenceHandle {
433 inner: Arc<dyn PersistenceStore>,
434}
435
436impl std::fmt::Debug for PersistenceHandle {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 f.debug_struct("PersistenceHandle").finish_non_exhaustive()
439 }
440}
441
442impl PersistenceHandle {
443 pub fn from_store<S>(store: S) -> Self
445 where
446 S: PersistenceStore + 'static,
447 {
448 Self {
449 inner: Arc::new(store),
450 }
451 }
452
453 pub fn from_arc(store: Arc<dyn PersistenceStore>) -> Self {
455 Self { inner: store }
456 }
457
458 pub fn store(&self) -> Arc<dyn PersistenceStore> {
460 self.inner.clone()
461 }
462}
463
464#[derive(Debug, Default, Clone)]
466pub struct InMemoryPersistenceStore {
467 inner: Arc<RwLock<HashMap<String, PersistedTrace>>>,
468}
469
470impl InMemoryPersistenceStore {
471 pub fn new() -> Self {
472 Self::default()
473 }
474}
475
476#[async_trait]
477impl PersistenceStore for InMemoryPersistenceStore {
478 async fn append(&self, envelope: PersistenceEnvelope) -> Result<()> {
479 let mut guard = self.inner.write().await;
480 let entry = guard
481 .entry(envelope.trace_id.clone())
482 .or_insert_with(|| PersistedTrace {
483 trace_id: envelope.trace_id.clone(),
484 circuit: envelope.circuit.clone(),
485 events: Vec::new(),
486 resumed_from_step: None,
487 completion: None,
488 });
489
490 if entry.circuit != envelope.circuit {
491 return Err(anyhow!(
492 "trace_id {} already exists for circuit {}, got {}",
493 envelope.trace_id,
494 entry.circuit,
495 envelope.circuit
496 ));
497 }
498 if entry.completion.is_some() {
499 return Err(anyhow!(
500 "trace_id {} is already completed and cannot accept new events",
501 envelope.trace_id
502 ));
503 }
504 entry.events.push(envelope);
505 entry.events.sort_by_key(|e| e.step);
506 Ok(())
507 }
508
509 async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>> {
510 let guard = self.inner.read().await;
511 Ok(guard.get(trace_id).cloned())
512 }
513
514 async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor> {
515 let mut guard = self.inner.write().await;
516 let trace = guard
517 .get_mut(trace_id)
518 .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
519 trace.resumed_from_step = Some(resume_from_step);
520 Ok(ResumeCursor {
521 trace_id: trace_id.to_string(),
522 next_step: resume_from_step.saturating_add(1),
523 })
524 }
525
526 async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()> {
527 let mut guard = self.inner.write().await;
528 let trace = guard
529 .get_mut(trace_id)
530 .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
531 trace.completion = Some(completion);
532 Ok(())
533 }
534}
535
/// Postgres-backed [`PersistenceStore`] using two tables: one row per trace
/// in the state table and one row per event in the events table.
#[cfg(feature = "persistence-postgres")]
#[derive(Debug, Clone)]
pub struct PostgresPersistenceStore {
    /// Connection pool every query runs on.
    pool: sqlx::Pool<sqlx::Postgres>,
    /// Name of the events table (`<prefix>_events`).
    events_table: String,
    /// Name of the state table (`<prefix>_state`).
    state_table: String,
}
543
/// Row shape of the events table as read back by `load`.
///
/// Integer columns are `BIGINT` in the schema, hence `i64` here; they are
/// converted to `u64` when building a [`PersistenceEnvelope`].
#[cfg(feature = "persistence-postgres")]
#[derive(sqlx::FromRow)]
struct PostgresEventRow {
    trace_id: String,
    circuit: String,
    step: i64,
    outcome_kind: String,
    timestamp_ms: i64,
    payload_hash: Option<String>,
}
554
/// Row shape of the state table as read back by `load`.
#[cfg(feature = "persistence-postgres")]
#[derive(sqlx::FromRow)]
struct PostgresStateRow {
    trace_id: String,
    circuit: String,
    /// `NULL` until `resume` has been called for the trace.
    resumed_from_step: Option<i64>,
    /// Wire string of the completion state; `NULL` while the trace is open.
    completion: Option<String>,
}
563
#[cfg(feature = "persistence-postgres")]
impl PostgresPersistenceStore {
    /// Creates a store using the default `ranvier_persistence` table prefix.
    pub fn new(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
        Self::with_table_prefix(pool, "ranvier_persistence")
    }

    /// Creates a store whose tables are named `<prefix>_events` and
    /// `<prefix>_state`.
    ///
    /// The prefix is interpolated into SQL text verbatim, so it must come
    /// from trusted configuration and form a valid identifier.
    pub fn with_table_prefix(pool: sqlx::Pool<sqlx::Postgres>, prefix: impl Into<String>) -> Self {
        let prefix: String = prefix.into();
        let events_table = format!("{}_events", prefix);
        let state_table = format!("{}_state", prefix);
        Self {
            pool,
            events_table,
            state_table,
        }
    }

    /// Creates the state and events tables when they do not exist yet.
    pub async fn ensure_schema(&self) -> Result<()> {
        let state_ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (
                trace_id TEXT PRIMARY KEY,
                circuit TEXT NOT NULL,
                resumed_from_step BIGINT NULL,
                completion TEXT NULL
            )",
            self.state_table
        );
        let events_ddl = format!(
            "CREATE TABLE IF NOT EXISTS {} (
                trace_id TEXT NOT NULL,
                circuit TEXT NOT NULL,
                step BIGINT NOT NULL,
                outcome_kind TEXT NOT NULL,
                timestamp_ms BIGINT NOT NULL,
                payload_hash TEXT NULL,
                PRIMARY KEY (trace_id, step)
            )",
            self.events_table
        );

        // State table first, then events, matching the original order.
        sqlx::query(&state_ddl).execute(&self.pool).await?;
        sqlx::query(&events_ddl).execute(&self.pool).await?;
        Ok(())
    }
}
612
#[cfg(feature = "persistence-postgres")]
#[async_trait]
impl PersistenceStore for PostgresPersistenceStore {
    /// Appends an event, creating the trace's state row on first use.
    ///
    /// The statements run individually on the pool rather than inside a
    /// transaction, so the state check and the event insert are not atomic
    /// with respect to concurrent writers.
    ///
    /// # Errors
    ///
    /// Fails when `step` or `timestamp_ms` does not fit into a `BIGINT`, when
    /// the trace belongs to another circuit, when the trace is already
    /// completed, or when a query fails (for example on a duplicate
    /// `(trace_id, step)` primary key).
    async fn append(&self, envelope: PersistenceEnvelope) -> Result<()> {
        // Convert before touching the database so an out-of-range value
        // cannot leave an orphaned state row behind.
        let step_i64 = i64::try_from(envelope.step)?;
        let ts_i64 = i64::try_from(envelope.timestamp_ms)?;

        let insert_state = format!(
            "INSERT INTO {} (trace_id, circuit, resumed_from_step, completion)
             VALUES ($1, $2, NULL, NULL)
             ON CONFLICT (trace_id) DO NOTHING",
            self.state_table
        );
        sqlx::query(&insert_state)
            .bind(&envelope.trace_id)
            .bind(&envelope.circuit)
            .execute(&self.pool)
            .await?;

        // One round trip fetches both columns the validation needs.
        let read_state = format!(
            "SELECT circuit, completion FROM {} WHERE trace_id = $1",
            self.state_table
        );
        let state: Option<(String, Option<String>)> = sqlx::query_as(&read_state)
            .bind(&envelope.trace_id)
            .fetch_optional(&self.pool)
            .await?;
        let Some((existing_circuit, completion)) = state else {
            // Only reachable when the row vanished after the upsert above.
            return Err(anyhow!("trace_id {} not found", envelope.trace_id));
        };

        if existing_circuit != envelope.circuit {
            // Same wording as the in-memory and Redis stores.
            return Err(anyhow!(
                "trace_id {} already exists for circuit {}, got {}",
                envelope.trace_id,
                existing_circuit,
                envelope.circuit
            ));
        }
        if completion.is_some() {
            return Err(anyhow!(
                "trace_id {} is already completed and cannot accept new events",
                envelope.trace_id
            ));
        }

        let insert_event = format!(
            "INSERT INTO {} (trace_id, circuit, step, outcome_kind, timestamp_ms, payload_hash)
             VALUES ($1, $2, $3, $4, $5, $6)",
            self.events_table
        );
        sqlx::query(&insert_event)
            .bind(&envelope.trace_id)
            .bind(&envelope.circuit)
            .bind(step_i64)
            .bind(&envelope.outcome_kind)
            .bind(ts_i64)
            .bind(&envelope.payload_hash)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Loads the state row and all events (ordered by step) for a trace.
    ///
    /// Returns `Ok(None)` when no state row exists.
    ///
    /// # Errors
    ///
    /// Fails when a query fails, when a stored integer is negative, or when
    /// the stored completion string is not a known wire value.
    async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>> {
        let state_query = format!(
            "SELECT trace_id, circuit, resumed_from_step, completion
             FROM {}
             WHERE trace_id = $1",
            self.state_table
        );
        let Some(state): Option<PostgresStateRow> = sqlx::query_as(&state_query)
            .bind(trace_id)
            .fetch_optional(&self.pool)
            .await?
        else {
            return Ok(None);
        };

        let events_query = format!(
            "SELECT trace_id, circuit, step, outcome_kind, timestamp_ms, payload_hash
             FROM {}
             WHERE trace_id = $1
             ORDER BY step ASC",
            self.events_table
        );
        let rows: Vec<PostgresEventRow> = sqlx::query_as(&events_query)
            .bind(trace_id)
            .fetch_all(&self.pool)
            .await?;

        // Stops at the first row whose integers do not fit into `u64`.
        let events = rows
            .into_iter()
            .map(|row| -> Result<PersistenceEnvelope> {
                Ok(PersistenceEnvelope {
                    trace_id: row.trace_id,
                    circuit: row.circuit,
                    step: u64::try_from(row.step)?,
                    outcome_kind: row.outcome_kind,
                    timestamp_ms: u64::try_from(row.timestamp_ms)?,
                    payload_hash: row.payload_hash,
                })
            })
            .collect::<Result<Vec<_>>>()?;

        let completion = state
            .completion
            .as_deref()
            .map(completion_state_from_wire)
            .transpose()?;

        Ok(Some(PersistedTrace {
            trace_id: state.trace_id,
            circuit: state.circuit,
            events,
            resumed_from_step: state.resumed_from_step.map(u64::try_from).transpose()?,
            completion,
        }))
    }

    /// Stores the resume step on the state row and returns a cursor pointing
    /// at the step after it.
    ///
    /// # Errors
    ///
    /// Fails when the step does not fit into a `BIGINT`, when the query
    /// fails, or when no state row exists for `trace_id`.
    async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor> {
        let query = format!(
            "UPDATE {}
             SET resumed_from_step = $2
             WHERE trace_id = $1",
            self.state_table
        );
        let rows = sqlx::query(&query)
            .bind(trace_id)
            .bind(i64::try_from(resume_from_step)?)
            .execute(&self.pool)
            .await?
            .rows_affected();
        if rows == 0 {
            return Err(anyhow!("trace_id {} not found", trace_id));
        }
        Ok(ResumeCursor {
            trace_id: trace_id.to_string(),
            next_step: resume_from_step.saturating_add(1),
        })
    }

    /// Writes the completion state onto the state row, overwriting any
    /// earlier one.
    ///
    /// # Errors
    ///
    /// Fails when the query fails or when no state row exists for `trace_id`.
    async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()> {
        let query = format!(
            "UPDATE {}
             SET completion = $2
             WHERE trace_id = $1",
            self.state_table
        );
        let rows = sqlx::query(&query)
            .bind(trace_id)
            .bind(completion_state_to_wire(&completion))
            .execute(&self.pool)
            .await?
            .rows_affected();
        if rows == 0 {
            return Err(anyhow!("trace_id {} not found", trace_id));
        }
        Ok(())
    }
}
772
/// Redis-backed [`PersistenceStore`] that keeps each trace as one JSON
/// document under `<prefix>:<trace_id>`.
#[cfg(feature = "persistence-redis")]
#[derive(Clone)]
pub struct RedisPersistenceStore {
    manager: redis::aio::ConnectionManager,
    key_prefix: String,
}

#[cfg(feature = "persistence-redis")]
impl std::fmt::Debug for RedisPersistenceStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The connection manager is left out of the output.
        let mut repr = f.debug_struct("RedisPersistenceStore");
        repr.field("key_prefix", &self.key_prefix);
        repr.finish_non_exhaustive()
    }
}

#[cfg(feature = "persistence-redis")]
impl RedisPersistenceStore {
    /// Connects to `url` and uses the default key prefix `ranvier:persistence`.
    pub async fn connect(url: &str) -> Result<Self> {
        let client = redis::Client::open(url)?;
        let manager = redis::aio::ConnectionManager::new(client).await?;
        Ok(Self::with_prefix(manager, "ranvier:persistence"))
    }

    /// Builds a store on an existing connection manager with a custom key prefix.
    pub fn with_prefix(
        manager: redis::aio::ConnectionManager,
        key_prefix: impl Into<String>,
    ) -> Self {
        let key_prefix = key_prefix.into();
        Self {
            manager,
            key_prefix,
        }
    }

    /// Full Redis key for a trace: `<prefix>:<trace_id>`.
    fn key(&self, trace_id: &str) -> String {
        [self.key_prefix.as_str(), trace_id].join(":")
    }

    /// Serializes the whole trace to JSON and overwrites its key.
    async fn write_trace(&self, trace: &PersistedTrace) -> Result<()> {
        use redis::AsyncCommands;
        let payload = serde_json::to_string(trace)?;
        let redis_key = self.key(&trace.trace_id);
        let mut conn = self.manager.clone();
        let _: () = conn.set(redis_key, payload).await?;
        Ok(())
    }
}
826
// Every mutating method below loads the JSON document, changes it in memory
// and writes it back as separate Redis commands; concurrent writers to the
// same trace can therefore overwrite each other's changes.
#[cfg(feature = "persistence-redis")]
#[async_trait]
impl PersistenceStore for RedisPersistenceStore {
    /// Appends an event, creating the trace document on first use.
    ///
    /// Fails when the trace already belongs to another circuit or has been
    /// completed. Events are kept ordered by step.
    async fn append(&self, envelope: PersistenceEnvelope) -> Result<()> {
        let stored = self.load(&envelope.trace_id).await?;
        let mut trace = match stored {
            Some(existing) => existing,
            None => PersistedTrace {
                trace_id: envelope.trace_id.clone(),
                circuit: envelope.circuit.clone(),
                events: Vec::new(),
                resumed_from_step: None,
                completion: None,
            },
        };

        anyhow::ensure!(
            trace.circuit == envelope.circuit,
            "trace_id {} already exists for circuit {}, got {}",
            envelope.trace_id,
            trace.circuit,
            envelope.circuit
        );
        anyhow::ensure!(
            trace.completion.is_none(),
            "trace_id {} is already completed and cannot accept new events",
            envelope.trace_id
        );

        trace.events.push(envelope);
        // Stable sort: events sharing a step keep their arrival order.
        trace.events.sort_by_key(|e| e.step);
        self.write_trace(&trace).await
    }

    /// Reads and deserializes the trace document, or returns `None` when the
    /// key does not exist.
    async fn load(&self, trace_id: &str) -> Result<Option<PersistedTrace>> {
        use redis::AsyncCommands;
        let mut conn = self.manager.clone();
        let raw: Option<String> = conn.get(self.key(trace_id)).await?;
        match raw {
            Some(json) => Ok(Some(serde_json::from_str::<PersistedTrace>(&json)?)),
            None => Ok(None),
        }
    }

    /// Records the resume step and returns a cursor pointing at the step after it.
    async fn resume(&self, trace_id: &str, resume_from_step: u64) -> Result<ResumeCursor> {
        let Some(mut trace) = self.load(trace_id).await? else {
            return Err(anyhow!("trace_id {} not found", trace_id));
        };
        trace.resumed_from_step = Some(resume_from_step);
        self.write_trace(&trace).await?;

        let next_step = resume_from_step.saturating_add(1);
        Ok(ResumeCursor {
            trace_id: trace_id.to_owned(),
            next_step,
        })
    }

    /// Sets the completion state, overwriting any earlier one.
    async fn complete(&self, trace_id: &str, completion: CompletionState) -> Result<()> {
        let Some(mut trace) = self.load(trace_id).await? else {
            return Err(anyhow!("trace_id {} not found", trace_id));
        };
        trace.completion = Some(completion);
        self.write_trace(&trace).await
    }
}
897
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(any(feature = "persistence-postgres", feature = "persistence-redis"))]
    use uuid::Uuid;

    /// Builds a fixture event for trace `trace-1` on `OrderCircuit`.
    fn envelope(step: u64, outcome_kind: &str) -> PersistenceEnvelope {
        let timestamp_ms = 1_700_000_000_000 + step;
        let payload_hash = Some(format!("hash-{}", step));
        PersistenceEnvelope {
            trace_id: String::from("trace-1"),
            circuit: String::from("OrderCircuit"),
            step,
            outcome_kind: outcome_kind.to_owned(),
            timestamp_ms,
            payload_hash,
        }
    }

    #[tokio::test]
    async fn append_and_load_roundtrip() {
        let store = InMemoryPersistenceStore::new();
        for (step, kind) in [(1, "Next"), (2, "Branch")] {
            store.append(envelope(step, kind)).await.unwrap();
        }

        let trace = store.load("trace-1").await.unwrap().unwrap();
        assert_eq!(trace.trace_id, "trace-1");
        assert_eq!(trace.circuit, "OrderCircuit");
        assert_eq!(trace.events.len(), 2);
        assert_eq!(trace.events[0].step, 1);
        assert_eq!(trace.events[1].outcome_kind, "Branch");
        assert_eq!(trace.completion, None);
    }

    #[tokio::test]
    async fn resume_records_cursor() {
        let store = InMemoryPersistenceStore::new();
        store.append(envelope(3, "Fault")).await.unwrap();

        let expected = ResumeCursor {
            trace_id: String::from("trace-1"),
            next_step: 4,
        };
        let cursor = store.resume("trace-1", 3).await.unwrap();
        assert_eq!(cursor, expected);

        let trace = store.load("trace-1").await.unwrap().unwrap();
        assert_eq!(trace.resumed_from_step, Some(3));
    }

    #[tokio::test]
    async fn complete_marks_trace_and_blocks_append() {
        let store = InMemoryPersistenceStore::new();
        store.append(envelope(1, "Next")).await.unwrap();
        store
            .complete("trace-1", CompletionState::Success)
            .await
            .unwrap();

        let trace = store.load("trace-1").await.unwrap().unwrap();
        assert_eq!(trace.completion, Some(CompletionState::Success));

        // A completed trace must refuse further events.
        let message = store
            .append(envelope(2, "Next"))
            .await
            .unwrap_err()
            .to_string();
        assert!(message.contains("is already completed and cannot accept new events"));
    }

    #[tokio::test]
    async fn append_rejects_cross_circuit_trace_reuse() {
        let store = InMemoryPersistenceStore::new();
        store.append(envelope(1, "Next")).await.unwrap();

        let foreign = PersistenceEnvelope {
            circuit: String::from("AnotherCircuit"),
            ..envelope(2, "Next")
        };
        let message = store.append(foreign).await.unwrap_err().to_string();
        assert!(message.contains("already exists for circuit OrderCircuit"));
    }

    #[tokio::test]
    async fn in_memory_compensation_idempotency_roundtrip() {
        let store = InMemoryCompensationIdempotencyStore::new();
        let key = "trace-a:OrderFlow:Fault";

        let before = store.was_compensated(key).await.unwrap();
        assert!(!before);
        store.mark_compensated(key).await.unwrap();
        let after = store.was_compensated(key).await.unwrap();
        assert!(after);
    }

    // The backend tests below are skipped (return early) unless the matching
    // connection URL environment variable is set.

    #[cfg(feature = "persistence-postgres")]
    #[tokio::test]
    async fn postgres_store_roundtrip_when_configured() {
        let Ok(url) = std::env::var("RANVIER_PERSISTENCE_POSTGRES_URL") else {
            return;
        };

        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(5)
            .connect(&url)
            .await
            .unwrap();
        // Unique prefix per run so parallel runs do not share tables.
        let table_prefix = format!("ranvier_persistence_test_{}", Uuid::new_v4().simple());
        let store = PostgresPersistenceStore::with_table_prefix(pool.clone(), table_prefix);
        store.ensure_schema().await.unwrap();

        let trace_id = format!("trace-{}", Uuid::new_v4().simple());
        let circuit = String::from("PgCircuit");

        for (step, kind) in [(1, "Next"), (2, "Branch")] {
            let event = PersistenceEnvelope {
                trace_id: trace_id.clone(),
                circuit: circuit.clone(),
                ..envelope(step, kind)
            };
            store.append(event).await.unwrap();
        }

        let cursor = store.resume(&trace_id, 2).await.unwrap();
        assert_eq!(cursor.next_step, 3);

        store
            .complete(&trace_id, CompletionState::Compensated)
            .await
            .unwrap();

        let trace = store.load(&trace_id).await.unwrap().unwrap();
        assert_eq!(trace.trace_id, trace_id);
        assert_eq!(trace.circuit, circuit);
        assert_eq!(trace.events.len(), 2);
        assert_eq!(trace.resumed_from_step, Some(2));
        assert_eq!(trace.completion, Some(CompletionState::Compensated));

        // Clean up: events table first, then state table.
        for table in [&store.events_table, &store.state_table] {
            let drop_sql = format!("DROP TABLE IF EXISTS {}", table);
            sqlx::query(&drop_sql).execute(&pool).await.unwrap();
        }
    }

    #[cfg(feature = "persistence-redis")]
    #[tokio::test]
    async fn redis_store_roundtrip_when_configured() {
        use redis::AsyncCommands;

        let Ok(url) = std::env::var("RANVIER_PERSISTENCE_REDIS_URL") else {
            return;
        };

        let base = RedisPersistenceStore::connect(&url).await.unwrap();
        let prefix = format!("ranvier:persistence:test:{}", Uuid::new_v4().simple());
        let store = RedisPersistenceStore::with_prefix(base.manager.clone(), prefix);

        let trace_id = format!("trace-{}", Uuid::new_v4().simple());
        let circuit = String::from("RedisCircuit");

        for (step, kind) in [(1, "Next"), (2, "Fault")] {
            let event = PersistenceEnvelope {
                trace_id: trace_id.clone(),
                circuit: circuit.clone(),
                ..envelope(step, kind)
            };
            store.append(event).await.unwrap();
        }

        let cursor = store.resume(&trace_id, 2).await.unwrap();
        assert_eq!(cursor.next_step, 3);

        store
            .complete(&trace_id, CompletionState::Fault)
            .await
            .unwrap();

        let trace = store.load(&trace_id).await.unwrap().unwrap();
        assert_eq!(trace.trace_id, trace_id);
        assert_eq!(trace.circuit, circuit);
        assert_eq!(trace.events.len(), 2);
        assert_eq!(trace.resumed_from_step, Some(2));
        assert_eq!(trace.completion, Some(CompletionState::Fault));

        // Clean up the trace document.
        let redis_key = store.key(&trace_id);
        let mut conn = store.manager.clone();
        let _: () = conn.del(redis_key).await.unwrap();
    }

    #[cfg(feature = "persistence-postgres")]
    #[tokio::test]
    async fn postgres_compensation_idempotency_roundtrip_when_configured() {
        let Ok(url) = std::env::var("RANVIER_PERSISTENCE_POSTGRES_URL") else {
            return;
        };

        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(5)
            .connect(&url)
            .await
            .unwrap();
        let table_prefix = format!(
            "ranvier_compensation_idempotency_test_{}",
            Uuid::new_v4().simple()
        );
        let store =
            PostgresCompensationIdempotencyStore::with_table_prefix(pool.clone(), &table_prefix);
        store.ensure_schema().await.unwrap();

        let key = format!("trace-{}:OrderFlow:Fault", Uuid::new_v4().simple());
        assert!(!store.was_compensated(&key).await.unwrap());
        // Marking twice must be harmless.
        for _ in 0..2 {
            store.mark_compensated(&key).await.unwrap();
            assert!(store.was_compensated(&key).await.unwrap());
        }

        let drop_sql = format!("DROP TABLE IF EXISTS {}", store.table);
        sqlx::query(&drop_sql).execute(&pool).await.unwrap();
    }

    #[cfg(feature = "persistence-postgres")]
    #[tokio::test]
    async fn postgres_compensation_idempotency_purge_when_configured() {
        let Ok(url) = std::env::var("RANVIER_PERSISTENCE_POSTGRES_URL") else {
            return;
        };

        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(5)
            .connect(&url)
            .await
            .unwrap();
        let table_prefix = format!(
            "ranvier_compensation_idempotency_purge_test_{}",
            Uuid::new_v4().simple()
        );
        let store =
            PostgresCompensationIdempotencyStore::with_table_prefix(pool.clone(), &table_prefix);
        store.ensure_schema().await.unwrap();

        let stale_key = format!("stale-{}", Uuid::new_v4().simple());
        let fresh_key = format!("fresh-{}", Uuid::new_v4().simple());
        store.mark_compensated(&stale_key).await.unwrap();
        store.mark_compensated(&fresh_key).await.unwrap();

        // Backdate one marker so the purge cutoff only catches that row.
        let backdate_sql = format!(
            "UPDATE {}
             SET created_at_ms = 0
             WHERE idempotency_key = $1",
            store.table
        );
        sqlx::query(&backdate_sql)
            .bind(&stale_key)
            .execute(&pool)
            .await
            .unwrap();

        let purged = store.purge_older_than_ms(1).await.unwrap();
        assert!(purged >= 1);
        assert!(!store.was_compensated(&stale_key).await.unwrap());
        assert!(store.was_compensated(&fresh_key).await.unwrap());

        let drop_sql = format!("DROP TABLE IF EXISTS {}", store.table);
        sqlx::query(&drop_sql).execute(&pool).await.unwrap();
    }

    #[cfg(feature = "persistence-redis")]
    #[tokio::test]
    async fn redis_compensation_idempotency_roundtrip_when_configured() {
        use redis::AsyncCommands;

        let Ok(url) = std::env::var("RANVIER_PERSISTENCE_REDIS_URL") else {
            return;
        };

        let base = RedisCompensationIdempotencyStore::connect(&url)
            .await
            .unwrap();
        let prefix = format!(
            "ranvier:compensation:idempotency:test:{}",
            Uuid::new_v4().simple()
        );
        let store = RedisCompensationIdempotencyStore::with_prefix(base.manager.clone(), prefix);
        let key = format!("trace-{}:OrderFlow:Fault", Uuid::new_v4().simple());

        assert!(!store.was_compensated(&key).await.unwrap());
        // Marking twice must be harmless.
        for _ in 0..2 {
            store.mark_compensated(&key).await.unwrap();
            assert!(store.was_compensated(&key).await.unwrap());
        }

        // Clean up the marker.
        let mut conn = store.manager.clone();
        let _: () = conn.del(store.key(&key)).await.unwrap();
    }

    #[cfg(feature = "persistence-redis")]
    #[tokio::test]
    async fn redis_compensation_idempotency_ttl_when_configured() {
        let Ok(url) = std::env::var("RANVIER_PERSISTENCE_REDIS_URL") else {
            return;
        };

        let base = RedisCompensationIdempotencyStore::connect(&url)
            .await
            .unwrap();
        let prefix = format!(
            "ranvier:compensation:idempotency:ttl:test:{}",
            Uuid::new_v4().simple()
        );
        let store =
            RedisCompensationIdempotencyStore::with_prefix_and_ttl(base.manager.clone(), prefix, 1);
        let key = format!("ttl-{}", Uuid::new_v4().simple());

        assert!(!store.was_compensated(&key).await.unwrap());
        store.mark_compensated(&key).await.unwrap();
        assert!(store.was_compensated(&key).await.unwrap());

        // Wait past the one-second TTL; the marker must be gone afterwards.
        let wait = tokio::time::Duration::from_secs(2);
        tokio::time::sleep(wait).await;
        assert!(!store.was_compensated(&key).await.unwrap());
    }
}