Skip to main content

fraiseql_core/federation/
saga_store.rs

//! PostgreSQL-backed Saga Store for distributed transaction persistence.
//!
//! This module provides a production-grade persistent store for sagas using PostgreSQL,
//! enabling crash recovery, distributed coordination, and saga tracking across instances.
//!
//! # Architecture
//!
//! The saga store implements the saga pattern for distributed transactions:
//! - **Forward phase**: Execute steps sequentially across subgraphs
//! - **Compensation phase**: Roll back failures by executing inverse operations
//! - **Persistence**: All saga state and steps are stored in PostgreSQL
//! - **Recovery**: Background processes recover interrupted sagas on restart
//!
//! # State Machine
//!
//! ```text
//! Pending → Executing → Completed (success)
//!           ↓
//!       Failed → Compensating → Compensated (rolled back)
//! ```
//!
//! # Example
//!
//! ```ignore
//! let store = PostgresSagaStore::new("postgresql://localhost/fraiseql").await?;
//! store.migrate_schema().await?;
//!
//! let saga = Saga {
//!     id: Uuid::new_v4(),
//!     state: SagaState::Pending,
//!     created_at: chrono::Utc::now(),
//!     completed_at: None,
//!     metadata: None,
//! };
//!
//! store.save_saga(&saga).await?;
//! ```
38
39use std::sync::Arc;
40
41use deadpool_postgres::Pool;
42use serde_json::Value;
43use uuid::Uuid;
44
45/// Error type for saga store operations
46#[derive(Debug)]
47pub enum SagaStoreError {
48    /// Database connection or query error
49    Database(String),
50    /// Invalid state transition
51    InvalidStateTransition { from: String, to: String },
52    /// Saga not found
53    SagaNotFound(Uuid),
54    /// Step not found
55    StepNotFound(Uuid),
56}
57
58impl std::fmt::Display for SagaStoreError {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        match self {
61            Self::Database(msg) => write!(f, "Database error: {}", msg),
62            Self::InvalidStateTransition { from, to } => {
63                write!(f, "Invalid state transition from {} to {}", from, to)
64            },
65            Self::SagaNotFound(id) => write!(f, "Saga {} not found", id),
66            Self::StepNotFound(id) => write!(f, "Step {} not found", id),
67        }
68    }
69}
70
71impl std::error::Error for SagaStoreError {}
72
73impl From<tokio_postgres::Error> for SagaStoreError {
74    fn from(err: tokio_postgres::Error) -> Self {
75        Self::Database(err.to_string())
76    }
77}
78
79impl From<deadpool_postgres::PoolError> for SagaStoreError {
80    fn from(err: deadpool_postgres::PoolError) -> Self {
81        Self::Database(err.to_string())
82    }
83}
84
85impl<E> From<deadpool::managed::CreatePoolError<E>> for SagaStoreError
86where
87    E: std::fmt::Display,
88{
89    fn from(err: deadpool::managed::CreatePoolError<E>) -> Self {
90        Self::Database(format!("Failed to create connection pool: {}", err))
91    }
92}
93
94pub type Result<T> = std::result::Result<T, SagaStoreError>;
95
/// Lifecycle state of a saga, persisted as a lowercase string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SagaState {
    /// Created but not yet started.
    Pending,
    /// Forward phase in progress.
    Executing,
    /// Finished successfully.
    Completed,
    /// A step failed.
    Failed,
    /// Compensation (rollback) in progress.
    Compensating,
    /// Compensation finished; the saga was rolled back.
    Compensated,
}

impl SagaState {
    /// Every variant, used to drive [`SagaState::from_str`] from [`SagaState::as_str`].
    const ALL: [SagaState; 6] = [
        SagaState::Pending,
        SagaState::Executing,
        SagaState::Completed,
        SagaState::Failed,
        SagaState::Compensating,
        SagaState::Compensated,
    ];

