1use std::sync::Arc;
40
41use deadpool_postgres::Pool;
42use serde_json::Value;
43use uuid::Uuid;
44
45#[derive(Debug)]
47pub enum SagaStoreError {
48 Database(String),
50 InvalidStateTransition { from: String, to: String },
52 SagaNotFound(Uuid),
54 StepNotFound(Uuid),
56}
57
58impl std::fmt::Display for SagaStoreError {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 match self {
61 Self::Database(msg) => write!(f, "Database error: {}", msg),
62 Self::InvalidStateTransition { from, to } => {
63 write!(f, "Invalid state transition from {} to {}", from, to)
64 },
65 Self::SagaNotFound(id) => write!(f, "Saga {} not found", id),
66 Self::StepNotFound(id) => write!(f, "Step {} not found", id),
67 }
68 }
69}
70
71impl std::error::Error for SagaStoreError {}
72
73impl From<tokio_postgres::Error> for SagaStoreError {
74 fn from(err: tokio_postgres::Error) -> Self {
75 Self::Database(err.to_string())
76 }
77}
78
79impl From<deadpool_postgres::PoolError> for SagaStoreError {
80 fn from(err: deadpool_postgres::PoolError) -> Self {
81 Self::Database(err.to_string())
82 }
83}
84
85impl<E> From<deadpool::managed::CreatePoolError<E>> for SagaStoreError
86where
87 E: std::fmt::Display,
88{
89 fn from(err: deadpool::managed::CreatePoolError<E>) -> Self {
90 Self::Database(format!("Failed to create connection pool: {}", err))
91 }
92}
93
94pub type Result<T> = std::result::Result<T, SagaStoreError>;
95
/// State of a saga. Converted to and from a lowercase label with
/// [`SagaState::as_str`] and [`SagaState::from_str`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SagaState {
    Pending,
    Executing,
    Completed,
    Failed,
    Compensating,
    Compensated,
}

impl SagaState {
    /// Every variant, used to drive the reverse lookup in `from_str`.
    const ALL: [Self; 6] = [
        Self::Pending,
        Self::Executing,
        Self::Completed,
        Self::Failed,
        Self::Compensating,
        Self::Compensated,
    ];

    /// Returns the lowercase label for this state.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Executing => "executing",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Compensating => "compensating",
            Self::Compensated => "compensated",
        }
    }

    /// Parses a label produced by `as_str`; the match is exact and
    /// case-sensitive. Returns `None` for any other input.
    pub fn from_str(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .find(|candidate| candidate.as_str() == s)
            .cloned()
    }
}
131
/// State of a single saga step. Converted to and from a lowercase label with
/// [`StepState::as_str`] and [`StepState::from_str`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StepState {
    Pending,
    Executing,
    Completed,
    Failed,
}

impl StepState {
    /// Every variant, used to drive the reverse lookup in `from_str`.
    const ALL: [Self; 4] = [
        Self::Pending,
        Self::Executing,
        Self::Completed,
        Self::Failed,
    ];

    /// Returns the lowercase label for this state.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Executing => "executing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }

    /// Parses a label produced by `as_str`; the match is exact and
    /// case-sensitive. Returns `None` for any other input.
    pub fn from_str(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .find(|candidate| candidate.as_str() == s)
            .cloned()
    }
}
161
/// Kind of mutation a saga step performs. Converted to and from a lowercase
/// label with [`MutationType::as_str`] and [`MutationType::from_str`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MutationType {
    Create,
    Update,
    Delete,
}

impl MutationType {
    /// Every variant, used to drive the reverse lookup in `from_str`.
    const ALL: [Self; 3] = [Self::Create, Self::Update, Self::Delete];

