1use anyhow::Result;
6use async_trait::async_trait;
7use deadpool_postgres::Pool;
8use tokio_postgres::Row;
9use tracing::{debug, info};
10
11use tianshu::case::{Case, ExecutionState};
12use tianshu::store::CaseStore;
13
14pub struct PostgresCaseStore {
15 pool: Pool,
16}
17
18impl PostgresCaseStore {
19 pub fn new(pool: Pool) -> Self {
20 Self { pool }
21 }
22
23 fn row_to_case(row: &Row) -> Result<Case> {
24 let state_raw: String = row.get("execution_state");
25 let execution_state =
26 ExecutionState::from_str_lowercase(&state_raw).unwrap_or(ExecutionState::Running);
27
28 Ok(Case {
29 case_key: row.get("case_key"),
30 session_id: row.get("session_id"),
31 workflow_code: row.get("workflow_code"),
32 execution_state,
33 finished_type: row.get("finished_type"),
34 finished_description: row.get("finished_description"),
35 parent_key: row.get("parent_key"),
36 child_keys: row
37 .get::<_, Option<serde_json::Value>>("child_keys")
38 .and_then(|v| serde_json::from_value(v).ok())
39 .unwrap_or_default(),
40 lifecycle_state: row.get("lifecycle_state"),
41 processing_report: row
42 .get::<_, Option<serde_json::Value>>("processing_report")
43 .and_then(|v| v.as_array().cloned())
44 .unwrap_or_default(),
45 resource_data: row.get("resource_data"),
46 private_vars: row.get("private_vars"),
47 created_at: row.get("created_at"),
48 updated_at: row.get("updated_at"),
49 })
50 }
51}
52
53#[async_trait]
54impl CaseStore for PostgresCaseStore {
55 async fn upsert(&self, case: &Case) -> Result<()> {
56 let client = self.pool.get().await?;
57 debug!(
58 "Upserting case: case_key={}, state={:?}",
59 case.case_key, case.execution_state
60 );
61
62 client
63 .execute(
64 r#"
65 INSERT INTO wf_cases (
66 case_key, session_id, workflow_code,
67 execution_state, finished_type, finished_description,
68 parent_key, child_keys, lifecycle_state,
69 processing_report, resource_data, private_vars,
70 created_at, updated_at
71 )
72 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
73 ON CONFLICT (case_key) DO UPDATE SET
74 session_id = EXCLUDED.session_id,
75 workflow_code = EXCLUDED.workflow_code,
76 execution_state = EXCLUDED.execution_state,
77 finished_type = EXCLUDED.finished_type,
78 finished_description = EXCLUDED.finished_description,
79 parent_key = EXCLUDED.parent_key,
80 child_keys = EXCLUDED.child_keys,
81 lifecycle_state = EXCLUDED.lifecycle_state,
82 processing_report = EXCLUDED.processing_report,
83 resource_data = EXCLUDED.resource_data,
84 private_vars = EXCLUDED.private_vars,
85 updated_at = EXCLUDED.updated_at
86 "#,
87 &[
88 &case.case_key,
89 &case.session_id,
90 &case.workflow_code,
91 &case.execution_state.to_string(),
92 &case.finished_type,
93 &case.finished_description,
94 &case.parent_key,
95 &serde_json::to_value(&case.child_keys)?,
96 &case.lifecycle_state,
97 &serde_json::to_value(&case.processing_report)?,
98 &case.resource_data,
99 &case.private_vars,
100 &case.created_at,
101 &case.updated_at,
102 ],
103 )
104 .await?;
105
106 info!("Upserted case: case_key={}", case.case_key);
107 Ok(())
108 }
109
110 async fn get_by_key(&self, case_key: &str) -> Result<Option<Case>> {
111 let client = self.pool.get().await?;
112 debug!("Fetching case by key: {}", case_key);
113
114 let row_opt = client
115 .query_opt("SELECT * FROM wf_cases WHERE case_key = $1", &[&case_key])
116 .await?;
117
118 Ok(row_opt.map(|r| Self::row_to_case(&r)).transpose()?)
119 }
120
121 async fn get_by_session(&self, session_id: &str) -> Result<Vec<Case>> {
122 let client = self.pool.get().await?;
123 debug!("Fetching cases for session: {}", session_id);
124
125 let rows = client
126 .query(
127 "SELECT * FROM wf_cases WHERE session_id = $1 ORDER BY created_at ASC",
128 &[&session_id],
129 )
130 .await?;
131
132 rows.iter()
133 .map(Self::row_to_case)
134 .collect::<Result<Vec<_>>>()
135 }
136
137 async fn setup(&self) -> Result<()> {
138 let client = self.pool.get().await?;
139 client
140 .execute(
141 r#"
142 CREATE TABLE IF NOT EXISTS wf_cases (
143 case_key TEXT PRIMARY KEY,
144 session_id TEXT NOT NULL,
145 workflow_code TEXT NOT NULL,
146 execution_state TEXT NOT NULL DEFAULT 'running',
147 finished_type TEXT,
148 finished_description TEXT,
149 parent_key TEXT,
150 child_keys JSONB NOT NULL DEFAULT '[]'::jsonb,
151 lifecycle_state TEXT NOT NULL DEFAULT 'normal',
152 processing_report JSONB NOT NULL DEFAULT '[]'::jsonb,
153 resource_data JSONB,
154 private_vars JSONB,
155 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
156 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
157 )
158 "#,
159 &[],
160 )
161 .await?;
162 client
163 .execute(
164 "CREATE INDEX IF NOT EXISTS wf_cases_session_id_idx ON wf_cases (session_id)",
165 &[],
166 )
167 .await?;
168 info!("wf_cases table ready");
169 Ok(())
170 }
171}