    /// Returns the lowercase string stored for this state.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Executing => "executing",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Compensating => "compensating",
            Self::Compensated => "compensated",
        }
    }

    /// Parses the exact (case-sensitive) string produced by [`SagaState::as_str`].
    ///
    /// Returns `None` for any other input.
    pub fn from_str(s: &str) -> Option<Self> {
        Self::ALL.iter().find(|candidate| candidate.as_str() == s).cloned()
    }
}
131
/// Execution state of a single saga step, persisted as a lowercase string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StepState {
    /// Not yet started.
    Pending,
    /// Currently running.
    Executing,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}

impl StepState {
    /// Every variant, used to drive [`StepState::from_str`] from [`StepState::as_str`].
    const ALL: [StepState; 4] = [
        StepState::Pending,
        StepState::Executing,
        StepState::Completed,
        StepState::Failed,
    ];

    /// Returns the lowercase string stored for this state.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Executing => "executing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }

    /// Parses the exact (case-sensitive) string produced by [`StepState::as_str`].
    ///
    /// Returns `None` for any other input.
    pub fn from_str(s: &str) -> Option<Self> {
        Self::ALL.iter().find(|candidate| candidate.as_str() == s).cloned()
    }
}
161
/// Kind of mutation recorded for a saga step, persisted as a lowercase string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MutationType {
    /// Stored as `"create"`.
    Create,
    /// Stored as `"update"`.
    Update,
    /// Stored as `"delete"`.
    Delete,
}

impl MutationType {
    /// Every variant, used to drive [`MutationType::from_str`] from
    /// [`MutationType::as_str`].
    const ALL: [MutationType; 3] = [
        MutationType::Create,
        MutationType::Update,
        MutationType::Delete,
    ];

