sayiir_postgres/
task_result_store.rs1use sayiir_core::codec::{self, Decoder, Encoder};
9use sayiir_core::snapshot::WorkflowSnapshot;
10use sayiir_persistence::{BackendError, SnapshotStore, TaskResultStore};
11use sqlx::Row;
12
13use crate::backend::PostgresBackend;
14use crate::error::PgError;
15
16impl<C> TaskResultStore for PostgresBackend<C>
17where
18 C: Encoder
19 + Decoder
20 + codec::sealed::EncodeValue<WorkflowSnapshot>
21 + codec::sealed::DecodeValue<WorkflowSnapshot>,
22{
23 #[tracing::instrument(
24 name = "db.load_task_result",
25 skip(self),
26 fields(db.system = "postgresql"),
27 err(level = tracing::Level::ERROR),
28 )]
29 async fn load_task_result(
30 &self,
31 instance_id: &str,
32 task_id: &str,
33 ) -> Result<Option<bytes::Bytes>, BackendError> {
34 let snapshot = self.load_snapshot(instance_id).await?;
35
36 if let Some(bytes) = snapshot.get_task_result_bytes(task_id) {
38 return Ok(Some(bytes));
39 }
40
41 if (snapshot.state.is_completed() || snapshot.state.is_failed())
43 && let Some(hist) = self.load_last_in_progress_snapshot(instance_id).await?
44 {
45 return Ok(hist.get_task_result_bytes(task_id));
46 }
47
48 Ok(None)
49 }
50}
51
52impl<C> PostgresBackend<C>
53where
54 C: Decoder + codec::sealed::DecodeValue<WorkflowSnapshot>,
55{
56 async fn load_last_in_progress_snapshot(
61 &self,
62 instance_id: &str,
63 ) -> Result<Option<WorkflowSnapshot>, BackendError> {
64 let row = sqlx::query(
65 "SELECT data FROM sayiir_workflow_snapshot_history
66 WHERE instance_id = $1 AND status = 'InProgress'
67 ORDER BY version DESC LIMIT 1",
68 )
69 .bind(instance_id)
70 .fetch_optional(&self.pool)
71 .await
72 .map_err(PgError)?;
73
74 match row {
75 Some(r) => {
76 let raw: &[u8] = r.get("data");
77 let snapshot = self.decode(raw)?;
78 Ok(Some(snapshot))
79 }
80 None => Ok(None),
81 }
82 }
83}