1pub mod migrations;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use directories::BaseDirs;
6use duckdb::{params, Connection};
7use serde_json::Value as JsonValue;
8use spec_ai_knowledge_graph::KnowledgeGraphStore;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11
12use crate::types::{
13 GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, PolicyEntry,
14};
15
/// Handle to the agent's DuckDB-backed persistence layer.
///
/// Cheap to clone: every clone shares the same connection behind an
/// `Arc<Mutex<_>>`, so all database access is serialized.
#[derive(Clone)]
pub struct Persistence {
    /// Single shared DuckDB connection; lock it via `conn()`.
    conn: Arc<Mutex<Connection>>,
    /// Stable identifier for this process instance (see `generate_instance_id`).
    instance_id: String,
    /// Knowledge-graph facade that shares the same connection.
    graph_store: KnowledgeGraphStore,
}
22
23impl Persistence {
24 pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
26 Self::with_instance_id(db_path, generate_instance_id())
27 }
28
29 pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
31 let db_path = expand_tilde(db_path.as_ref())?;
32 if let Some(dir) = db_path.parent() {
33 std::fs::create_dir_all(dir).context("creating DB directory")?;
34 }
35 let conn = Connection::open(&db_path).context("opening DuckDB")?;
36 migrations::run(&conn).context("running migrations")?;
37 let conn_arc = Arc::new(Mutex::new(conn));
38 let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
39 Ok(Self {
40 conn: conn_arc,
41 instance_id,
42 graph_store,
43 })
44 }
45
46 pub fn instance_id(&self) -> &str {
48 &self.instance_id
49 }
50
51 pub fn graph_store(&self) -> &KnowledgeGraphStore {
53 &self.graph_store
54 }
55
56 pub fn checkpoint(&self) -> Result<()> {
59 let conn = self.conn();
60 conn.execute_batch("CHECKPOINT;")
61 .context("checkpointing database")
62 }
63
64 pub fn new_default() -> Result<Self> {
66 let base = BaseDirs::new().context("base directories not available")?;
67 let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
68 Self::new(path)
69 }
70
71 pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
74 self.conn
75 .lock()
76 .expect("database connection mutex poisoned")
77 }
78
79 pub fn insert_message(
82 &self,
83 session_id: &str,
84 role: MessageRole,
85 content: &str,
86 ) -> Result<i64> {
87 let conn = self.conn();
88 let mut stmt = conn.prepare(
89 "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
90 )?;
91 let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
92 row.get(0)
93 })?;
94 Ok(id)
95 }
96
97 pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
98 let conn = self.conn();
99 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
100 let mut rows = stmt.query(params![session_id, limit])?;
101 let mut out = Vec::new();
102 while let Some(row) = rows.next()? {
103 let id: i64 = row.get(0)?;
104 let sid: String = row.get(1)?;
105 let role: String = row.get(2)?;
106 let content: String = row.get(3)?;
107 let created_at: String = row.get(4)?; let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
109 out.push(Message {
110 id,
111 session_id: sid,
112 role: MessageRole::from_str(&role),
113 content,
114 created_at,
115 });
116 }
117 out.reverse();
118 Ok(out)
119 }
120
121 pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
122 let conn = self.conn();
123 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
124 let mut rows = stmt.query(params![message_id])?;
125 if let Some(row) = rows.next()? {
126 let id: i64 = row.get(0)?;
127 let sid: String = row.get(1)?;
128 let role: String = row.get(2)?;
129 let content: String = row.get(3)?;
130 let created_at: String = row.get(4)?;
131 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
132 Ok(Some(Message {
133 id,
134 session_id: sid,
135 role: MessageRole::from_str(&role),
136 content,
137 created_at,
138 }))
139 } else {
140 Ok(None)
141 }
142 }
143
144 pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
146 let conn = self.conn();
147 let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
148 let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
149 Ok(changed)
150 }
151
152 pub fn insert_memory_vector(
155 &self,
156 session_id: &str,
157 message_id: Option<i64>,
158 embedding: &[f32],
159 ) -> Result<i64> {
160 let conn = self.conn();
161 let embedding_json = serde_json::to_string(embedding)?;
162 let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
163 let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
164 row.get(0)
165 })?;
166 Ok(id)
167 }
168
169 pub fn recall_top_k(
170 &self,
171 session_id: &str,
172 query_embedding: &[f32],
173 k: usize,
174 ) -> Result<Vec<(MemoryVector, f32)>> {
175 let conn = self.conn();
176 let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
177 let mut rows = stmt.query(params![session_id])?;
178 let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
179 while let Some(row) = rows.next()? {
180 let id: i64 = row.get(0)?;
181 let sid: String = row.get(1)?;
182 let message_id: Option<i64> = row.get(2)?;
183 let embedding_text: String = row.get(3)?;
184 let created_at: String = row.get(4)?;
185 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
186 let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
187 let score = cosine_similarity(query_embedding, &embedding);
188 scored.push((
189 MemoryVector {
190 id,
191 session_id: sid,
192 message_id,
193 embedding,
194 created_at,
195 },
196 score,
197 ));
198 }
199 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
200 scored.truncate(k);
201 Ok(scored)
202 }
203
204 pub fn list_sessions(&self) -> Result<Vec<String>> {
206 let conn = self.conn();
207 let mut stmt = conn.prepare(
208 "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
209 )?;
210 let mut rows = stmt.query([])?;
211 let mut out = Vec::new();
212 while let Some(row) = rows.next()? {
213 let sid: String = row.get(0)?;
214 out.push(sid);
215 }
216 Ok(out)
217 }
218
219 pub fn log_tool(
222 &self,
223 session_id: &str,
224 agent_name: &str,
225 run_id: &str,
226 tool_name: &str,
227 arguments: &JsonValue,
228 result: &JsonValue,
229 success: bool,
230 error: Option<&str>,
231 ) -> Result<i64> {
232 let conn = self.conn();
233 let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
234 let id: i64 = stmt.query_row(
235 params![
236 session_id,
237 agent_name,
238 run_id,
239 tool_name,
240 arguments.to_string(),
241 result.to_string(),
242 success,
243 error.unwrap_or("")
244 ],
245 |row| row.get(0),
246 )?;
247 Ok(id)
248 }
249
250 pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
253 let conn = self.conn();
254 conn.execute_batch("BEGIN TRANSACTION;")?;
256 {
257 let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
258 let _ = del.execute(params![key])?;
259 let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
260 let _ = ins.execute(params![key, value.to_string()])?;
261 }
262 conn.execute_batch("COMMIT;")?;
263 Ok(())
264 }
265
266 pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
267 let conn = self.conn();
268 let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
269 let mut rows = stmt.query(params![key])?;
270 if let Some(row) = rows.next()? {
271 let key: String = row.get(0)?;
272 let value_text: String = row.get(1)?;
273 let updated_at: String = row.get(2)?;
274 let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
275 let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
276 Ok(Some(PolicyEntry {
277 key,
278 value,
279 updated_at,
280 }))
281 } else {
282 Ok(None)
283 }
284 }
285}
286
287fn generate_instance_id() -> String {
288 let hostname = hostname::get()
289 .ok()
290 .and_then(|h| h.into_string().ok())
291 .unwrap_or_else(|| "unknown".to_string());
292 let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
293 format!("{}-{}", hostname, uuid)
294}
295
296fn expand_tilde(path: &Path) -> Result<PathBuf> {
297 let path_str = path.to_string_lossy();
298 if path_str == "~" {
299 let base = BaseDirs::new().context("base directories not available")?;
300 Ok(base.home_dir().to_path_buf())
301 } else if let Some(stripped) = path_str.strip_prefix("~/") {
302 let base = BaseDirs::new().context("base directories not available")?;
303 Ok(base.home_dir().join(stripped))
304 } else {
305 Ok(path.to_path_buf())
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    #[test]
    fn expands_home_directory_prefix() {
        let dirs = BaseDirs::new().expect("home directory available");
        let want = dirs.home_dir().join("demo.db");
        let got = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(got, want);
    }

    #[test]
    fn leaves_regular_paths_unchanged() {
        let original = Path::new("relative/path.db");
        let expanded = expand_tilde(original).expect("path expansion succeeds");
        assert_eq!(expanded, original);
    }
}
329
/// Cosine similarity between two embedding vectors.
///
/// Returns 0.0 for empty, length-mismatched, or zero-magnitude inputs so
/// degenerate vectors never outrank genuine matches.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}
347
/// Convert a knowledge-graph crate node into the crate-local `GraphNode`
/// mirror type. Pure field-by-field move; no validation or defaulting.
fn from_kg_node(node: spec_ai_knowledge_graph::GraphNode) -> GraphNode {
    GraphNode {
        id: node.id,
        session_id: node.session_id,
        node_type: node.node_type,
        label: node.label,
        properties: node.properties,
        embedding_id: node.embedding_id,
        created_at: node.created_at,
        updated_at: node.updated_at,
    }
}
364
/// Convert a knowledge-graph crate edge into the crate-local `GraphEdge`
/// mirror type. Pure field-by-field move; no validation or defaulting.
fn from_kg_edge(edge: spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
    GraphEdge {
        id: edge.id,
        session_id: edge.session_id,
        source_id: edge.source_id,
        target_id: edge.target_id,
        edge_type: edge.edge_type,
        predicate: edge.predicate,
        properties: edge.properties,
        weight: edge.weight,
        temporal_start: edge.temporal_start,
        temporal_end: edge.temporal_end,
        created_at: edge.created_at,
    }
}
380
381fn from_kg_path(path: spec_ai_knowledge_graph::GraphPath) -> GraphPath {
382 GraphPath {
383 nodes: path.nodes.into_iter().map(from_kg_node).collect(),
384 edges: path.edges.into_iter().map(from_kg_edge).collect(),
385 length: path.length,
386 weight: path.weight,
387 }
388}
389
390impl Persistence {
391 pub fn insert_graph_node(
394 &self,
395 session_id: &str,
396 node_type: spec_ai_knowledge_graph::NodeType,
397 label: &str,
398 properties: &JsonValue,
399 embedding_id: Option<i64>,
400 ) -> Result<i64> {
401 self.graph_store
402 .insert_graph_node(session_id, node_type, label, properties, embedding_id)
403 }
404
405 pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
406 self.graph_store
407 .get_graph_node(node_id)
408 .map(|opt| opt.map(from_kg_node))
409 }
410
411 pub fn list_graph_nodes(
412 &self,
413 session_id: &str,
414 node_type: Option<spec_ai_knowledge_graph::NodeType>,
415 limit: Option<i64>,
416 ) -> Result<Vec<GraphNode>> {
417 self.graph_store
418 .list_graph_nodes(session_id, node_type, limit)
419 .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
420 }
421
422 pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
423 self.graph_store.count_graph_nodes(session_id)
424 }
425
426 pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
427 self.graph_store.update_graph_node(node_id, properties)
428 }
429
430 pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
431 self.graph_store.delete_graph_node(node_id)
432 }
433
434 pub fn insert_graph_edge(
437 &self,
438 session_id: &str,
439 source_id: i64,
440 target_id: i64,
441 edge_type: spec_ai_knowledge_graph::EdgeType,
442 predicate: Option<&str>,
443 properties: Option<&JsonValue>,
444 weight: f32,
445 ) -> Result<i64> {
446 self.graph_store.insert_graph_edge(
447 session_id, source_id, target_id, edge_type, predicate, properties, weight,
448 )
449 }
450
451 pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
452 self.graph_store
453 .get_graph_edge(edge_id)
454 .map(|opt| opt.map(from_kg_edge))
455 }
456
457 pub fn list_graph_edges(
458 &self,
459 session_id: &str,
460 source_id: Option<i64>,
461 target_id: Option<i64>,
462 ) -> Result<Vec<GraphEdge>> {
463 self.graph_store
464 .list_graph_edges(session_id, source_id, target_id)
465 .map(|edges| edges.into_iter().map(from_kg_edge).collect())
466 }
467
468 pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
469 self.graph_store.count_graph_edges(session_id)
470 }
471
472 pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
473 self.graph_store.delete_graph_edge(edge_id)
474 }
475
476 pub fn find_shortest_path(
479 &self,
480 session_id: &str,
481 source_id: i64,
482 target_id: i64,
483 max_hops: Option<usize>,
484 ) -> Result<Option<GraphPath>> {
485 self.graph_store
486 .find_shortest_path(session_id, source_id, target_id, max_hops)
487 .map(|opt| opt.map(from_kg_path))
488 }
489
490 pub fn traverse_neighbors(
491 &self,
492 session_id: &str,
493 node_id: i64,
494 direction: spec_ai_knowledge_graph::TraversalDirection,
495 depth: usize,
496 ) -> Result<Vec<GraphNode>> {
497 self.graph_store
498 .traverse_neighbors(session_id, node_id, direction, depth)
499 .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
500 }
501
502 pub fn insert_transcription(
505 &self,
506 session_id: &str,
507 chunk_id: i64,
508 text: &str,
509 timestamp: chrono::DateTime<Utc>,
510 ) -> Result<i64> {
511 let conn = self.conn();
512 let mut stmt = conn.prepare(
513 "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
514 )?;
515 let id: i64 = stmt.query_row(
516 params![session_id, chunk_id, text, timestamp.to_rfc3339()],
517 |row| row.get(0),
518 )?;
519 Ok(id)
520 }
521
522 pub fn update_transcription_embedding(
523 &self,
524 transcription_id: i64,
525 embedding_id: i64,
526 ) -> Result<()> {
527 let conn = self.conn();
528 conn.execute(
529 "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
530 params![embedding_id, transcription_id],
531 )?;
532 Ok(())
533 }
534
535 pub fn list_transcriptions(
536 &self,
537 session_id: &str,
538 limit: Option<i64>,
539 ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
540 let conn = self.conn();
541 let query = if let Some(lim) = limit {
542 format!(
543 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
544 lim
545 )
546 } else {
547 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
548 };
549
550 let mut stmt = conn.prepare(&query)?;
551 let mut rows = stmt.query(params![session_id])?;
552 let mut out = Vec::new();
553
554 while let Some(row) = rows.next()? {
555 let id: i64 = row.get(0)?;
556 let chunk_id: i64 = row.get(1)?;
557 let text: String = row.get(2)?;
558 let timestamp_str: String = row.get(3)?;
559 let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
560 out.push((id, chunk_id, text, timestamp));
561 }
562
563 Ok(out)
564 }
565
566 pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
567 let transcriptions = self.list_transcriptions(session_id, None)?;
568 Ok(transcriptions
569 .into_iter()
570 .map(|(_, _, text, _)| text)
571 .collect::<Vec<_>>()
572 .join(" "))
573 }
574
575 pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
576 let conn = self.conn();
577 conn.execute(
578 "DELETE FROM transcriptions WHERE session_id = ?",
579 params![session_id],
580 )?;
581 Ok(())
582 }
583
584 pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
585 let conn = self.conn();
586 let mut stmt =
587 conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
588 let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
589 match result {
590 Ok(text) => Ok(Some(text)),
591 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
592 Err(e) => Err(e.into()),
593 }
594 }
595
596 pub fn upsert_tokenized_file(
600 &self,
601 session_id: &str,
602 path: &str,
603 file_hash: &str,
604 raw_tokens: usize,
605 cleaned_tokens: usize,
606 bytes_captured: usize,
607 truncated: bool,
608 embedding_id: Option<i64>,
609 ) -> Result<i64> {
610 let conn = self.conn();
611 conn.execute(
612 "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
613 params![session_id, path],
614 )?;
615 let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
616 let id: i64 = stmt.query_row(
617 params![
618 session_id,
619 path,
620 file_hash,
621 raw_tokens as i64,
622 cleaned_tokens as i64,
623 bytes_captured as i64,
624 truncated,
625 embedding_id
626 ],
627 |row| row.get(0),
628 )?;
629 Ok(id)
630 }
631
632 pub fn get_tokenized_file(
633 &self,
634 session_id: &str,
635 path: &str,
636 ) -> Result<Option<TokenizedFileRecord>> {
637 let conn = self.conn();
638 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
639 let mut rows = stmt.query(params![session_id, path])?;
640 if let Some(row) = rows.next()? {
641 let record = TokenizedFileRecord::from_row(row)?;
642 Ok(Some(record))
643 } else {
644 Ok(None)
645 }
646 }
647
648 pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
649 let conn = self.conn();
650 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
651 let mut rows = stmt.query(params![session_id])?;
652 let mut out = Vec::new();
653 while let Some(row) = rows.next()? {
654 out.push(TokenizedFileRecord::from_row(row)?);
655 }
656 Ok(out)
657 }
658
659 pub fn mesh_message_store(
663 &self,
664 message_id: &str,
665 source_instance: &str,
666 target_instance: Option<&str>,
667 message_type: &str,
668 payload: &JsonValue,
669 status: &str,
670 ) -> Result<i64> {
671 let conn = self.conn();
672 let payload_json = serde_json::to_string(payload)?;
673 conn.execute(
674 "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?)",
675 params![message_id, source_instance, target_instance, message_type, payload_json, status],
676 )?;
677 let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
679 Ok(id)
680 }
681
682 pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
684 let conn = self.conn();
685 let count: i64 = conn.query_row(
686 "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
687 params![message_id],
688 |row| row.get(0),
689 )?;
690 Ok(count > 0)
691 }
692
693 pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
695 let conn = self.conn();
696 conn.execute(
697 "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
698 params![status, message_id],
699 )?;
700 Ok(())
701 }
702
703 pub fn mesh_message_get_pending(
705 &self,
706 target_instance: &str,
707 ) -> Result<Vec<MeshMessageRecord>> {
708 let conn = self.conn();
709 let mut stmt = conn.prepare(
710 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
711 FROM mesh_messages
712 WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
713 ORDER BY created_at",
714 )?;
715 let mut rows = stmt.query(params![target_instance])?;
716 let mut out = Vec::new();
717 while let Some(row) = rows.next()? {
718 out.push(MeshMessageRecord::from_row(row)?);
719 }
720 Ok(out)
721 }
722
723 pub fn mesh_message_get_history(
725 &self,
726 instance_id: Option<&str>,
727 limit: usize,
728 ) -> Result<Vec<MeshMessageRecord>> {
729 let conn = self.conn();
730 let query = if instance_id.is_some() {
731 format!(
732 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
733 FROM mesh_messages
734 WHERE source_instance = ? OR target_instance = ?
735 ORDER BY created_at DESC LIMIT {}",
736 limit
737 )
738 } else {
739 format!(
740 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
741 FROM mesh_messages
742 ORDER BY created_at DESC LIMIT {}",
743 limit
744 )
745 };
746
747 let mut stmt = conn.prepare(&query)?;
748 let mut rows = if let Some(inst) = instance_id {
749 stmt.query(params![inst, inst])?
750 } else {
751 stmt.query(params![])?
752 };
753
754 let mut out = Vec::new();
755 while let Some(row) = rows.next()? {
756 out.push(MeshMessageRecord::from_row(row)?);
757 }
758 Ok(out)
759 }
760
761 pub fn graph_changelog_append(
765 &self,
766 session_id: &str,
767 instance_id: &str,
768 entity_type: &str,
769 entity_id: i64,
770 operation: &str,
771 vector_clock: &str,
772 data: Option<&str>,
773 ) -> Result<i64> {
774 self.graph_store.graph_changelog_append(
775 session_id,
776 instance_id,
777 entity_type,
778 entity_id,
779 operation,
780 vector_clock,
781 data,
782 )
783 }
784
785 pub fn graph_changelog_get_since(
787 &self,
788 session_id: &str,
789 since_timestamp: &str,
790 ) -> Result<Vec<ChangelogEntry>> {
791 self.graph_store
792 .graph_changelog_get_since(session_id, since_timestamp)
793 .map(|entries| {
794 entries
795 .into_iter()
796 .map(|e| ChangelogEntry {
797 id: e.id,
798 session_id: e.session_id,
799 instance_id: e.instance_id,
800 entity_type: e.entity_type,
801 entity_id: e.entity_id,
802 operation: e.operation,
803 vector_clock: e.vector_clock,
804 data: e.data,
805 created_at: e.created_at,
806 })
807 .collect()
808 })
809 }
810
811 pub fn graph_list_conflicts(&self, session_id: Option<&str>) -> Result<Vec<ChangelogEntry>> {
813 self.graph_store
814 .graph_list_conflicts(session_id, 100)
815 .map(|entries| {
816 entries
817 .into_iter()
818 .map(|e| ChangelogEntry {
819 id: e.id,
820 session_id: e.session_id,
821 instance_id: e.instance_id,
822 entity_type: e.entity_type,
823 entity_id: e.entity_id,
824 operation: e.operation,
825 vector_clock: e.vector_clock,
826 data: e.data,
827 created_at: e.created_at,
828 })
829 .collect()
830 })
831 }
832
833 pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
835 self.graph_store.graph_changelog_prune(days_to_keep)
836 }
837
838 pub fn graph_sync_state_get(
840 &self,
841 instance_id: &str,
842 session_id: &str,
843 graph_name: &str,
844 ) -> Result<Option<String>> {
845 self.graph_store
846 .graph_sync_state_get(instance_id, session_id, graph_name)
847 }
848
849 pub fn graph_sync_state_get_metadata(
851 &self,
852 instance_id: &str,
853 session_id: &str,
854 graph_name: &str,
855 ) -> Result<Option<SyncStateRecord>> {
856 self.graph_store
857 .graph_sync_state_get_metadata(instance_id, session_id, graph_name)
858 .map(|opt| {
859 opt.map(|r| SyncStateRecord {
860 vector_clock: r.vector_clock,
861 last_sync_at: r.last_sync_at,
862 })
863 })
864 }
865
866 pub fn graph_sync_state_update(
868 &self,
869 instance_id: &str,
870 session_id: &str,
871 graph_name: &str,
872 vector_clock: &str,
873 ) -> Result<()> {
874 self.graph_store
875 .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
876 }
877
878 pub fn graph_set_sync_config(
880 &self,
881 session_id: &str,
882 graph_name: &str,
883 sync_enabled: bool,
884 conflict_resolution_strategy: Option<&str>,
885 sync_interval_seconds: Option<u64>,
886 ) -> Result<GraphSyncConfig> {
887 self.graph_store
888 .graph_set_sync_config(
889 session_id,
890 graph_name,
891 sync_enabled,
892 conflict_resolution_strategy,
893 sync_interval_seconds,
894 )
895 .map(|cfg| GraphSyncConfig {
896 sync_enabled: cfg.sync_enabled,
897 conflict_resolution_strategy: cfg.conflict_resolution_strategy,
898 sync_interval_seconds: cfg.sync_interval_seconds,
899 })
900 }
901
902 pub fn graph_get_sync_config(
904 &self,
905 session_id: &str,
906 graph_name: &str,
907 ) -> Result<GraphSyncConfig> {
908 self.graph_store
909 .graph_get_sync_config(session_id, graph_name)
910 .map(|cfg| GraphSyncConfig {
911 sync_enabled: cfg.sync_enabled,
912 conflict_resolution_strategy: cfg.conflict_resolution_strategy,
913 sync_interval_seconds: cfg.sync_interval_seconds,
914 })
915 }
916
917 pub fn graph_set_sync_enabled(
919 &self,
920 session_id: &str,
921 graph_name: &str,
922 enabled: bool,
923 ) -> Result<()> {
924 self.graph_store
925 .graph_set_sync_enabled(session_id, graph_name, enabled)
926 }
927
928 pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
930 self.graph_store
931 .graph_get_sync_enabled(session_id, graph_name)
932 }
933
934 pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
936 self.graph_store.graph_list(session_id)
937 }
938
939 pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
941 self.graph_store.graph_list_sync_enabled()
942 }
943
944 pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
946 self.graph_store
947 .graph_get_node_with_sync(node_id)
948 .map(|opt| {
949 opt.map(|r| SyncedNodeRecord {
950 id: r.id,
951 session_id: r.session_id,
952 node_type: r.node_type,
953 label: r.label,
954 properties: r.properties,
955 embedding_id: r.embedding_id,
956 created_at: r.created_at,
957 updated_at: r.updated_at,
958 vector_clock: r.vector_clock,
959 last_modified_by: r.last_modified_by,
960 is_deleted: r.is_deleted,
961 sync_enabled: r.sync_enabled,
962 })
963 })
964 }
965
966 pub fn graph_list_nodes_with_sync(
968 &self,
969 session_id: &str,
970 sync_enabled_only: bool,
971 include_deleted: bool,
972 ) -> Result<Vec<SyncedNodeRecord>> {
973 self.graph_store
974 .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
975 .map(|nodes| {
976 nodes
977 .into_iter()
978 .map(|r| SyncedNodeRecord {
979 id: r.id,
980 session_id: r.session_id,
981 node_type: r.node_type,
982 label: r.label,
983 properties: r.properties,
984 embedding_id: r.embedding_id,
985 created_at: r.created_at,
986 updated_at: r.updated_at,
987 vector_clock: r.vector_clock,
988 last_modified_by: r.last_modified_by,
989 is_deleted: r.is_deleted,
990 sync_enabled: r.sync_enabled,
991 })
992 .collect()
993 })
994 }
995
996 pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
998 self.graph_store
999 .graph_get_edge_with_sync(edge_id)
1000 .map(|opt| {
1001 opt.map(|r| SyncedEdgeRecord {
1002 id: r.id,
1003 session_id: r.session_id,
1004 source_id: r.source_id,
1005 target_id: r.target_id,
1006 edge_type: r.edge_type,
1007 predicate: r.predicate,
1008 properties: r.properties,
1009 weight: r.weight,
1010 temporal_start: r.temporal_start,
1011 temporal_end: r.temporal_end,
1012 created_at: r.created_at,
1013 vector_clock: r.vector_clock,
1014 last_modified_by: r.last_modified_by,
1015 is_deleted: r.is_deleted,
1016 sync_enabled: r.sync_enabled,
1017 })
1018 })
1019 }
1020
1021 pub fn graph_list_edges_with_sync(
1023 &self,
1024 session_id: &str,
1025 sync_enabled_only: bool,
1026 include_deleted: bool,
1027 ) -> Result<Vec<SyncedEdgeRecord>> {
1028 self.graph_store
1029 .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
1030 .map(|edges| {
1031 edges
1032 .into_iter()
1033 .map(|r| SyncedEdgeRecord {
1034 id: r.id,
1035 session_id: r.session_id,
1036 source_id: r.source_id,
1037 target_id: r.target_id,
1038 edge_type: r.edge_type,
1039 predicate: r.predicate,
1040 properties: r.properties,
1041 weight: r.weight,
1042 temporal_start: r.temporal_start,
1043 temporal_end: r.temporal_end,
1044 created_at: r.created_at,
1045 vector_clock: r.vector_clock,
1046 last_modified_by: r.last_modified_by,
1047 is_deleted: r.is_deleted,
1048 sync_enabled: r.sync_enabled,
1049 })
1050 .collect()
1051 })
1052 }
1053
1054 pub fn graph_update_node_sync_metadata(
1056 &self,
1057 node_id: i64,
1058 vector_clock: &str,
1059 last_modified_by: &str,
1060 sync_enabled: bool,
1061 ) -> Result<()> {
1062 self.graph_store.graph_update_node_sync_metadata(
1063 node_id,
1064 vector_clock,
1065 last_modified_by,
1066 sync_enabled,
1067 )
1068 }
1069
1070 pub fn graph_update_edge_sync_metadata(
1072 &self,
1073 edge_id: i64,
1074 vector_clock: &str,
1075 last_modified_by: &str,
1076 sync_enabled: bool,
1077 ) -> Result<()> {
1078 self.graph_store.graph_update_edge_sync_metadata(
1079 edge_id,
1080 vector_clock,
1081 last_modified_by,
1082 sync_enabled,
1083 )
1084 }
1085
1086 pub fn graph_mark_node_deleted(
1088 &self,
1089 node_id: i64,
1090 vector_clock: &str,
1091 deleted_by: &str,
1092 ) -> Result<()> {
1093 self.graph_store
1094 .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
1095 }
1096
1097 pub fn graph_mark_edge_deleted(
1099 &self,
1100 edge_id: i64,
1101 vector_clock: &str,
1102 deleted_by: &str,
1103 ) -> Result<()> {
1104 self.graph_store
1105 .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
1106 }
1107}
1108
/// Metadata about a file that has been tokenized and captured for a session.
#[derive(Debug, Clone)]
pub struct TokenizedFileRecord {
    pub id: i64,
    pub session_id: String,
    /// File path as supplied by the caller (unique per session).
    pub path: String,
    /// Content hash used to detect changes between captures.
    pub file_hash: String,
    /// Token count before cleaning.
    pub raw_tokens: usize,
    /// Token count after cleaning.
    pub cleaned_tokens: usize,
    /// Number of bytes actually captured from the file.
    pub bytes_captured: usize,
    /// True when the capture was cut short rather than complete.
    pub truncated: bool,
    /// Linked memory-vector id, when an embedding was computed.
    pub embedding_id: Option<i64>,
    pub updated_at: DateTime<Utc>,
}
1122
1123impl TokenizedFileRecord {
1124 fn from_row(row: &duckdb::Row) -> Result<Self> {
1125 let id: i64 = row.get(0)?;
1126 let session_id: String = row.get(1)?;
1127 let path: String = row.get(2)?;
1128 let file_hash: String = row.get(3)?;
1129 let raw_tokens: i64 = row.get(4)?;
1130 let cleaned_tokens: i64 = row.get(5)?;
1131 let bytes_captured: i64 = row.get(6)?;
1132 let truncated: bool = row.get(7)?;
1133 let embedding_id: Option<i64> = row.get(8)?;
1134 let updated_at: String = row.get(9)?;
1135
1136 Ok(Self {
1137 id,
1138 session_id,
1139 path,
1140 file_hash,
1141 raw_tokens: raw_tokens.max(0) as usize,
1142 cleaned_tokens: cleaned_tokens.max(0) as usize,
1143 bytes_captured: bytes_captured.max(0) as usize,
1144 truncated,
1145 embedding_id,
1146 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
1147 })
1148 }
1149}
1150
/// A stored inter-instance mesh message.
#[derive(Debug, Clone)]
pub struct MeshMessageRecord {
    pub id: i64,
    /// Instance that produced the message.
    pub source_instance: String,
    /// Addressee; `None` means broadcast (matched via `target_instance IS NULL`).
    pub target_instance: Option<String>,
    pub message_type: String,
    /// Message body; persisted as JSON text in the database.
    pub payload: JsonValue,
    /// Delivery status; `mesh_message_get_pending` selects rows with status "pending".
    pub status: String,
    pub created_at: DateTime<Utc>,
    /// Stamped when the status is updated via `mesh_message_update_status`.
    pub delivered_at: Option<DateTime<Utc>>,
}
1162
impl MeshMessageRecord {
    /// Materialize a record from a row shaped as
    /// `(id, source_instance, target_instance, message_type, payload, status,
    ///   created_at_text, delivered_at_text)`.
    ///
    /// Errors when the stored payload is not valid JSON. Timestamp parse
    /// failures fall back to `Utc::now()` for `created_at` and to `None` for
    /// `delivered_at`.
    fn from_row(row: &duckdb::Row) -> Result<Self> {
        let id: i64 = row.get(0)?;
        let source_instance: String = row.get(1)?;
        let target_instance: Option<String> = row.get(2)?;
        let message_type: String = row.get(3)?;
        let payload_str: String = row.get(4)?;
        let payload: JsonValue = serde_json::from_str(&payload_str)?;
        let status: String = row.get(5)?;
        let created_at_str: String = row.get(6)?;
        let delivered_at_str: Option<String> = row.get(7)?;

        // NOTE(review): `parse()` expects RFC 3339 text; DuckDB's
        // `CAST(... AS TEXT)` format may not match, which would trigger the
        // fallbacks above on every row — confirm against the DB output.
        Ok(MeshMessageRecord {
            id,
            source_instance,
            target_instance,
            message_type,
            payload,
            status,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
        })
    }
}
1187
/// Snapshot of sync bookkeeping state.
#[derive(Debug, Clone)]
pub struct SyncStateRecord {
    // Serialized vector clock for the instance.
    pub vector_clock: String,
    // Last sync time, kept as the raw stored string (unlike the other
    // records here it is not parsed into a DateTime — TODO confirm this
    // is intentional at the call sites).
    pub last_sync_at: Option<String>,
}
1195
/// Per-graph sync configuration; `Default` yields sync disabled with no
/// strategy or interval set.
#[derive(Debug, Clone, Default)]
pub struct GraphSyncConfig {
    // Whether sync is turned on.
    pub sync_enabled: bool,
    // Named conflict-resolution strategy, when configured.
    pub conflict_resolution_strategy: Option<String>,
    // How often to sync, in seconds, when configured.
    pub sync_interval_seconds: Option<u64>,
}
1202
/// One entry in the sync changelog: a single operation applied to a graph
/// entity, tagged with the vector clock at the time of the change.
#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    // Primary key of the changelog row.
    pub id: i64,
    // Session in which the change occurred.
    pub session_id: String,
    // Instance that made the change.
    pub instance_id: String,
    // Kind of entity changed (stringly typed in the schema).
    pub entity_type: String,
    // Id of the changed entity.
    pub entity_id: i64,
    // Operation name (stringly typed in the schema).
    pub operation: String,
    // Serialized vector clock at change time.
    pub vector_clock: String,
    // Optional serialized change payload.
    pub data: Option<String>,
    // When the change was recorded.
    pub created_at: DateTime<Utc>,
}
1215
1216impl ChangelogEntry {
1217 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1218 let id: i64 = row.get(0)?;
1219 let session_id: String = row.get(1)?;
1220 let instance_id: String = row.get(2)?;
1221 let entity_type: String = row.get(3)?;
1222 let entity_id: i64 = row.get(4)?;
1223 let operation: String = row.get(5)?;
1224 let vector_clock: String = row.get(6)?;
1225 let data: Option<String> = row.get(7)?;
1226 let created_at_str: String = row.get(8)?;
1227
1228 Ok(ChangelogEntry {
1229 id,
1230 session_id,
1231 instance_id,
1232 entity_type,
1233 entity_id,
1234 operation,
1235 vector_clock,
1236 data,
1237 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1238 })
1239 }
1240}
1241
/// A graph node row including its sync metadata (vector clock, soft-delete
/// flag, last modifier).
#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    // Primary key of the node row.
    pub id: i64,
    // Session the node belongs to.
    pub session_id: String,
    // Node kind (stringly typed in the schema).
    pub node_type: String,
    // Human-readable label.
    pub label: String,
    // Node properties, stored as JSON text and decoded on read.
    pub properties: serde_json::Value,
    // Optional link to an embedding row.
    pub embedding_id: Option<i64>,
    // Creation timestamp.
    pub created_at: DateTime<Utc>,
    // Last-update timestamp.
    pub updated_at: DateTime<Utc>,
    // Serialized vector clock for conflict resolution.
    pub vector_clock: String,
    // Instance that last modified the node, when known.
    pub last_modified_by: Option<String>,
    // Soft-delete tombstone flag.
    pub is_deleted: bool,
    // Whether this node participates in sync.
    pub sync_enabled: bool,
}
1257
1258impl SyncedNodeRecord {
1259 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1260 let id: i64 = row.get(0)?;
1261 let session_id: String = row.get(1)?;
1262 let node_type: String = row.get(2)?;
1263 let label: String = row.get(3)?;
1264 let properties_str: String = row.get(4)?;
1265 let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
1266 duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1267 })?;
1268 let embedding_id: Option<i64> = row.get(5)?;
1269 let created_at_str: String = row.get(6)?;
1270 let updated_at_str: String = row.get(7)?;
1271 let vector_clock: String = row.get(8)?;
1272 let last_modified_by: Option<String> = row.get(9)?;
1273 let is_deleted: bool = row.get(10)?;
1274 let sync_enabled: bool = row.get(11)?;
1275
1276 Ok(SyncedNodeRecord {
1277 id,
1278 session_id,
1279 node_type,
1280 label,
1281 properties,
1282 embedding_id,
1283 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1284 updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1285 vector_clock,
1286 last_modified_by,
1287 is_deleted,
1288 sync_enabled,
1289 })
1290 }
1291}
1292
/// A graph edge row including its sync metadata (vector clock, soft-delete
/// flag, last modifier).
#[derive(Debug, Clone)]
pub struct SyncedEdgeRecord {
    // Primary key of the edge row.
    pub id: i64,
    // Session the edge belongs to.
    pub session_id: String,
    // Source node id.
    pub source_id: i64,
    // Target node id.
    pub target_id: i64,
    // Edge kind (stringly typed in the schema).
    pub edge_type: String,
    // Optional predicate string.
    pub predicate: Option<String>,
    // Optional edge properties, stored as JSON text and decoded on read.
    pub properties: Option<serde_json::Value>,
    // Edge weight.
    pub weight: f32,
    // Optional start of the edge's validity interval.
    pub temporal_start: Option<DateTime<Utc>>,
    // Optional end of the edge's validity interval.
    pub temporal_end: Option<DateTime<Utc>>,
    // Creation timestamp.
    pub created_at: DateTime<Utc>,
    // Serialized vector clock for conflict resolution.
    pub vector_clock: String,
    // Instance that last modified the edge, when known.
    pub last_modified_by: Option<String>,
    // Soft-delete tombstone flag.
    pub is_deleted: bool,
    // Whether this edge participates in sync.
    pub sync_enabled: bool,
}
1311
1312impl SyncedEdgeRecord {
1313 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1314 let id: i64 = row.get(0)?;
1315 let session_id: String = row.get(1)?;
1316 let source_id: i64 = row.get(2)?;
1317 let target_id: i64 = row.get(3)?;
1318 let edge_type: String = row.get(4)?;
1319 let predicate: Option<String> = row.get(5)?;
1320 let properties_str: Option<String> = row.get(6)?;
1321 let properties: Option<serde_json::Value> = properties_str
1322 .as_ref()
1323 .and_then(|s| serde_json::from_str(s).ok());
1324 let weight: f32 = row.get(7)?;
1325 let temporal_start_str: Option<String> = row.get(8)?;
1326 let temporal_end_str: Option<String> = row.get(9)?;
1327 let created_at_str: String = row.get(10)?;
1328 let vector_clock: String = row.get(11)?;
1329 let last_modified_by: Option<String> = row.get(12)?;
1330 let is_deleted: bool = row.get(13)?;
1331 let sync_enabled: bool = row.get(14)?;
1332
1333 Ok(SyncedEdgeRecord {
1334 id,
1335 session_id,
1336 source_id,
1337 target_id,
1338 edge_type,
1339 predicate,
1340 properties,
1341 weight,
1342 temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1343 temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1344 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1345 vector_clock,
1346 last_modified_by,
1347 is_deleted,
1348 sync_enabled,
1349 })
1350 }
1351}