    /// Convert to string representation
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
        }
    }

    /// Parse from string representation
    ///
    /// Matching is exact and case-sensitive; any other input yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        Self::ALL.iter().find(|candidate| candidate.as_str() == s).cloned()
    }
}
190
191/// Saga struct
192#[derive(Debug, Clone)]
193pub struct Saga {
194    pub id:           Uuid,
195    pub state:        SagaState,
196    pub created_at:   chrono::DateTime<chrono::Utc>,
197    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
198    pub metadata:     Option<Value>,
199}
200
201/// SagaStep struct
202#[derive(Debug, Clone)]
203pub struct SagaStep {
204    pub id:            Uuid,
205    pub saga_id:       Uuid,
206    pub order:         usize,
207    pub subgraph:      String,
208    pub mutation_type: MutationType,
209    pub typename:      String,
210    pub variables:     Value,
211    pub state:         StepState,
212    pub result:        Option<Value>,
213    pub started_at:    Option<chrono::DateTime<chrono::Utc>>,
214    pub completed_at:  Option<chrono::DateTime<chrono::Utc>>,
215}
216
217/// SagaRecovery struct
218#[derive(Debug, Clone)]
219pub struct SagaRecovery {
220    pub id:            Uuid,
221    pub saga_id:       Uuid,
222    pub recovery_type: String,
223    pub attempted_at:  chrono::DateTime<chrono::Utc>,
224    pub last_attempt:  Option<chrono::DateTime<chrono::Utc>>,
225    pub attempt_count: i32,
226    pub last_error:    Option<String>,
227}
228
229/// PostgreSQL-backed Saga Store
230///
231/// Manages persistent storage of sagas and their execution state using PostgreSQL.
232/// Provides crash recovery and distributed coordination across federation instances.
233pub struct PostgresSagaStore {
234    pool: Arc<Pool>,
235}
236
237impl PostgresSagaStore {
238    /// Create a new PostgreSQL saga store with default configuration.
239    ///
240    /// Connects to PostgreSQL and verifies connectivity.
241    ///
242    /// # Arguments
243    ///
244    /// * `_connection_string` - PostgreSQL connection string (currently unused, uses default
245    ///   config)
246    ///
247    /// # Errors
248    ///
249    /// Returns `SagaStoreError::Database` if connection fails.
250    ///
251    /// # Example
252    ///
253    /// ```ignore
254    /// let store = PostgresSagaStore::new("postgresql://localhost/fraiseql").await?;
255    /// ```
256    pub async fn new(_connection_string: &str) -> Result<Self> {
257        // Parse connection string and create pool
258        let cfg = deadpool_postgres::Config {
259            dbname: Some("fraiseql".to_string()),
260            host: Some("localhost".to_string()),
261            port: Some(5432),
262            user: Some("postgres".to_string()),
263            password: Some("postgres".to_string()),
264            ..Default::default()
265        };
266
267        let pool = cfg.create_pool(
268            Some(deadpool_postgres::Runtime::Tokio1),
269            deadpool_postgres::tokio_postgres::NoTls,
270        )?;
271
272        // Test connection
273        let _conn = pool.get().await?;
274
275        Ok(Self {
276            pool: Arc::new(pool),
277        })
278    }
279
280    /// Create database schema and indices if they don't exist
281    ///
282    /// Uses the trinity pattern with proper table naming:
283    /// - `pk_` (BIGINT PRIMARY KEY): Surrogate key for efficient internal joins
284    /// - `id` (UUID NOT NULL UNIQUE): Natural key for distributed systems
285    /// - `tb_` prefix: Table naming convention for trinity pattern
286    /// - Foreign keys use surrogate keys for better performance
287    ///
288    /// # Errors
289    ///
290    /// Returns `SagaStoreError::Database` if schema creation fails.
291    pub async fn migrate_schema(&self) -> Result<()> {
292        let conn = self.pool.get().await?;
293
294        // Create sequence for auto-increment (with tb_ prefix)
295        conn.execute(
296            "CREATE SEQUENCE IF NOT EXISTS seq_tb_tb_federation_sagas START 1 INCREMENT 1",
297            &[],
298        )
299        .await?;
300
301        // Create tb_tb_federation_sagas table (trinity pattern)
302        conn.execute(
303            "
304            CREATE TABLE IF NOT EXISTS tb_tb_federation_sagas (
305                pk_ BIGINT PRIMARY KEY DEFAULT nextval('seq_tb_tb_federation_sagas'),
306                id UUID NOT NULL UNIQUE,
307                state TEXT NOT NULL,
308                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
309                completed_at TIMESTAMPTZ,
310                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
311                metadata JSONB
312            )
313            ",
314            &[],
315        )
316        .await?;
317
318        // Create sequence for steps
319        conn.execute(
320            "CREATE SEQUENCE IF NOT EXISTS seq_tb_tb_federation_saga_steps START 1 INCREMENT 1",
321            &[],
322        )
323        .await?;
324
325        // Create tb_tb_federation_saga_steps table (trinity pattern)
326        conn.execute(
327            "
328            CREATE TABLE IF NOT EXISTS tb_tb_federation_saga_steps (
329                pk_ BIGINT PRIMARY KEY DEFAULT nextval('seq_tb_tb_federation_saga_steps'),
330                id UUID NOT NULL UNIQUE,
331                saga_pk_ BIGINT NOT NULL REFERENCES tb_tb_federation_sagas(pk_) ON DELETE CASCADE,
332                step_number INTEGER NOT NULL,
333                subgraph TEXT NOT NULL,
334                mutation_type TEXT NOT NULL,
335                typename TEXT NOT NULL,
336                variables JSONB NOT NULL,
337                state TEXT NOT NULL,
338                result JSONB,
339                started_at TIMESTAMPTZ,
340                completed_at TIMESTAMPTZ,
341                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
342                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
343            )
344            ",
345            &[],
346        )
347        .await?;
348
349        // Create sequence for recovery
350        conn.execute(
351            "CREATE SEQUENCE IF NOT EXISTS seq_tb_tb_federation_saga_recovery START 1 INCREMENT 1",
352            &[],
353        )
354        .await?;
355
356        // Create tb_tb_federation_saga_recovery table (trinity pattern)
357        conn.execute(
358            "
359            CREATE TABLE IF NOT EXISTS tb_tb_federation_saga_recovery (
360                pk_ BIGINT PRIMARY KEY DEFAULT nextval('seq_tb_tb_federation_saga_recovery'),
361                id UUID NOT NULL UNIQUE,
362                saga_pk_ BIGINT NOT NULL REFERENCES tb_tb_federation_sagas(pk_) ON DELETE CASCADE,
363                recovery_type TEXT NOT NULL,
364                attempted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
365                last_attempt TIMESTAMPTZ,
366                attempt_count INTEGER DEFAULT 0,
367                last_error TEXT,
368                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
369                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
370            )
371            ",
372            &[],
373        )
374        .await?;
375
376        // Create indices (primary composite indices for natural + surrogate keys)
377        conn.execute(
378            "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_sagas_id ON tb_tb_federation_sagas(id)",
379            &[],
380        )
381        .await?;
382
383        conn.execute(
384            "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_sagas_state ON tb_tb_federation_sagas(state)",
385            &[],
386        )
387        .await?;
388
389        conn.execute(
390            "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_sagas_created ON tb_tb_federation_sagas(created_at)",
391            &[],
392        )
393        .await?;
394
395        conn.execute(
396            "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_steps_id ON tb_tb_federation_saga_steps(id)",
397            &[],
398        )
399        .await?;
400
401        conn.execute(
402            "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_steps_saga_pk ON tb_tb_federation_saga_steps(saga_pk_)",
403            &[],
404        )
405        .await?;
406
407        conn.execute(
408            "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_recovery_id ON tb_tb_federation_saga_recovery(id)",
409            &[],
410        )
411        .await?;
412
413        conn.execute(
414            "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_recovery_saga_pk ON tb_tb_federation_saga_recovery(saga_pk_)",
415            &[],
416        )
417        .await?;
418
419        Ok(())
420    }
421
422    /// Health check - verifies database connectivity
423    ///
424    /// # Errors
425    ///
426    /// Returns `SagaStoreError::Database` if connection fails.
427    pub async fn health_check(&self) -> Result<()> {
428        let _conn = self.pool.get().await?;
429        Ok(())
430    }
431
432    // Helper functions for row mapping to reduce duplication
433
434    /// Map a database row to a Saga struct
435    fn map_saga_row(row: &tokio_postgres::Row) -> Saga {
436        Saga {
437            id:           row.get(0),
438            state:        SagaState::from_str(row.get::<_, String>(1).as_str())
439                .unwrap_or(SagaState::Pending),
440            created_at:   row.get(2),
441            completed_at: row.get(3),
442            metadata:     row.get(4),
443        }
444    }
445
446    /// Map a database row to a SagaStep struct
447    fn map_saga_step_row(row: &tokio_postgres::Row) -> SagaStep {
448        SagaStep {
449            id:            row.get(0),
450            saga_id:       row.get(1),
451            order:         row.get::<_, i32>(2) as usize,
452            subgraph:      row.get(3),
453            mutation_type: MutationType::from_str(row.get::<_, String>(4).as_str())
454                .unwrap_or(MutationType::Update),
455            typename:      row.get(5),
456            variables:     row.get(6),
457            state:         StepState::from_str(row.get::<_, String>(7).as_str())
458                .unwrap_or(StepState::Pending),
459            result:        row.get(8),
460            started_at:    row.get(9),
461            completed_at:  row.get(10),
462        }
463    }
464
465    /// Save or update a saga
466    ///
467    /// Uses upsert semantics - inserts if new, updates if exists.
468    /// Trinity pattern: surrogate pk_ auto-generated, natural key id (UUID) maintained.
469    ///
470    /// # Errors
471    ///
472    /// Returns `SagaStoreError::Database` if the operation fails.
473    pub async fn save_saga(&self, saga: &Saga) -> Result<()> {
474        let conn = self.pool.get().await?;
475        let state = saga.state.as_str();
476        let now = chrono::Utc::now();
477
478        conn.execute(
479            "INSERT INTO tb_federation_sagas (id, state, created_at, completed_at, updated_at, metadata)
480             VALUES ($1, $2, $3, $4, $5, $6)
481             ON CONFLICT (id) DO UPDATE SET
482                 state = $2, completed_at = $4, updated_at = $5, metadata = $6",
483            &[&saga.id, &state, &saga.created_at, &saga.completed_at, &now, &saga.metadata],
484        )
485        .await?;
486
487        Ok(())
488    }
489
490    /// Load a saga by ID
491    ///
492    /// # Errors
493    ///
494    /// Returns `SagaStoreError::Database` if the query fails.
495    pub async fn load_saga(&self, saga_id: Uuid) -> Result<Option<Saga>> {
496        let conn = self.pool.get().await?;
497
498        let row = conn
499            .query_opt(
500                "SELECT id, state, created_at, completed_at, metadata FROM tb_federation_sagas WHERE id = $1",
501                &[&saga_id],
502            )
503            .await?;
504
505        Ok(row.map(|r| Self::map_saga_row(&r)))
506    }
507
508    /// Load all sagas ordered by creation time (newest first)
509    ///
510    /// # Errors
511    ///
512    /// Returns `SagaStoreError::Database` if the query fails.
513    pub async fn load_all_sagas(&self) -> Result<Vec<Saga>> {
514        let conn = self.pool.get().await?;
515
516        let rows = conn
517            .query(
518                "SELECT id, state, created_at, completed_at, metadata FROM tb_federation_sagas ORDER BY created_at DESC",
519                &[],
520            )
521            .await?;
522
523        Ok(rows.into_iter().map(|r| Self::map_saga_row(&r)).collect())
524    }
525
526    /// Load sagas filtered by state
527    ///
528    /// # Errors
529    ///
530    /// Returns `SagaStoreError::Database` if the query fails.
531    pub async fn load_sagas_by_state(&self, state: &SagaState) -> Result<Vec<Saga>> {
532        let conn = self.pool.get().await?;
533        let state_str = state.as_str();
534
535        let rows = conn
536            .query(
537                "SELECT id, state, created_at, completed_at, metadata FROM tb_federation_sagas WHERE state = $1 ORDER BY created_at DESC",
538                &[&state_str],
539            )
540            .await?;
541
542        Ok(rows.into_iter().map(|r| Self::map_saga_row(&r)).collect())
543    }
544
545    /// Update saga state and automatically set completion time for terminal states
546    ///
547    /// Terminal states (Completed, Compensated) automatically receive completed_at timestamp.
548    ///
549    /// # Errors
550    ///
551    /// Returns `SagaStoreError::Database` if the update fails.
552    pub async fn update_saga_state(&self, saga_id: Uuid, state: &SagaState) -> Result<()> {
553        let conn = self.pool.get().await?;
554        let state_str = state.as_str();
555        let now = chrono::Utc::now();
556
557        let completed_at = if matches!(state, SagaState::Completed | SagaState::Compensated) {
558            Some(now)
559        } else {
560            None
561        };
562
563        conn.execute(
564            "UPDATE tb_federation_sagas SET state = $1, completed_at = $2, updated_at = $3 WHERE id = $4",
565            &[&state_str, &completed_at, &now, &saga_id],
566        )
567        .await?;
568
569        Ok(())
570    }
571
572    /// Load a saga step by ID
573    ///
574    /// # Errors
575    ///
576    /// Returns `SagaStoreError::Database` if the query fails.
577    pub async fn load_saga_step(&self, step_id: Uuid) -> Result<Option<SagaStep>> {
578        let conn = self.pool.get().await?;
579
580        let row = conn
581            .query_opt(
582                "SELECT fss.id, fs.id as saga_id, fss.step_number, fss.subgraph, fss.mutation_type, fss.typename, fss.variables, fss.state, fss.result, fss.started_at, fss.completed_at
583                 FROM tb_federation_saga_steps fss
584                 INNER JOIN tb_federation_sagas fs ON fss.saga_pk_ = fs.pk_
585                 WHERE fss.id = $1",
586                &[&step_id],
587            )
588            .await?;
589
590        Ok(row.map(|r| Self::map_saga_step_row(&r)))
591    }
592
593    /// Load all saga steps for a saga, ordered by step number (Trinity pattern with JOIN)
594    ///
595    /// # Errors
596    ///
597    /// Returns `SagaStoreError::Database` if the query fails.
598    pub async fn load_saga_steps(&self, saga_id: Uuid) -> Result<Vec<SagaStep>> {
599        let conn = self.pool.get().await?;
600
601        let rows = conn
602            .query(
603                "SELECT fss.id, fs.id as saga_id, fss.step_number, fss.subgraph, fss.mutation_type, fss.typename, fss.variables, fss.state, fss.result, fss.started_at, fss.completed_at
604                 FROM tb_federation_saga_steps fss
605                 INNER JOIN tb_federation_sagas fs ON fss.saga_pk_ = fs.pk_
606                 WHERE fs.id = $1
607                 ORDER BY fss.step_number ASC",
608                &[&saga_id],
609            )
610            .await?;
611
612        Ok(rows.into_iter().map(|r| Self::map_saga_step_row(&r)).collect())
613    }
614
615    /// Update saga step state and automatically set completion time for terminal states
616    ///
617    /// Terminal states (Completed, Failed) automatically receive completed_at timestamp.
618    ///
619    /// # Errors
620    ///
621    /// Returns `SagaStoreError::Database` if the update fails.
622    pub async fn update_saga_step_state(&self, step_id: Uuid, state: &StepState) -> Result<()> {
623        let conn = self.pool.get().await?;
624        let state_str = state.as_str();
625        let now = chrono::Utc::now();
626
627        let completed_at = if matches!(state, StepState::Completed | StepState::Failed) {
628            Some(now)
629        } else {
630            None
631        };
632
633        conn.execute(
634            "UPDATE tb_federation_saga_steps SET state = $1, completed_at = $2, updated_at = $3 WHERE id = $4",
635            &[&state_str, &completed_at, &now, &step_id],
636        )
637        .await?;
638
639        Ok(())
640    }
641
642    /// Save or update a saga step
643    ///
644    /// Uses upsert semantics - inserts if new, updates if exists.
645    /// Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).
646    ///
647    /// # Errors
648    ///
649    /// Returns `SagaStoreError::Database` if the operation fails.
650    pub async fn save_saga_step(&self, step: &SagaStep) -> Result<()> {
651        let conn = self.pool.get().await?;
652        let mutation_type = step.mutation_type.as_str();
653        let state = step.state.as_str();
654        let now = chrono::Utc::now();
655
656        // Note: step.order is casted to i32 for PostgreSQL storage.
657        // In practice, sagas rarely exceed 2 billion steps, so this is safe.
658        #[allow(clippy::cast_possible_wrap)]
659        // Reason: saga count is bounded by configuration and won't exceed i64::MAX
660        let step_number = step.order as i32;
661
662        // Use subquery to convert saga natural key (UUID) to surrogate key (BIGINT) for foreign key
663        conn.execute(
664            "INSERT INTO tb_federation_saga_steps (id, saga_pk_, step_number, subgraph, mutation_type, typename, variables, state, result, started_at, completed_at, created_at, updated_at)
665             SELECT $1, fs.pk_, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
666             FROM tb_federation_sagas fs
667             WHERE fs.id = $2
668             ON CONFLICT (id) DO UPDATE SET state = $8, result = $9, completed_at = $11, updated_at = $13",
669            &[
670                &step.id,
671                &step.saga_id,  // Used in subquery to find saga_pk_
672                &step_number,
673                &step.subgraph,
674                &mutation_type,
675                &step.typename,
676                &step.variables,
677                &state,
678                &step.result,
679                &step.started_at,
680                &step.completed_at,
681                &chrono::Utc::now(),
682                &now,
683            ],
684        )
685        .await?;
686
687        Ok(())
688    }
689
690    /// Update the result of a completed saga step
691    ///
692    /// # Errors
693    ///
694    /// Returns `SagaStoreError::Database` if the update fails.
695    pub async fn update_saga_step_result(&self, step_id: Uuid, result: &Value) -> Result<()> {
696        let conn = self.pool.get().await?;
697        let now = chrono::Utc::now();
698
699        conn.execute(
700            "UPDATE tb_federation_saga_steps SET result = $1, updated_at = $2 WHERE id = $3",
701            &[&result, &now, &step_id],
702        )
703        .await?;
704
705        Ok(())
706    }
707
708    /// Mark a saga for recovery
709    ///
710    /// Creates a recovery record tracking an attempt to recover a failed saga.
711    /// Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).
712    ///
713    /// # Errors
714    ///
715    /// Returns `SagaStoreError::Database` if the operation fails.
716    pub async fn mark_saga_for_recovery(&self, saga_id: Uuid, reason: &str) -> Result<()> {
717        let conn = self.pool.get().await?;
718        let recovery_id = Uuid::new_v4();
719        let now = chrono::Utc::now();
720
721        // Use subquery to convert saga natural key to surrogate key
722        conn.execute(
723            "INSERT INTO tb_federation_saga_recovery (id, saga_pk_, recovery_type, attempted_at, attempt_count)
724             SELECT $1, fs.pk_, $3, $4, $5
725             FROM tb_federation_sagas fs
726             WHERE fs.id = $2",
727            &[&recovery_id, &saga_id, &reason, &now, &0i32],
728        )
729        .await?;
730
731        Ok(())
732    }
733
734    /// Find all pending sagas (not yet started)
735    ///
736    /// # Errors
737    ///
738    /// Returns `SagaStoreError::Database` if the query fails.
739    pub async fn find_pending_sagas(&self) -> Result<Vec<Saga>> {
740        self.load_sagas_by_state(&SagaState::Pending).await
741    }
742
743    /// Clear recovery record for a saga
744    ///
745    /// Trinity pattern: uses subquery to convert saga natural key to surrogate key.
746    ///
747    /// # Errors
748    ///
749    /// Returns `SagaStoreError::Database` if the operation fails.
750    pub async fn clear_recovery_record(&self, saga_id: Uuid) -> Result<()> {
751        let conn = self.pool.get().await?;
752        conn.execute(
753            "DELETE FROM tb_federation_saga_recovery
754             WHERE saga_pk_ = (SELECT pk_ FROM tb_federation_sagas WHERE id = $1)",
755            &[&saga_id],
756        )
757        .await?;
758        Ok(())
759    }
760
761    /// Delete a saga and all associated steps and recovery records
762    ///
763    /// CASCADE constraints ensure related records are deleted.
764    /// Uses natural key (UUID) for deletion.
765    ///
766    /// # Errors
767    ///
768    /// Returns `SagaStoreError::Database` if the operation fails.
769    pub async fn delete_saga(&self, saga_id: Uuid) -> Result<()> {
770        let conn = self.pool.get().await?;
771        conn.execute("DELETE FROM tb_federation_sagas WHERE id = $1", &[&saga_id])
772            .await?;
773        Ok(())
774    }
775
776    /// Delete all completed and compensated sagas
777    ///
778    /// # Errors
779    ///
780    /// Returns `SagaStoreError::Database` if the operation fails.
781    ///
782    /// # Returns
783    ///
784    /// Number of sagas deleted.
785    pub async fn delete_completed_sagas(&self) -> Result<u64> {
786        let conn = self.pool.get().await?;
787        let result = conn
788            .execute(
789                "DELETE FROM tb_federation_sagas WHERE state IN ('completed', 'compensated')",
790                &[],
791            )
792            .await?;
793        Ok(result)
794    }
795
796    /// Delete sagas older than the specified threshold that are in a terminal state
797    ///
798    /// # Arguments
799    ///
800    /// * `hours_threshold` - Delete sagas created more than this many hours ago
801    ///
802    /// # Errors
803    ///
804    /// Returns `SagaStoreError::Database` if the operation fails.
805    ///
806    /// # Returns
807    ///
808    /// Number of sagas deleted.
809    pub async fn cleanup_stale_sagas(&self, hours_threshold: i64) -> Result<u64> {
810        let conn = self.pool.get().await?;
811        let result = conn
812            .execute(
813                "DELETE FROM tb_federation_sagas WHERE created_at < NOW() - INTERVAL '1 hour' * $1 AND state IN ('completed', 'compensated')",
814                &[&hours_threshold],
815            )
816            .await?;
817        Ok(result)
818    }
819
820    /// Get the maximum recovery attempt count for a saga
821    ///
822    /// Trinity pattern: uses subquery to convert saga natural key to surrogate key.
823    ///
824    /// # Errors
825    ///
826    /// Returns `SagaStoreError::Database` if the query fails.
827    pub async fn get_recovery_attempts(&self, saga_id: Uuid) -> Result<i32> {
828        let conn = self.pool.get().await?;
829        let row = conn
830            .query_opt(
831                "SELECT COALESCE(MAX(attempt_count), 0) FROM tb_federation_saga_recovery
832                 WHERE saga_pk_ = (SELECT pk_ FROM tb_federation_sagas WHERE id = $1)",
833                &[&saga_id],
834            )
835            .await?;
836        Ok(row.map(|r| r.get(0)).unwrap_or(0))
837    }
838
839    /// Save a saga recovery record
840    ///
841    /// Trinity pattern: subquery converts saga natural key (UUID) to surrogate key (BIGINT).
842    ///
843    /// # Errors
844    ///
845    /// Returns `SagaStoreError::Database` if the operation fails.
846    pub async fn save_recovery_record(&self, recovery: &SagaRecovery) -> Result<()> {
847        let conn = self.pool.get().await?;
848
849        // Use subquery to convert saga natural key to surrogate key
850        conn.execute(
851            "INSERT INTO tb_federation_saga_recovery (id, saga_pk_, recovery_type, attempted_at, last_attempt, attempt_count, last_error)
852             SELECT $1, fs.pk_, $3, $4, $5, $6, $7
853             FROM tb_federation_sagas fs
854             WHERE fs.id = $2",
855            &[
856                &recovery.id,
857                &recovery.saga_id,  // Used in subquery
858                &recovery.recovery_type,
859                &recovery.attempted_at,
860                &recovery.last_attempt,
861                &recovery.attempt_count,
862                &recovery.last_error,
863            ],
864        )
865        .await?;
866
867        Ok(())
868    }
869
870    /// Delete all sagas, steps, and recovery records (for testing)
871    ///
872    /// # Errors
873    ///
874    /// Returns `SagaStoreError::Database` if the operation fails.
875    pub async fn cleanup_all(&self) -> Result<()> {
876        let conn = self.pool.get().await?;
877        conn.execute("DELETE FROM tb_federation_saga_recovery", &[]).await?;
878        conn.execute("DELETE FROM tb_federation_saga_steps", &[]).await?;
879        conn.execute("DELETE FROM tb_federation_sagas", &[]).await?;
880        Ok(())
881    }
882
883    /// Get total number of sagas in the database
884    ///
885    /// # Errors
886    ///
887    /// Returns `SagaStoreError::Database` if the query fails.
888    pub async fn saga_count(&self) -> Result<i64> {
889        let conn = self.pool.get().await?;
890        let row = conn.query_one("SELECT COUNT(*) FROM tb_federation_sagas", &[]).await?;
891        Ok(row.get(0))
892    }
893
894    /// Get total number of saga steps in the database
895    ///
896    /// # Errors
897    ///
898    /// Returns `SagaStoreError::Database` if the query fails.
899    pub async fn step_count(&self) -> Result<i64> {
900        let conn = self.pool.get().await?;
901        let row = conn.query_one("SELECT COUNT(*) FROM tb_federation_saga_steps", &[]).await?;
902        Ok(row.get(0))
903    }
904
905    /// Get total number of saga recovery records in the database
906    ///
907    /// # Errors
908    ///
909    /// Returns `SagaStoreError::Database` if the query fails.
910    pub async fn recovery_count(&self) -> Result<i64> {
911        let conn = self.pool.get().await?;
912        let row = conn.query_one("SELECT COUNT(*) FROM tb_federation_saga_recovery", &[]).await?;
913        Ok(row.get(0))
914    }
915
916    /// Find all stuck sagas (in executing state that may have crashed)
917    ///
918    /// # Errors
919    ///
920    /// Returns `SagaStoreError::Database` if the query fails.
921    pub async fn find_stuck_sagas(&self) -> Result<Vec<Saga>> {
922        self.load_sagas_by_state(&SagaState::Executing).await
923    }
924}
925
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: builds a store and runs its health check.
    /// Ignored by default because it needs a live PostgreSQL server.
    #[tokio::test]
    #[ignore = "requires running PostgreSQL database"]
    async fn test_postgres_connection() {
        let connected = PostgresSagaStore::new("postgresql://localhost/fraiseql_test").await;
        let store = connected.expect("Failed to create store");

        let health = store.health_check().await;
        health.expect("Health check failed");
    }
}
938}