Skip to main content

zeph_memory/store/
graph_store.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Raw graph persistence trait and [`TaskGraphStore`] implementation.
5//!
6//! The trait operates on opaque JSON strings to avoid a dependency cycle
7//! (`zeph-core` → `zeph-memory` → `zeph-core`). `zeph-core` wraps this
8//! trait in `GraphPersistence<S>` which handles typed serialization.
9
10use zeph_db::DbPool;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use crate::error::MemoryError;
15
/// Summary of a stored task graph (metadata only, no task details).
///
/// All fields are plain strings; this type does no parsing or validation of
/// its own. `PartialEq`/`Eq` are derived so summaries can be compared directly
/// (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphSummary {
    /// Identifier of the graph.
    pub id: String,
    /// Goal text recorded for the graph.
    pub goal: String,
    /// Status label recorded for the graph.
    pub status: String,
    /// Creation timestamp, kept as the string that was stored.
    pub created_at: String,
    /// Completion timestamp, or `None` when none was stored.
    pub finished_at: Option<String>,
}
25
26/// Raw persistence interface for task graphs.
27///
28/// All graph data is stored as a JSON blob. The orchestration layer in
29/// `zeph-core` is responsible for serializing/deserializing `TaskGraph`.
30pub trait RawGraphStore: Send + Sync {
31    /// Persist a graph (upsert by `id`).
32    ///
33    /// # Errors
34    ///
35    /// Returns a `MemoryError` on database failure.
36    #[allow(async_fn_in_trait)]
37    async fn save_graph(
38        &self,
39        id: &str,
40        goal: &str,
41        status: &str,
42        graph_json: &str,
43        created_at: &str,
44        finished_at: Option<&str>,
45    ) -> Result<(), MemoryError>;
46
47    /// Load a graph by its string UUID.
48    ///
49    /// Returns `None` if the graph does not exist.
50    ///
51    /// # Errors
52    ///
53    /// Returns a `MemoryError` on database failure.
54    #[allow(async_fn_in_trait)]
55    async fn load_graph(&self, id: &str) -> Result<Option<String>, MemoryError>;
56
57    /// List graphs ordered by `created_at` descending, limited to `limit` rows.
58    ///
59    /// # Errors
60    ///
61    /// Returns a `MemoryError` on database failure.
62    #[allow(async_fn_in_trait)]
63    async fn list_graphs(&self, limit: u32) -> Result<Vec<GraphSummary>, MemoryError>;
64
65    /// Delete a graph by its string UUID.
66    ///
67    /// Returns `true` if a row was deleted.
68    ///
69    /// # Errors
70    ///
71    /// Returns a `MemoryError` on database failure.
72    #[allow(async_fn_in_trait)]
73    async fn delete_graph(&self, id: &str) -> Result<bool, MemoryError>;
74}
75
76/// Database-backed implementation of [`RawGraphStore`].
77#[derive(Debug, Clone)]
78pub struct TaskGraphStore {
79    pool: DbPool,
80}
81
82impl TaskGraphStore {
83    /// Create a new [`TaskGraphStore`] backed by the given pool.
84    #[must_use]
85    pub fn new(pool: DbPool) -> Self {
86        Self { pool }
87    }
88}
89
90impl RawGraphStore for TaskGraphStore {
91    async fn save_graph(
92        &self,
93        id: &str,
94        goal: &str,
95        status: &str,
96        graph_json: &str,
97        created_at: &str,
98        finished_at: Option<&str>,
99    ) -> Result<(), MemoryError> {
100        zeph_db::query(sql!(
101            "INSERT INTO task_graphs (id, goal, status, graph_json, created_at, finished_at) \
102             VALUES (?, ?, ?, ?, ?, ?) \
103             ON CONFLICT(id) DO UPDATE SET \
104                 goal        = excluded.goal, \
105                 status      = excluded.status, \
106                 graph_json  = excluded.graph_json, \
107                 created_at  = excluded.created_at, \
108                 finished_at = excluded.finished_at"
109        ))
110        .bind(id)
111        .bind(goal)
112        .bind(status)
113        .bind(graph_json)
114        .bind(created_at)
115        .bind(finished_at)
116        .execute(&self.pool)
117        .await
118        .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
119        Ok(())
120    }
121
122    async fn load_graph(&self, id: &str) -> Result<Option<String>, MemoryError> {
123        let row: Option<(String,)> =
124            zeph_db::query_as(sql!("SELECT graph_json FROM task_graphs WHERE id = ?"))
125                .bind(id)
126                .fetch_optional(&self.pool)
127                .await
128                .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
129        Ok(row.map(|(json,)| json))
130    }
131
132    async fn list_graphs(&self, limit: u32) -> Result<Vec<GraphSummary>, MemoryError> {
133        let rows: Vec<(String, String, String, String, Option<String>)> = zeph_db::query_as(sql!(
134            "SELECT id, goal, status, created_at, finished_at \
135             FROM task_graphs \
136             ORDER BY created_at DESC \
137             LIMIT ?"
138        ))
139        .bind(limit)
140        .fetch_all(&self.pool)
141        .await
142        .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
143
144        Ok(rows
145            .into_iter()
146            .map(|(id, goal, status, created_at, finished_at)| GraphSummary {
147                id,
148                goal,
149                status,
150                created_at,
151                finished_at,
152            })
153            .collect())
154    }
155
156    async fn delete_graph(&self, id: &str) -> Result<bool, MemoryError> {
157        let result = zeph_db::query(sql!("DELETE FROM task_graphs WHERE id = ?"))
158            .bind(id)
159            .execute(&self.pool)
160            .await
161            .map_err(|e| MemoryError::GraphStore(e.to_string()))?;
162        Ok(result.rows_affected() > 0)
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::DbStore;

    /// Open a `DbStore` at the `:memory:` path and wrap its pool in a
    /// [`TaskGraphStore`].
    async fn make_store() -> TaskGraphStore {
        let db = DbStore::new(":memory:").await.expect("DbStore");
        let pool = db.pool().clone();
        TaskGraphStore::new(pool)
    }

    /// A saved JSON body comes back unchanged from `load_graph`.
    #[tokio::test]
    async fn test_save_and_load_roundtrip() {
        const PAYLOAD: &str = r#"{"key":"val"}"#;

        let store = make_store().await;
        store
            .save_graph("id-1", "goal", "created", PAYLOAD, "100", None)
            .await
            .expect("save");

        let fetched = store.load_graph("id-1").await.expect("load");
        assert_eq!(fetched.expect("should exist"), PAYLOAD);
    }

    /// Loading an id that was never saved yields `None`, not an error.
    #[tokio::test]
    async fn test_load_nonexistent() {
        let store = make_store().await;
        let missing = store.load_graph("missing-id").await.expect("load");
        assert_eq!(missing, None);
    }

    /// `list_graphs` returns rows with the larger `created_at` first.
    #[tokio::test]
    async fn test_list_graphs_ordering() {
        let store = make_store().await;
        let fixtures = [
            ("id-1", "first", "100", "save 1"),
            ("id-2", "second", "200", "save 2"),
        ];
        for (id, goal, created_at, what) in fixtures {
            store
                .save_graph(id, goal, "created", "{}", created_at, None)
                .await
                .expect(what);
        }

        let listed = store.list_graphs(10).await.expect("list");
        let ids: Vec<&str> = listed.iter().map(|summary| summary.id.as_str()).collect();
        // Ordered by created_at DESC: id-2 (200) before id-1 (100)
        assert_eq!(ids, ["id-2", "id-1"]);
    }

    /// Deleting an existing graph reports `true` and makes it unloadable.
    #[tokio::test]
    async fn test_delete_graph() {
        const ID: &str = "id-del";

        let store = make_store().await;
        store
            .save_graph(ID, "goal", "created", "{}", "1", None)
            .await
            .expect("save");

        let was_deleted = store.delete_graph(ID).await.expect("delete");
        assert!(was_deleted);

        let after = store.load_graph(ID).await.expect("load");
        assert_eq!(after, None);
    }

    /// Saving twice under one id keeps only the second JSON body.
    #[tokio::test]
    async fn test_save_overwrites_existing() {
        const ID: &str = "id-1";
        const FIRST: &str = r#"{"v":1}"#;
        const SECOND: &str = r#"{"v":2}"#;

        let store = make_store().await;
        store
            .save_graph(ID, "old", "created", FIRST, "1", None)
            .await
            .expect("save 1");
        store
            .save_graph(ID, "new", "running", SECOND, "1", None)
            .await
            .expect("save 2 (upsert)");

        let current = store.load_graph(ID).await.expect("load").expect("exists");
        assert_eq!(current, SECOND);
    }
}