Skip to main content

tianshu_postgres/
case_store.rs

1// Copyright 2026 Desicool
2//
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use deadpool_postgres::Pool;
8use tokio_postgres::Row;
9use tracing::{debug, info};
10
11use tianshu::case::{Case, ExecutionState};
12use tianshu::store::CaseStore;
13
14pub struct PostgresCaseStore {
15    pool: Pool,
16}
17
18impl PostgresCaseStore {
19    pub fn new(pool: Pool) -> Self {
20        Self { pool }
21    }
22
23    fn row_to_case(row: &Row) -> Result<Case> {
24        let state_raw: String = row.get("execution_state");
25        let execution_state =
26            ExecutionState::from_str_lowercase(&state_raw).unwrap_or(ExecutionState::Running);
27
28        Ok(Case {
29            case_key: row.get("case_key"),
30            session_id: row.get("session_id"),
31            workflow_code: row.get("workflow_code"),
32            execution_state,
33            finished_type: row.get("finished_type"),
34            finished_description: row.get("finished_description"),
35            parent_key: row.get("parent_key"),
36            child_keys: row
37                .get::<_, Option<serde_json::Value>>("child_keys")
38                .and_then(|v| serde_json::from_value(v).ok())
39                .unwrap_or_default(),
40            lifecycle_state: row.get("lifecycle_state"),
41            processing_report: row
42                .get::<_, Option<serde_json::Value>>("processing_report")
43                .and_then(|v| v.as_array().cloned())
44                .unwrap_or_default(),
45            resource_data: row.get("resource_data"),
46            private_vars: row.get("private_vars"),
47            created_at: row.get("created_at"),
48            updated_at: row.get("updated_at"),
49        })
50    }
51}
52
53#[async_trait]
54impl CaseStore for PostgresCaseStore {
55    async fn upsert(&self, case: &Case) -> Result<()> {
56        let client = self.pool.get().await?;
57        debug!(
58            "Upserting case: case_key={}, state={:?}",
59            case.case_key, case.execution_state
60        );
61
62        client
63            .execute(
64                r#"
65                INSERT INTO wf_cases (
66                    case_key, session_id, workflow_code,
67                    execution_state, finished_type, finished_description,
68                    parent_key, child_keys, lifecycle_state,
69                    processing_report, resource_data, private_vars,
70                    created_at, updated_at
71                )
72                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
73                ON CONFLICT (case_key) DO UPDATE SET
74                    session_id         = EXCLUDED.session_id,
75                    workflow_code      = EXCLUDED.workflow_code,
76                    execution_state    = EXCLUDED.execution_state,
77                    finished_type      = EXCLUDED.finished_type,
78                    finished_description = EXCLUDED.finished_description,
79                    parent_key         = EXCLUDED.parent_key,
80                    child_keys         = EXCLUDED.child_keys,
81                    lifecycle_state    = EXCLUDED.lifecycle_state,
82                    processing_report  = EXCLUDED.processing_report,
83                    resource_data      = EXCLUDED.resource_data,
84                    private_vars       = EXCLUDED.private_vars,
85                    updated_at         = EXCLUDED.updated_at
86                "#,
87                &[
88                    &case.case_key,
89                    &case.session_id,
90                    &case.workflow_code,
91                    &case.execution_state.to_string(),
92                    &case.finished_type,
93                    &case.finished_description,
94                    &case.parent_key,
95                    &serde_json::to_value(&case.child_keys)?,
96                    &case.lifecycle_state,
97                    &serde_json::to_value(&case.processing_report)?,
98                    &case.resource_data,
99                    &case.private_vars,
100                    &case.created_at,
101                    &case.updated_at,
102                ],
103            )
104            .await?;
105
106        info!("Upserted case: case_key={}", case.case_key);
107        Ok(())
108    }
109
110    async fn get_by_key(&self, case_key: &str) -> Result<Option<Case>> {
111        let client = self.pool.get().await?;
112        debug!("Fetching case by key: {}", case_key);
113
114        let row_opt = client
115            .query_opt("SELECT * FROM wf_cases WHERE case_key = $1", &[&case_key])
116            .await?;
117
118        Ok(row_opt.map(|r| Self::row_to_case(&r)).transpose()?)
119    }
120
121    async fn get_by_session(&self, session_id: &str) -> Result<Vec<Case>> {
122        let client = self.pool.get().await?;
123        debug!("Fetching cases for session: {}", session_id);
124
125        let rows = client
126            .query(
127                "SELECT * FROM wf_cases WHERE session_id = $1 ORDER BY created_at ASC",
128                &[&session_id],
129            )
130            .await?;
131
132        rows.iter()
133            .map(Self::row_to_case)
134            .collect::<Result<Vec<_>>>()
135    }
136
137    async fn setup(&self) -> Result<()> {
138        let client = self.pool.get().await?;
139        client
140            .execute(
141                r#"
142                CREATE TABLE IF NOT EXISTS wf_cases (
143                    case_key            TEXT PRIMARY KEY,
144                    session_id          TEXT NOT NULL,
145                    workflow_code       TEXT NOT NULL,
146                    execution_state     TEXT NOT NULL DEFAULT 'running',
147                    finished_type       TEXT,
148                    finished_description TEXT,
149                    parent_key          TEXT,
150                    child_keys          JSONB NOT NULL DEFAULT '[]'::jsonb,
151                    lifecycle_state     TEXT NOT NULL DEFAULT 'normal',
152                    processing_report   JSONB NOT NULL DEFAULT '[]'::jsonb,
153                    resource_data       JSONB,
154                    private_vars        JSONB,
155                    created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
156                    updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
157                )
158                "#,
159                &[],
160            )
161            .await?;
162        client
163            .execute(
164                "CREATE INDEX IF NOT EXISTS wf_cases_session_id_idx ON wf_cases (session_id)",
165                &[],
166            )
167            .await?;
168        info!("wf_cases table ready");
169        Ok(())
170    }
171}