    /// Returns the lowercase label for this mutation type.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
        }
    }

    /// Parses a label produced by `as_str`; the match is exact and
    /// case-sensitive. Returns `None` for any other input.
    pub fn from_str(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .find(|candidate| candidate.as_str() == s)
            .cloned()
    }
}
190
191#[derive(Debug, Clone)]
193pub struct Saga {
194 pub id: Uuid,
195 pub state: SagaState,
196 pub created_at: chrono::DateTime<chrono::Utc>,
197 pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
198 pub metadata: Option<Value>,
199}
200
201#[derive(Debug, Clone)]
203pub struct SagaStep {
204 pub id: Uuid,
205 pub saga_id: Uuid,
206 pub order: usize,
207 pub subgraph: String,
208 pub mutation_type: MutationType,
209 pub typename: String,
210 pub variables: Value,
211 pub state: StepState,
212 pub result: Option<Value>,
213 pub started_at: Option<chrono::DateTime<chrono::Utc>>,
214 pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
215}
216
217#[derive(Debug, Clone)]
219pub struct SagaRecovery {
220 pub id: Uuid,
221 pub saga_id: Uuid,
222 pub recovery_type: String,
223 pub attempted_at: chrono::DateTime<chrono::Utc>,
224 pub last_attempt: Option<chrono::DateTime<chrono::Utc>>,
225 pub attempt_count: i32,
226 pub last_error: Option<String>,
227}
228
229pub struct PostgresSagaStore {
234 pool: Arc<Pool>,
235}
236
237impl PostgresSagaStore {
238 pub async fn new(_connection_string: &str) -> Result<Self> {
257 let cfg = deadpool_postgres::Config {
259 dbname: Some("fraiseql".to_string()),
260 host: Some("localhost".to_string()),
261 port: Some(5432),
262 user: Some("postgres".to_string()),
263 password: Some("postgres".to_string()),
264 ..Default::default()
265 };
266
267 let pool = cfg.create_pool(
268 Some(deadpool_postgres::Runtime::Tokio1),
269 deadpool_postgres::tokio_postgres::NoTls,
270 )?;
271
272 let _conn = pool.get().await?;
274
275 Ok(Self {
276 pool: Arc::new(pool),
277 })
278 }
279
280 pub async fn migrate_schema(&self) -> Result<()> {
292 let conn = self.pool.get().await?;
293
294 conn.execute(
296 "CREATE SEQUENCE IF NOT EXISTS seq_tb_tb_federation_sagas START 1 INCREMENT 1",
297 &[],
298 )
299 .await?;
300
301 conn.execute(
303 "
304 CREATE TABLE IF NOT EXISTS tb_tb_federation_sagas (
305 pk_ BIGINT PRIMARY KEY DEFAULT nextval('seq_tb_tb_federation_sagas'),
306 id UUID NOT NULL UNIQUE,
307 state TEXT NOT NULL,
308 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
309 completed_at TIMESTAMPTZ,
310 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
311 metadata JSONB
312 )
313 ",
314 &[],
315 )
316 .await?;
317
318 conn.execute(
320 "CREATE SEQUENCE IF NOT EXISTS seq_tb_tb_federation_saga_steps START 1 INCREMENT 1",
321 &[],
322 )
323 .await?;
324
325 conn.execute(
327 "
328 CREATE TABLE IF NOT EXISTS tb_tb_federation_saga_steps (
329 pk_ BIGINT PRIMARY KEY DEFAULT nextval('seq_tb_tb_federation_saga_steps'),
330 id UUID NOT NULL UNIQUE,
331 saga_pk_ BIGINT NOT NULL REFERENCES tb_tb_federation_sagas(pk_) ON DELETE CASCADE,
332 step_number INTEGER NOT NULL,
333 subgraph TEXT NOT NULL,
334 mutation_type TEXT NOT NULL,
335 typename TEXT NOT NULL,
336 variables JSONB NOT NULL,
337 state TEXT NOT NULL,
338 result JSONB,
339 started_at TIMESTAMPTZ,
340 completed_at TIMESTAMPTZ,
341 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
342 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
343 )
344 ",
345 &[],
346 )
347 .await?;
348
349 conn.execute(
351 "CREATE SEQUENCE IF NOT EXISTS seq_tb_tb_federation_saga_recovery START 1 INCREMENT 1",
352 &[],
353 )
354 .await?;
355
356 conn.execute(
358 "
359 CREATE TABLE IF NOT EXISTS tb_tb_federation_saga_recovery (
360 pk_ BIGINT PRIMARY KEY DEFAULT nextval('seq_tb_tb_federation_saga_recovery'),
361 id UUID NOT NULL UNIQUE,
362 saga_pk_ BIGINT NOT NULL REFERENCES tb_tb_federation_sagas(pk_) ON DELETE CASCADE,
363 recovery_type TEXT NOT NULL,
364 attempted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
365 last_attempt TIMESTAMPTZ,
366 attempt_count INTEGER DEFAULT 0,
367 last_error TEXT,
368 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
369 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
370 )
371 ",
372 &[],
373 )
374 .await?;
375
376 conn.execute(
378 "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_sagas_id ON tb_tb_federation_sagas(id)",
379 &[],
380 )
381 .await?;
382
383 conn.execute(
384 "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_sagas_state ON tb_tb_federation_sagas(state)",
385 &[],
386 )
387 .await?;
388
389 conn.execute(
390 "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_sagas_created ON tb_tb_federation_sagas(created_at)",
391 &[],
392 )
393 .await?;
394
395 conn.execute(
396 "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_steps_id ON tb_tb_federation_saga_steps(id)",
397 &[],
398 )
399 .await?;
400
401 conn.execute(
402 "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_steps_saga_pk ON tb_tb_federation_saga_steps(saga_pk_)",
403 &[],
404 )
405 .await?;
406
407 conn.execute(
408 "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_recovery_id ON tb_tb_federation_saga_recovery(id)",
409 &[],
410 )
411 .await?;
412
413 conn.execute(
414 "CREATE INDEX IF NOT EXISTS idx_tb_tb_federation_saga_recovery_saga_pk ON tb_tb_federation_saga_recovery(saga_pk_)",
415 &[],
416 )
417 .await?;
418
419 Ok(())
420 }
421
422 pub async fn health_check(&self) -> Result<()> {
428 let _conn = self.pool.get().await?;
429 Ok(())
430 }
431
432 fn map_saga_row(row: &tokio_postgres::Row) -> Saga {
436 Saga {
437 id: row.get(0),
438 state: SagaState::from_str(row.get::<_, String>(1).as_str())
439 .unwrap_or(SagaState::Pending),
440 created_at: row.get(2),
441 completed_at: row.get(3),
442 metadata: row.get(4),
443 }
444 }
445
446 fn map_saga_step_row(row: &tokio_postgres::Row) -> SagaStep {
448 SagaStep {
449 id: row.get(0),
450 saga_id: row.get(1),
451 order: row.get::<_, i32>(2) as usize,
452 subgraph: row.get(3),
453 mutation_type: MutationType::from_str(row.get::<_, String>(4).as_str())
454 .unwrap_or(MutationType::Update),
455 typename: row.get(5),
456 variables: row.get(6),
457 state: StepState::from_str(row.get::<_, String>(7).as_str())
458 .unwrap_or(StepState::Pending),
459 result: row.get(8),
460 started_at: row.get(9),
461 completed_at: row.get(10),
462 }
463 }
464
465 pub async fn save_saga(&self, saga: &Saga) -> Result<()> {
474 let conn = self.pool.get().await?;
475 let state = saga.state.as_str();
476 let now = chrono::Utc::now();
477
478 conn.execute(
479 "INSERT INTO tb_federation_sagas (id, state, created_at, completed_at, updated_at, metadata)
480 VALUES ($1, $2, $3, $4, $5, $6)
481 ON CONFLICT (id) DO UPDATE SET
482 state = $2, completed_at = $4, updated_at = $5, metadata = $6",
483 &[&saga.id, &state, &saga.created_at, &saga.completed_at, &now, &saga.metadata],
484 )
485 .await?;
486
487 Ok(())
488 }
489
490 pub async fn load_saga(&self, saga_id: Uuid) -> Result<Option<Saga>> {
496 let conn = self.pool.get().await?;
497
498 let row = conn
499 .query_opt(
500 "SELECT id, state, created_at, completed_at, metadata FROM tb_federation_sagas WHERE id = $1",
501 &[&saga_id],
502 )
503 .await?;
504
505 Ok(row.map(|r| Self::map_saga_row(&r)))
506 }
507
508 pub async fn load_all_sagas(&self) -> Result<Vec<Saga>> {
514 let conn = self.pool.get().await?;
515
516 let rows = conn
517 .query(
518 "SELECT id, state, created_at, completed_at, metadata FROM tb_federation_sagas ORDER BY created_at DESC",
519 &[],
520 )
521 .await?;
522
523 Ok(rows.into_iter().map(|r| Self::map_saga_row(&r)).collect())
524 }
525
526 pub async fn load_sagas_by_state(&self, state: &SagaState) -> Result<Vec<Saga>> {
532 let conn = self.pool.get().await?;
533 let state_str = state.as_str();
534
535 let rows = conn
536 .query(
537 "SELECT id, state, created_at, completed_at, metadata FROM tb_federation_sagas WHERE state = $1 ORDER BY created_at DESC",
538 &[&state_str],
539 )
540 .await?;
541
542 Ok(rows.into_iter().map(|r| Self::map_saga_row(&r)).collect())
543 }
544
545 pub async fn update_saga_state(&self, saga_id: Uuid, state: &SagaState) -> Result<()> {
553 let conn = self.pool.get().await?;
554 let state_str = state.as_str();
555 let now = chrono::Utc::now();
556
557 let completed_at = if matches!(state, SagaState::Completed | SagaState::Compensated) {
558 Some(now)
559 } else {
560 None
561 };
562
563 conn.execute(
564 "UPDATE tb_federation_sagas SET state = $1, completed_at = $2, updated_at = $3 WHERE id = $4",
565 &[&state_str, &completed_at, &now, &saga_id],
566 )
567 .await?;
568
569 Ok(())
570 }
571
572 pub async fn load_saga_step(&self, step_id: Uuid) -> Result<Option<SagaStep>> {
578 let conn = self.pool.get().await?;
579
580 let row = conn
581 .query_opt(
582 "SELECT fss.id, fs.id as saga_id, fss.step_number, fss.subgraph, fss.mutation_type, fss.typename, fss.variables, fss.state, fss.result, fss.started_at, fss.completed_at
583 FROM tb_federation_saga_steps fss
584 INNER JOIN tb_federation_sagas fs ON fss.saga_pk_ = fs.pk_
585 WHERE fss.id = $1",
586 &[&step_id],
587 )
588 .await?;
589
590 Ok(row.map(|r| Self::map_saga_step_row(&r)))
591 }
592
593 pub async fn load_saga_steps(&self, saga_id: Uuid) -> Result<Vec<SagaStep>> {
599 let conn = self.pool.get().await?;
600
601 let rows = conn
602 .query(
603 "SELECT fss.id, fs.id as saga_id, fss.step_number, fss.subgraph, fss.mutation_type, fss.typename, fss.variables, fss.state, fss.result, fss.started_at, fss.completed_at
604 FROM tb_federation_saga_steps fss
605 INNER JOIN tb_federation_sagas fs ON fss.saga_pk_ = fs.pk_
606 WHERE fs.id = $1
607 ORDER BY fss.step_number ASC",
608 &[&saga_id],
609 )
610 .await?;
611
612 Ok(rows.into_iter().map(|r| Self::map_saga_step_row(&r)).collect())
613 }
614
615 pub async fn update_saga_step_state(&self, step_id: Uuid, state: &StepState) -> Result<()> {
623 let conn = self.pool.get().await?;
624 let state_str = state.as_str();
625 let now = chrono::Utc::now();
626
627 let completed_at = if matches!(state, StepState::Completed | StepState::Failed) {
628 Some(now)
629 } else {
630 None
631 };
632
633 conn.execute(
634 "UPDATE tb_federation_saga_steps SET state = $1, completed_at = $2, updated_at = $3 WHERE id = $4",
635 &[&state_str, &completed_at, &now, &step_id],
636 )
637 .await?;
638
639 Ok(())
640 }
641
642 pub async fn save_saga_step(&self, step: &SagaStep) -> Result<()> {
651 let conn = self.pool.get().await?;
652 let mutation_type = step.mutation_type.as_str();
653 let state = step.state.as_str();
654 let now = chrono::Utc::now();
655
656 #[allow(clippy::cast_possible_wrap)]
659 let step_number = step.order as i32;
661
662 conn.execute(
664 "INSERT INTO tb_federation_saga_steps (id, saga_pk_, step_number, subgraph, mutation_type, typename, variables, state, result, started_at, completed_at, created_at, updated_at)
665 SELECT $1, fs.pk_, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
666 FROM tb_federation_sagas fs
667 WHERE fs.id = $2
668 ON CONFLICT (id) DO UPDATE SET state = $8, result = $9, completed_at = $11, updated_at = $13",
669 &[
670 &step.id,
671 &step.saga_id, &step_number,
673 &step.subgraph,
674 &mutation_type,
675 &step.typename,
676 &step.variables,
677 &state,
678 &step.result,
679 &step.started_at,
680 &step.completed_at,
681 &chrono::Utc::now(),
682 &now,
683 ],
684 )
685 .await?;
686
687 Ok(())
688 }
689
690 pub async fn update_saga_step_result(&self, step_id: Uuid, result: &Value) -> Result<()> {
696 let conn = self.pool.get().await?;
697 let now = chrono::Utc::now();
698
699 conn.execute(
700 "UPDATE tb_federation_saga_steps SET result = $1, updated_at = $2 WHERE id = $3",
701 &[&result, &now, &step_id],
702 )
703 .await?;
704
705 Ok(())
706 }
707
708 pub async fn mark_saga_for_recovery(&self, saga_id: Uuid, reason: &str) -> Result<()> {
717 let conn = self.pool.get().await?;
718 let recovery_id = Uuid::new_v4();
719 let now = chrono::Utc::now();
720
721 conn.execute(
723 "INSERT INTO tb_federation_saga_recovery (id, saga_pk_, recovery_type, attempted_at, attempt_count)
724 SELECT $1, fs.pk_, $3, $4, $5
725 FROM tb_federation_sagas fs
726 WHERE fs.id = $2",
727 &[&recovery_id, &saga_id, &reason, &now, &0i32],
728 )
729 .await?;
730
731 Ok(())
732 }
733
734 pub async fn find_pending_sagas(&self) -> Result<Vec<Saga>> {
740 self.load_sagas_by_state(&SagaState::Pending).await
741 }
742
743 pub async fn clear_recovery_record(&self, saga_id: Uuid) -> Result<()> {
751 let conn = self.pool.get().await?;
752 conn.execute(
753 "DELETE FROM tb_federation_saga_recovery
754 WHERE saga_pk_ = (SELECT pk_ FROM tb_federation_sagas WHERE id = $1)",
755 &[&saga_id],
756 )
757 .await?;
758 Ok(())
759 }
760
761 pub async fn delete_saga(&self, saga_id: Uuid) -> Result<()> {
770 let conn = self.pool.get().await?;
771 conn.execute("DELETE FROM tb_federation_sagas WHERE id = $1", &[&saga_id])
772 .await?;
773 Ok(())
774 }
775
776 pub async fn delete_completed_sagas(&self) -> Result<u64> {
786 let conn = self.pool.get().await?;
787 let result = conn
788 .execute(
789 "DELETE FROM tb_federation_sagas WHERE state IN ('completed', 'compensated')",
790 &[],
791 )
792 .await?;
793 Ok(result)
794 }
795
796 pub async fn cleanup_stale_sagas(&self, hours_threshold: i64) -> Result<u64> {
810 let conn = self.pool.get().await?;
811 let result = conn
812 .execute(
813 "DELETE FROM tb_federation_sagas WHERE created_at < NOW() - INTERVAL '1 hour' * $1 AND state IN ('completed', 'compensated')",
814 &[&hours_threshold],
815 )
816 .await?;
817 Ok(result)
818 }
819
820 pub async fn get_recovery_attempts(&self, saga_id: Uuid) -> Result<i32> {
828 let conn = self.pool.get().await?;
829 let row = conn
830 .query_opt(
831 "SELECT COALESCE(MAX(attempt_count), 0) FROM tb_federation_saga_recovery
832 WHERE saga_pk_ = (SELECT pk_ FROM tb_federation_sagas WHERE id = $1)",
833 &[&saga_id],
834 )
835 .await?;
836 Ok(row.map(|r| r.get(0)).unwrap_or(0))
837 }
838
839 pub async fn save_recovery_record(&self, recovery: &SagaRecovery) -> Result<()> {
847 let conn = self.pool.get().await?;
848
849 conn.execute(
851 "INSERT INTO tb_federation_saga_recovery (id, saga_pk_, recovery_type, attempted_at, last_attempt, attempt_count, last_error)
852 SELECT $1, fs.pk_, $3, $4, $5, $6, $7
853 FROM tb_federation_sagas fs
854 WHERE fs.id = $2",
855 &[
856 &recovery.id,
857 &recovery.saga_id, &recovery.recovery_type,
859 &recovery.attempted_at,
860 &recovery.last_attempt,
861 &recovery.attempt_count,
862 &recovery.last_error,
863 ],
864 )
865 .await?;
866
867 Ok(())
868 }
869
870 pub async fn cleanup_all(&self) -> Result<()> {
876 let conn = self.pool.get().await?;
877 conn.execute("DELETE FROM tb_federation_saga_recovery", &[]).await?;
878 conn.execute("DELETE FROM tb_federation_saga_steps", &[]).await?;
879 conn.execute("DELETE FROM tb_federation_sagas", &[]).await?;
880 Ok(())
881 }
882
883 pub async fn saga_count(&self) -> Result<i64> {
889 let conn = self.pool.get().await?;
890 let row = conn.query_one("SELECT COUNT(*) FROM tb_federation_sagas", &[]).await?;
891 Ok(row.get(0))
892 }
893
894 pub async fn step_count(&self) -> Result<i64> {
900 let conn = self.pool.get().await?;
901 let row = conn.query_one("SELECT COUNT(*) FROM tb_federation_saga_steps", &[]).await?;
902 Ok(row.get(0))
903 }
904
905 pub async fn recovery_count(&self) -> Result<i64> {
911 let conn = self.pool.get().await?;
912 let row = conn.query_one("SELECT COUNT(*) FROM tb_federation_saga_recovery", &[]).await?;
913 Ok(row.get(0))
914 }
915
916 pub async fn find_stuck_sagas(&self) -> Result<Vec<Saga>> {
922 self.load_sagas_by_state(&SagaState::Executing).await
923 }
924}
925
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: builds a store and runs its health check. Ignored by
    /// default because it needs a live PostgreSQL server.
    #[tokio::test]
    #[ignore = "requires running PostgreSQL database"]
    async fn test_postgres_connection() {
        let created = PostgresSagaStore::new("postgresql://localhost/fraiseql_test").await;
        let store = created.expect("Failed to create store");

        let health = store.health_check().await;
        health.expect("Health check failed");
    }
}