Skip to main content

ainl_memory/
store.rs

//! Graph storage backends for AINL memory.
//!
//! Defines the `GraphStore` trait and its SQLite implementation.
//! The SQLite tables are added alongside the existing openfang-memory schema.
5
6use crate::node::{AinlMemoryNode, AinlNodeType};
7use rusqlite::OptionalExtension;
8use uuid::Uuid;
9
10/// Graph memory storage trait - swappable backends
11pub trait GraphStore {
12    /// Write a node to storage
13    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
14
15    /// Read a node by ID
16    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
17
18    /// Query episodes since a given timestamp
19    fn query_episodes_since(
20        &self,
21        since_timestamp: i64,
22        limit: usize,
23    ) -> Result<Vec<AinlMemoryNode>, String>;
24
25    /// Find nodes by type
26    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
27
28    /// Walk edges from a node with a given label
29    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
30}
31
32/// SQLite implementation of GraphStore
33///
34/// Integrates with existing openfang-memory schema by adding two tables:
35/// - `ainl_graph_nodes`: stores node payloads
36/// - `ainl_graph_edges`: stores graph edges
37pub struct SqliteGraphStore {
38    conn: rusqlite::Connection,
39}
40
41impl SqliteGraphStore {
42    /// Ensure the AINL graph schema exists in the database
43    pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
44        conn.execute(
45            "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
46                id TEXT PRIMARY KEY,
47                node_type TEXT NOT NULL,
48                payload TEXT NOT NULL,
49                timestamp INTEGER NOT NULL
50            )",
51            [],
52        )?;
53
54        conn.execute(
55            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
56             ON ainl_graph_nodes(timestamp DESC)",
57            [],
58        )?;
59
60        conn.execute(
61            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
62             ON ainl_graph_nodes(node_type)",
63            [],
64        )?;
65
66        conn.execute(
67            "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
68                from_id TEXT NOT NULL,
69                to_id TEXT NOT NULL,
70                label TEXT NOT NULL,
71                PRIMARY KEY (from_id, to_id, label)
72            )",
73            [],
74        )?;
75
76        conn.execute(
77            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
78             ON ainl_graph_edges(from_id, label)",
79            [],
80        )?;
81
82        Ok(())
83    }
84
85    /// Open/create a graph store at the given path
86    pub fn open(path: &std::path::Path) -> Result<Self, String> {
87        let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
88        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
89        Ok(Self { conn })
90    }
91
92    /// Create from an existing connection (for integration with openfang-memory pool)
93    pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
94        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
95        Ok(Self { conn })
96    }
97
98    /// Insert a directed edge between two node IDs (separate from per-node edge payloads).
99    pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
100        self.conn
101            .execute(
102                "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label)
103                 VALUES (?1, ?2, ?3)",
104                rusqlite::params![from_id.to_string(), to_id.to_string(), label],
105            )
106            .map_err(|e| e.to_string())?;
107        Ok(())
108    }
109
110    /// Nodes of a given `node_type` with `timestamp >= since_timestamp`, most recent first.
111    pub fn query_nodes_by_type_since(
112        &self,
113        node_type: &str,
114        since_timestamp: i64,
115        limit: usize,
116    ) -> Result<Vec<AinlMemoryNode>, String> {
117        let mut stmt = self
118            .conn
119            .prepare(
120                "SELECT payload FROM ainl_graph_nodes
121                 WHERE node_type = ?1 AND timestamp >= ?2
122                 ORDER BY timestamp DESC
123                 LIMIT ?3",
124            )
125            .map_err(|e| e.to_string())?;
126
127        let rows = stmt
128            .query_map(
129                rusqlite::params![node_type, since_timestamp, limit as i64],
130                |row| {
131                    let payload: String = row.get(0)?;
132                    Ok(payload)
133                },
134            )
135            .map_err(|e| e.to_string())?;
136
137        let mut nodes = Vec::new();
138        for row in rows {
139            let payload = row.map_err(|e| e.to_string())?;
140            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
141            nodes.push(node);
142        }
143
144        Ok(nodes)
145    }
146}
147
148impl GraphStore for SqliteGraphStore {
149    /// Persists the full node JSON under `id` via `INSERT OR REPLACE` (upsert).
150    /// Backfill pattern: `read_node` → patch fields (e.g. episodic signals) → `write_node`, preserving loaded `edges`.
151    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
152        let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
153        let type_name = match &node.node_type {
154            AinlNodeType::Episode { .. } => "episode",
155            AinlNodeType::Semantic { .. } => "semantic",
156            AinlNodeType::Procedural { .. } => "procedural",
157            AinlNodeType::Persona { .. } => "persona",
158        };
159
160        // Extract timestamp from the node
161        let timestamp = match &node.node_type {
162            AinlNodeType::Episode { episodic } => episodic.timestamp,
163            _ => chrono::Utc::now().timestamp(),
164        };
165
166        self.conn
167            .execute(
168                "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
169                 VALUES (?1, ?2, ?3, ?4)",
170                rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
171            )
172            .map_err(|e| e.to_string())?;
173
174        // Write edges
175        for edge in &node.edges {
176            self.conn
177                .execute(
178                    "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label)
179                     VALUES (?1, ?2, ?3)",
180                    rusqlite::params![node.id.to_string(), edge.target_id.to_string(), edge.label,],
181                )
182                .map_err(|e| e.to_string())?;
183        }
184
185        Ok(())
186    }
187
188    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
189        let payload: Option<String> = self
190            .conn
191            .query_row(
192                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
193                [id.to_string()],
194                |row| row.get::<_, String>(0),
195            )
196            .optional()
197            .map_err(|e: rusqlite::Error| e.to_string())?;
198
199        match payload {
200            Some(p) => {
201                let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
202                Ok(Some(node))
203            }
204            None => Ok(None),
205        }
206    }
207
208    fn query_episodes_since(
209        &self,
210        since_timestamp: i64,
211        limit: usize,
212    ) -> Result<Vec<AinlMemoryNode>, String> {
213        let mut stmt = self
214            .conn
215            .prepare(
216                "SELECT payload FROM ainl_graph_nodes
217                 WHERE node_type = 'episode' AND timestamp >= ?1
218                 ORDER BY timestamp DESC
219                 LIMIT ?2",
220            )
221            .map_err(|e| e.to_string())?;
222
223        let rows = stmt
224            .query_map([since_timestamp, limit as i64], |row| {
225                let payload: String = row.get(0)?;
226                Ok(payload)
227            })
228            .map_err(|e| e.to_string())?;
229
230        let mut nodes = Vec::new();
231        for row in rows {
232            let payload = row.map_err(|e| e.to_string())?;
233            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
234            nodes.push(node);
235        }
236
237        Ok(nodes)
238    }
239
240    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
241        let mut stmt = self
242            .conn
243            .prepare(
244                "SELECT payload FROM ainl_graph_nodes
245                 WHERE node_type = ?1
246                 ORDER BY timestamp DESC",
247            )
248            .map_err(|e| e.to_string())?;
249
250        let rows = stmt
251            .query_map([type_name], |row| {
252                let payload: String = row.get(0)?;
253                Ok(payload)
254            })
255            .map_err(|e| e.to_string())?;
256
257        let mut nodes = Vec::new();
258        for row in rows {
259            let payload = row.map_err(|e| e.to_string())?;
260            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
261            nodes.push(node);
262        }
263
264        Ok(nodes)
265    }
266
267    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
268        let mut stmt = self
269            .conn
270            .prepare(
271                "SELECT to_id FROM ainl_graph_edges
272                 WHERE from_id = ?1 AND label = ?2",
273            )
274            .map_err(|e| e.to_string())?;
275
276        let target_ids: Vec<String> = stmt
277            .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
278            .map_err(|e| e.to_string())?
279            .collect::<Result<Vec<_>, _>>()
280            .map_err(|e| e.to_string())?;
281
282        let mut nodes = Vec::new();
283        for target_id in target_ids {
284            let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
285            if let Some(node) = self.read_node(id)? {
286                nodes.push(node);
287            }
288        }
289
290        Ok(nodes)
291    }
292}