Skip to main content

ainl_memory/
store.rs

1//! Graph storage backends for AINL memory.
2//!
3//! Defines the GraphStore trait and SQLite implementation.
4//! SQLite tables integrate with existing openfang-memory schema.
5
6use crate::node::{AinlMemoryNode, AinlNodeType};
7use rusqlite::OptionalExtension;
8use uuid::Uuid;
9
10/// Graph memory storage trait - swappable backends
11pub trait GraphStore {
12    /// Write a node to storage
13    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
14
15    /// Read a node by ID
16    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
17
18    /// Query episodes since a given timestamp
19    fn query_episodes_since(&self, since_timestamp: i64, limit: usize)
20        -> Result<Vec<AinlMemoryNode>, String>;
21
22    /// Find nodes by type
23    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
24
25    /// Walk edges from a node with a given label
26    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
27}
28
29/// SQLite implementation of GraphStore
30///
31/// Integrates with existing openfang-memory schema by adding two tables:
32/// - `ainl_graph_nodes`: stores node payloads
33/// - `ainl_graph_edges`: stores graph edges
34pub struct SqliteGraphStore {
35    conn: rusqlite::Connection,
36}
37
38impl SqliteGraphStore {
39    /// Ensure the AINL graph schema exists in the database
40    pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
41        conn.execute(
42            "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
43                id TEXT PRIMARY KEY,
44                node_type TEXT NOT NULL,
45                payload TEXT NOT NULL,
46                timestamp INTEGER NOT NULL
47            )",
48            [],
49        )?;
50
51        conn.execute(
52            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
53             ON ainl_graph_nodes(timestamp DESC)",
54            [],
55        )?;
56
57        conn.execute(
58            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
59             ON ainl_graph_nodes(node_type)",
60            [],
61        )?;
62
63        conn.execute(
64            "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
65                from_id TEXT NOT NULL,
66                to_id TEXT NOT NULL,
67                label TEXT NOT NULL,
68                PRIMARY KEY (from_id, to_id, label)
69            )",
70            [],
71        )?;
72
73        conn.execute(
74            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
75             ON ainl_graph_edges(from_id, label)",
76            [],
77        )?;
78
79        Ok(())
80    }
81
82    /// Open/create a graph store at the given path
83    pub fn open(path: &std::path::Path) -> Result<Self, String> {
84        let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
85        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
86        Ok(Self { conn })
87    }
88
89    /// Create from an existing connection (for integration with openfang-memory pool)
90    pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
91        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
92        Ok(Self { conn })
93    }
94}
95
96impl GraphStore for SqliteGraphStore {
97    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
98        let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
99        let type_name = match &node.node_type {
100            AinlNodeType::Episode { .. } => "episode",
101            AinlNodeType::Semantic { .. } => "semantic",
102            AinlNodeType::Procedural { .. } => "procedural",
103            AinlNodeType::Persona { .. } => "persona",
104        };
105
106        // Extract timestamp from the node
107        let timestamp = match &node.node_type {
108            AinlNodeType::Episode { timestamp, .. } => *timestamp,
109            _ => chrono::Utc::now().timestamp(),
110        };
111
112        self.conn
113            .execute(
114                "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
115                 VALUES (?1, ?2, ?3, ?4)",
116                rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
117            )
118            .map_err(|e| e.to_string())?;
119
120        // Write edges
121        for edge in &node.edges {
122            self.conn
123                .execute(
124                    "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label)
125                     VALUES (?1, ?2, ?3)",
126                    rusqlite::params![
127                        node.id.to_string(),
128                        edge.target_id.to_string(),
129                        edge.label,
130                    ],
131                )
132                .map_err(|e| e.to_string())?;
133        }
134
135        Ok(())
136    }
137
138    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
139        let payload: Option<String> = self
140            .conn
141            .query_row(
142                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
143                [id.to_string()],
144                |row| row.get::<_, String>(0),
145            )
146            .optional()
147            .map_err(|e: rusqlite::Error| e.to_string())?;
148
149        match payload {
150            Some(p) => {
151                let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
152                Ok(Some(node))
153            }
154            None => Ok(None),
155        }
156    }
157
158    fn query_episodes_since(
159        &self,
160        since_timestamp: i64,
161        limit: usize,
162    ) -> Result<Vec<AinlMemoryNode>, String> {
163        let mut stmt = self
164            .conn
165            .prepare(
166                "SELECT payload FROM ainl_graph_nodes
167                 WHERE node_type = 'episode' AND timestamp >= ?1
168                 ORDER BY timestamp DESC
169                 LIMIT ?2",
170            )
171            .map_err(|e| e.to_string())?;
172
173        let rows = stmt
174            .query_map([since_timestamp, limit as i64], |row| {
175                let payload: String = row.get(0)?;
176                Ok(payload)
177            })
178            .map_err(|e| e.to_string())?;
179
180        let mut nodes = Vec::new();
181        for row in rows {
182            let payload = row.map_err(|e| e.to_string())?;
183            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
184            nodes.push(node);
185        }
186
187        Ok(nodes)
188    }
189
190    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
191        let mut stmt = self
192            .conn
193            .prepare(
194                "SELECT payload FROM ainl_graph_nodes
195                 WHERE node_type = ?1
196                 ORDER BY timestamp DESC",
197            )
198            .map_err(|e| e.to_string())?;
199
200        let rows = stmt
201            .query_map([type_name], |row| {
202                let payload: String = row.get(0)?;
203                Ok(payload)
204            })
205            .map_err(|e| e.to_string())?;
206
207        let mut nodes = Vec::new();
208        for row in rows {
209            let payload = row.map_err(|e| e.to_string())?;
210            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
211            nodes.push(node);
212        }
213
214        Ok(nodes)
215    }
216
217    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
218        let mut stmt = self
219            .conn
220            .prepare(
221                "SELECT to_id FROM ainl_graph_edges
222                 WHERE from_id = ?1 AND label = ?2",
223            )
224            .map_err(|e| e.to_string())?;
225
226        let target_ids: Vec<String> = stmt
227            .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
228            .map_err(|e| e.to_string())?
229            .collect::<Result<Vec<_>, _>>()
230            .map_err(|e| e.to_string())?;
231
232        let mut nodes = Vec::new();
233        for target_id in target_ids {
234            let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
235            if let Some(node) = self.read_node(id)? {
236                nodes.push(node);
237            }
238        }
239
240        Ok(nodes)
241    }
242}