Skip to main content

sayiir_postgres/
task_result_store.rs

1//! [`TaskResultStore`] implementation for Postgres.
2//!
3//! For non-terminal workflows, task results come from the current snapshot.
4//! For completed or failed workflows (where `completed_tasks` has been
5//! discarded), the implementation falls back to the most recent `InProgress`
6//! entry in `sayiir_workflow_snapshot_history`.
7
8use sayiir_core::codec::{self, Decoder, Encoder};
9use sayiir_core::snapshot::WorkflowSnapshot;
10use sayiir_persistence::{BackendError, SnapshotStore, TaskResultStore};
11use sqlx::Row;
12
13use crate::backend::PostgresBackend;
14use crate::error::PgError;
15
16impl<C> TaskResultStore for PostgresBackend<C>
17where
18    C: Encoder
19        + Decoder
20        + codec::sealed::EncodeValue<WorkflowSnapshot>
21        + codec::sealed::DecodeValue<WorkflowSnapshot>,
22{
23    #[tracing::instrument(
24        name = "db.load_task_result",
25        skip(self),
26        fields(db.system = "postgresql"),
27        err(level = tracing::Level::ERROR),
28    )]
29    async fn load_task_result(
30        &self,
31        instance_id: &str,
32        task_id: &str,
33    ) -> Result<Option<bytes::Bytes>, BackendError> {
34        let snapshot = self.load_snapshot(instance_id).await?;
35
36        // Non-terminal states carry completed_tasks directly.
37        if let Some(bytes) = snapshot.get_task_result_bytes(task_id) {
38            return Ok(Some(bytes));
39        }
40
41        // For terminal states, fall back to snapshot history.
42        if (snapshot.state.is_completed() || snapshot.state.is_failed())
43            && let Some(hist) = self.load_last_in_progress_snapshot(instance_id).await?
44        {
45            return Ok(hist.get_task_result_bytes(task_id));
46        }
47
48        Ok(None)
49    }
50}
51
52impl<C> PostgresBackend<C>
53where
54    C: Decoder + codec::sealed::DecodeValue<WorkflowSnapshot>,
55{
56    /// Load the most recent `InProgress` snapshot from the history table.
57    ///
58    /// This is the last snapshot before the workflow transitioned to a terminal
59    /// state, so it still contains `completed_tasks`.
60    async fn load_last_in_progress_snapshot(
61        &self,
62        instance_id: &str,
63    ) -> Result<Option<WorkflowSnapshot>, BackendError> {
64        let row = sqlx::query(
65            "SELECT data FROM sayiir_workflow_snapshot_history
66             WHERE instance_id = $1 AND status = 'InProgress'
67             ORDER BY version DESC LIMIT 1",
68        )
69        .bind(instance_id)
70        .fetch_optional(&self.pool)
71        .await
72        .map_err(PgError)?;
73
74        match row {
75            Some(r) => {
76                let raw: &[u8] = r.get("data");
77                let snapshot = self.decode(raw)?;
78                Ok(Some(snapshot))
79            }
80            None => Ok(None),
81        }
82    }
83}