1use zeph_db::DbPool;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use crate::error::MemoryError;
15
/// Lightweight listing entry for a persisted task graph.
///
/// Carries the descriptive columns of a stored graph without the serialized
/// graph body itself.
#[derive(Debug, Clone)]
pub struct GraphSummary {
    /// Identifier of the graph.
    pub id: String,
    /// Goal text stored alongside the graph.
    pub goal: String,
    /// Status label stored alongside the graph.
    pub status: String,
    /// Creation marker, kept as the raw string that was stored.
    pub created_at: String,
    /// Completion marker, or `None` when no value was stored.
    pub finished_at: Option<String>,
}
25
26pub trait RawGraphStore: Send + Sync {
31 #[allow(async_fn_in_trait)]
37 async fn save_graph(
38 &self,
39 id: &str,
40 goal: &str,
41 status: &str,
42 graph_json: &str,
43 created_at: &str,
44 finished_at: Option<&str>,
45 ) -> Result<(), MemoryError>;
46
47 #[allow(async_fn_in_trait)]
55 async fn load_graph(&self, id: &str) -> Result<Option<String>, MemoryError>;
56
57 #[allow(async_fn_in_trait)]
63 async fn list_graphs(&self, limit: u32) -> Result<Vec<GraphSummary>, MemoryError>;
64
65 #[allow(async_fn_in_trait)]
73 async fn delete_graph(&self, id: &str) -> Result<bool, MemoryError>;
74}
75
76#[derive(Debug, Clone)]
78pub struct TaskGraphStore {
79 pool: DbPool,
80}
81
82impl TaskGraphStore {
83 #[must_use]
85 pub fn new(pool: DbPool) -> Self {
86 Self { pool }
87 }
88}
89
90impl RawGraphStore for TaskGraphStore {
91 async fn save_graph(
92 &self,
93 id: &str,
94 goal: &str,
95 status: &str,
96 graph_json: &str,
97 created_at: &str,
98 finished_at: Option<&str>,
99 ) -> Result<(), MemoryError> {
100 zeph_db::query(sql!(
101 "INSERT INTO task_graphs (id, goal, status, graph_json, created_at, finished_at) \
102 VALUES (?, ?, ?, ?, ?, ?) \
103 ON CONFLICT(id) DO UPDATE SET \
104 goal = excluded.goal, \
105 status = excluded.status, \
106 graph_json = excluded.graph_json, \
107 created_at = excluded.created_at, \
108 finished_at = excluded.finished_at"
109 ))
110 .bind(id)
111 .bind(goal)
112 .bind(status)
113 .bind(graph_json)
114 .bind(created_at)
115 .bind(finished_at)
116 .execute(&self.pool)
117 .await
118 .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
119 Ok(())
120 }
121
122 async fn load_graph(&self, id: &str) -> Result<Option<String>, MemoryError> {
123 let row: Option<(String,)> =
124 zeph_db::query_as(sql!("SELECT graph_json FROM task_graphs WHERE id = ?"))
125 .bind(id)
126 .fetch_optional(&self.pool)
127 .await
128 .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
129 Ok(row.map(|(json,)| json))
130 }
131
132 async fn list_graphs(&self, limit: u32) -> Result<Vec<GraphSummary>, MemoryError> {
133 let rows: Vec<(String, String, String, String, Option<String>)> = zeph_db::query_as(sql!(
134 "SELECT id, goal, status, created_at, finished_at \
135 FROM task_graphs \
136 ORDER BY created_at DESC \
137 LIMIT ?"
138 ))
139 .bind(limit)
140 .fetch_all(&self.pool)
141 .await
142 .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
143
144 Ok(rows
145 .into_iter()
146 .map(|(id, goal, status, created_at, finished_at)| GraphSummary {
147 id,
148 goal,
149 status,
150 created_at,
151 finished_at,
152 })
153 .collect())
154 }
155
156 async fn delete_graph(&self, id: &str) -> Result<bool, MemoryError> {
157 let result = zeph_db::query(sql!("DELETE FROM task_graphs WHERE id = ?"))
158 .bind(id)
159 .execute(&self.pool)
160 .await
161 .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
162 Ok(result.rows_affected() > 0)
163 }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::DbStore;

    /// Builds a `TaskGraphStore` on top of a `DbStore` opened with `":memory:"`.
    async fn make_store() -> TaskGraphStore {
        let backing = DbStore::new(":memory:").await.expect("DbStore");
        let pool = backing.pool().clone();
        TaskGraphStore::new(pool)
    }

    /// A saved graph body is returned unchanged by `load_graph`.
    #[tokio::test]
    async fn test_save_and_load_roundtrip() {
        const PAYLOAD: &str = r#"{"key":"val"}"#;

        let store = make_store().await;
        store
            .save_graph("id-1", "goal", "created", PAYLOAD, "100", None)
            .await
            .expect("save");

        let fetched = store.load_graph("id-1").await.expect("load");
        assert_eq!(fetched.expect("should exist"), PAYLOAD);
    }

    /// Loading an id that was never saved yields `None`, not an error.
    #[tokio::test]
    async fn test_load_nonexistent() {
        let store = make_store().await;
        let missing = store.load_graph("missing-id").await.expect("load");
        assert_eq!(missing, None);
    }

    /// `list_graphs` returns the entry with the larger `created_at` first.
    #[tokio::test]
    async fn test_list_graphs_ordering() {
        let store = make_store().await;

        // (id, goal, created_at, panic message used if the save fails)
        let fixtures = [
            ("id-1", "first", "100", "save 1"),
            ("id-2", "second", "200", "save 2"),
        ];
        for (id, goal, created_at, failure) in fixtures {
            store
                .save_graph(id, goal, "created", "{}", created_at, None)
                .await
                .expect(failure);
        }

        let listed = store.list_graphs(10).await.expect("list");
        let ids: Vec<&str> = listed.iter().map(|summary| summary.id.as_str()).collect();
        assert_eq!(ids, ["id-2", "id-1"]);
    }

    /// Deleting a stored graph reports `true` and makes it unloadable.
    #[tokio::test]
    async fn test_delete_graph() {
        let store = make_store().await;
        store
            .save_graph("id-del", "goal", "created", "{}", "1", None)
            .await
            .expect("save");

        assert!(store.delete_graph("id-del").await.expect("delete"));
        assert!(store.load_graph("id-del").await.expect("load").is_none());
    }

    /// Saving twice under the same id keeps only the second graph body.
    #[tokio::test]
    async fn test_save_overwrites_existing() {
        let store = make_store().await;
        store
            .save_graph("id-1", "old", "created", r#"{"v":1}"#, "1", None)
            .await
            .expect("save 1");
        store
            .save_graph("id-1", "new", "running", r#"{"v":2}"#, "1", None)
            .await
            .expect("save 2 (upsert)");

        let current = store
            .load_graph("id-1")
            .await
            .expect("load")
            .expect("exists");
        assert_eq!(current, r#"{"v":2}"#);
    }
}