1mod migrations;
6mod index;
7
8pub use migrations::{get_schema_version, needs_migration, run_migrations};
9pub use index::{IndexConfig, PersistentHnswIndex, SharedPersistentIndex};
10
11use cp_core::{Chunk, CPError, Document, Edge, EdgeKind, Embedding, Result};
12use rusqlite::{params, Connection, OptionalExtension};
13use std::path::PathBuf;
14use tracing::info;
15use uuid::Uuid;
16
17fn uuid_from_bytes(bytes: &[u8]) -> rusqlite::Result<Uuid> {
19 Uuid::from_slice(bytes).map_err(|e| {
20 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Blob, Box::new(e))
21 })
22}
23
/// SQLite-backed store for documents, chunks, embeddings, edges and state
/// roots, paired with a vector (HNSW) index over the stored embeddings.
pub struct GraphStore {
    /// Open SQLite connection holding every persistent table.
    db: Connection,
    /// Vector index; vectors are inserted under their embedding id.
    hnsw: SharedPersistentIndex,
    /// Database file path, or `None` when opened as `":memory:"`.
    db_path: Option<PathBuf>,
    /// Configuration the vector index was created/opened with.
    index_config: IndexConfig,
}
35
36impl GraphStore {
37 pub fn open(db_path: &str) -> Result<Self> {
39 Self::open_with_config(db_path, IndexConfig::default())
40 }
41
42 pub fn open_with_config(db_path: &str, index_config: IndexConfig) -> Result<Self> {
44 let (db, path) = if db_path == ":memory:" {
45 (Connection::open_in_memory().map_err(|e| CPError::Database(e.to_string()))?, None)
46 } else {
47 let path = PathBuf::from(db_path);
48 (Connection::open(&path).map_err(|e| CPError::Database(e.to_string()))?, Some(path))
49 };
50
51 run_migrations(&db)?;
53
54 let hnsw = if let Some(ref p) = path {
56 let index_path = p.with_extension("usearch");
57 SharedPersistentIndex::open(index_path, index_config.clone())?
58 } else {
59 SharedPersistentIndex::new(index_config.clone())?
60 };
61
62 let store = Self {
63 db,
64 hnsw,
65 db_path: path,
66 index_config,
67 };
68
69 if store.hnsw.needs_rebuild() {
71 store.rebuild_hnsw_index()?;
72 } else {
73 let current_root = store.compute_merkle_root()?;
75 if !store.hnsw.is_valid(¤t_root) {
76 info!("Index checkpoint mismatch, rebuilding...");
77 store.rebuild_hnsw_index()?;
78 }
79 }
80
81 info!("Opened graph store at {} and loaded index", db_path);
82
83 Ok(store)
84 }
85
86 pub fn in_memory() -> Result<Self> {
88 Self::open(":memory:")
89 }
90
91 fn rebuild_hnsw_index(&self) -> Result<()> {
93 info!("Rebuilding HNSW index from database...");
94 self.hnsw.clear()?;
95
96 let embs = self.get_all_embeddings()?;
97 for emb in embs {
98 self.hnsw.insert(emb.id, emb.to_f32())?;
99 }
100
101 let root = self.compute_merkle_root()?;
103 self.hnsw.checkpoint(root)?;
104
105 info!("HNSW index rebuilt with {} vectors", self.hnsw.len());
106 Ok(())
107 }
108
109 pub fn insert_document(&mut self, doc: &Document) -> Result<()> {
113 self.db
114 .execute(
115 "INSERT OR REPLACE INTO documents (id, path, hash, hierarchical_hash, mtime, size, mime_type, path_id, arweave_tx)
116 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
117 params![
118 doc.id.as_bytes().as_slice(),
119 doc.path.to_string_lossy().as_ref(),
120 doc.hash.as_slice(),
121 doc.hierarchical_hash.as_slice(),
122 doc.mtime,
123 doc.size as i64,
124 &doc.mime_type,
125 doc.path_id.as_bytes().as_slice(),
126 doc.arweave_tx.as_deref(),
127 ],
128 )
129 .map_err(|e| CPError::Database(e.to_string()))?;
130
131 Ok(())
132 }
133
    /// Column list used by every `documents` SELECT in this impl; the order
    /// must match the column indices read by `row_to_document`.
    const DOC_COLUMNS: &'static str = "id, path, hash, hierarchical_hash, mtime, size, mime_type, path_id, arweave_tx";
136
137 fn row_to_document(row: &rusqlite::Row) -> rusqlite::Result<Document> {
139 let id_bytes: Vec<u8> = row.get(0)?;
140 let path_str: String = row.get(1)?;
141 let hash_bytes: Vec<u8> = row.get(2)?;
142 let hh_bytes: Vec<u8> = row.get(3)?;
143 let mtime: i64 = row.get(4)?;
144 let size: i64 = row.get(5)?;
145 let mime_type: String = row.get(6)?;
146 let path_id_bytes: Option<Vec<u8>> = row.get(7)?;
147 let arweave_tx: Option<String> = row.get(8)?;
148
149 let id = uuid_from_bytes(&id_bytes)?;
150 let mut hash = [0u8; 32];
151 hash.copy_from_slice(&hash_bytes);
152 let mut hierarchical_hash = [0u8; 32];
153 hierarchical_hash.copy_from_slice(&hh_bytes);
154
155 let path_id = path_id_bytes
156 .and_then(|b| Uuid::from_slice(&b).ok())
157 .unwrap_or(Uuid::nil());
158
159 Ok(Document {
160 id,
161 path_id,
162 path: std::path::PathBuf::from(path_str),
163 hash,
164 hierarchical_hash,
165 mtime,
166 size: size as u64,
167 mime_type,
168 arweave_tx,
169 })
170 }
171
172 pub fn get_document(&self, id: Uuid) -> Result<Option<Document>> {
174 let sql = format!("SELECT {} FROM documents WHERE id = ?1", Self::DOC_COLUMNS);
175 self.db
176 .query_row(&sql, params![id.as_bytes().as_slice()], Self::row_to_document)
177 .optional()
178 .map_err(|e| CPError::Database(e.to_string()))
179 }
180
181 pub fn get_document_by_path(&self, path: &std::path::Path) -> Result<Option<Document>> {
183 let sql = format!("SELECT {} FROM documents WHERE path = ?1", Self::DOC_COLUMNS);
184 self.db
185 .query_row(&sql, params![path.to_string_lossy().as_ref()], Self::row_to_document)
186 .optional()
187 .map_err(|e| CPError::Database(e.to_string()))
188 }
189
190 pub fn delete_document(&mut self, id: Uuid) -> Result<()> {
192 self.db
195 .execute(
196 "DELETE FROM edges WHERE source = ?1 OR target = ?1",
197 params![id.as_bytes().as_slice()],
198 )
199 .map_err(|e| CPError::Database(e.to_string()))?;
200 self.db
201 .execute(
202 "DELETE FROM edges WHERE source IN (SELECT id FROM chunks WHERE doc_id = ?1)
203 OR target IN (SELECT id FROM chunks WHERE doc_id = ?1)",
204 params![id.as_bytes().as_slice()],
205 )
206 .map_err(|e| CPError::Database(e.to_string()))?;
207 self.db
208 .execute(
209 "DELETE FROM edges WHERE source IN (SELECT id FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?1))
210 OR target IN (SELECT id FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?1))",
211 params![id.as_bytes().as_slice()],
212 )
213 .map_err(|e| CPError::Database(e.to_string()))?;
214
215 self.db
217 .execute(
218 "DELETE FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?1)",
219 params![id.as_bytes().as_slice()],
220 )
221 .map_err(|e| CPError::Database(e.to_string()))?;
222
223 self.db
224 .execute(
225 "DELETE FROM chunks WHERE doc_id = ?1",
226 params![id.as_bytes().as_slice()],
227 )
228 .map_err(|e| CPError::Database(e.to_string()))?;
229
230 self.db
231 .execute(
232 "DELETE FROM documents WHERE id = ?1",
233 params![id.as_bytes().as_slice()],
234 )
235 .map_err(|e| CPError::Database(e.to_string()))?;
236
237 self.hnsw.invalidate();
239
240 Ok(())
241 }
242
243 pub fn delete_chunk(&mut self, id: Uuid) -> Result<()> {
245 self.db
247 .execute(
248 "DELETE FROM embeddings WHERE chunk_id = ?1",
249 params![id.as_bytes().as_slice()],
250 )
251 .map_err(|e| CPError::Database(e.to_string()))?;
252 self.db
254 .execute(
255 "DELETE FROM edges WHERE source = ?1 OR target = ?1",
256 params![id.as_bytes().as_slice()],
257 )
258 .map_err(|e| CPError::Database(e.to_string()))?;
259 self.db
260 .execute(
261 "DELETE FROM chunks WHERE id = ?1",
262 params![id.as_bytes().as_slice()],
263 )
264 .map_err(|e| CPError::Database(e.to_string()))?;
265 self.hnsw.invalidate();
266 Ok(())
267 }
268
269 pub fn delete_embedding(&mut self, id: Uuid) -> Result<()> {
271 self.db
272 .execute(
273 "DELETE FROM edges WHERE source = ?1 OR target = ?1",
274 params![id.as_bytes().as_slice()],
275 )
276 .map_err(|e| CPError::Database(e.to_string()))?;
277 self.db
278 .execute(
279 "DELETE FROM embeddings WHERE id = ?1",
280 params![id.as_bytes().as_slice()],
281 )
282 .map_err(|e| CPError::Database(e.to_string()))?;
283 self.hnsw.invalidate();
284 Ok(())
285 }
286
287 pub fn get_all_documents(&self) -> Result<Vec<Document>> {
289 let sql = format!("SELECT {} FROM documents", Self::DOC_COLUMNS);
290 let mut stmt = self
291 .db
292 .prepare(&sql)
293 .map_err(|e| CPError::Database(e.to_string()))?;
294
295 let docs = stmt
296 .query_map([], Self::row_to_document)
297 .map_err(|e| CPError::Database(e.to_string()))?
298 .collect::<std::result::Result<Vec<_>, _>>()
299 .map_err(|e| CPError::Database(e.to_string()))?;
300
301 Ok(docs)
302 }
303
304 pub fn get_all_chunk_ids(&self) -> Result<Vec<Uuid>> {
308 let mut stmt = self
309 .db
310 .prepare("SELECT id FROM chunks ORDER BY id")
311 .map_err(|e| CPError::Database(e.to_string()))?;
312
313 let ids = stmt
314 .query_map([], |row| {
315 let id_bytes: Vec<u8> = row.get(0)?;
316 Ok(uuid_from_bytes(&id_bytes)?)
317 })
318 .map_err(|e| CPError::Database(e.to_string()))?
319 .collect::<std::result::Result<Vec<_>, _>>()
320 .map_err(|e| CPError::Database(e.to_string()))?;
321
322 Ok(ids)
323 }
324
325 pub fn insert_chunk(&mut self, chunk: &Chunk) -> Result<()> {
327 self.db
328 .execute(
329 "INSERT OR REPLACE INTO chunks (id, doc_id, text, byte_offset, byte_length, sequence, text_hash)
330 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
331 params![
332 chunk.id.as_bytes().as_slice(),
333 chunk.doc_id.as_bytes().as_slice(),
334 &chunk.text,
335 chunk.byte_offset as i64,
336 chunk.byte_length as i64,
337 chunk.sequence,
338 chunk.text_hash.as_slice(),
339 ],
340 )
341 .map_err(|e| CPError::Database(e.to_string()))?;
342
343 Ok(())
344 }
345
346 pub fn get_chunks_for_doc(&self, doc_id: Uuid) -> Result<Vec<Chunk>> {
348 let mut stmt = self
349 .db
350 .prepare(
351 "SELECT id, doc_id, text, byte_offset, byte_length, sequence, text_hash
352 FROM chunks WHERE doc_id = ?1 ORDER BY sequence",
353 )
354 .map_err(|e| CPError::Database(e.to_string()))?;
355
356 let chunks = stmt
357 .query_map(params![doc_id.as_bytes().as_slice()], |row| {
358 let id_bytes: Vec<u8> = row.get(0)?;
359 let doc_id_bytes: Vec<u8> = row.get(1)?;
360 let text: String = row.get(2)?;
361 let byte_offset: i64 = row.get(3)?;
362 let byte_length: i64 = row.get(4)?;
363 let sequence: u32 = row.get(5)?;
364 let text_hash_bytes: Vec<u8> = row.get(6)?;
365
366 let id = uuid_from_bytes(&id_bytes)?;
367 let doc_id = uuid_from_bytes(&doc_id_bytes)?;
368 let mut text_hash = [0u8; 32];
369 text_hash.copy_from_slice(&text_hash_bytes);
370
371 Ok(Chunk {
372 id,
373 doc_id,
374 text,
375 byte_offset: byte_offset as u64,
376 byte_length: byte_length as u64,
377 sequence,
378 text_hash,
379 })
380 })
381 .map_err(|e| CPError::Database(e.to_string()))?
382 .collect::<std::result::Result<Vec<_>, _>>()
383 .map_err(|e| CPError::Database(e.to_string()))?;
384
385 Ok(chunks)
386 }
387
388 pub fn get_chunk(&self, id: Uuid) -> Result<Option<Chunk>> {
390 self.db
391 .query_row(
392 "SELECT id, doc_id, text, byte_offset, byte_length, sequence, text_hash FROM chunks WHERE id = ?1",
393 params![id.as_bytes().as_slice()],
394 |row| {
395 let id_bytes: Vec<u8> = row.get(0)?;
396 let doc_id_bytes: Vec<u8> = row.get(1)?;
397 let text: String = row.get(2)?;
398 let byte_offset: i64 = row.get(3)?;
399 let byte_length: i64 = row.get(4)?;
400 let sequence: u32 = row.get(5)?;
401 let text_hash_bytes: Vec<u8> = row.get(6)?;
402
403 let id = uuid_from_bytes(&id_bytes)?;
404 let doc_id = uuid_from_bytes(&doc_id_bytes)?;
405 let mut text_hash = [0u8; 32];
406 text_hash.copy_from_slice(&text_hash_bytes);
407
408 Ok(Chunk {
409 id,
410 doc_id,
411 text,
412 byte_offset: byte_offset as u64,
413 byte_length: byte_length as u64,
414 sequence,
415 text_hash,
416 })
417 },
418 )
419 .optional()
420 .map_err(|e| CPError::Database(e.to_string()))
421 }
422
423 pub fn insert_embedding(&mut self, emb: &Embedding) -> Result<()> {
427 let vector_bytes: Vec<u8> = emb
430 .vector
431 .iter()
432 .flat_map(|val| val.to_le_bytes())
433 .collect();
434
435 self.db
436 .execute(
437 "INSERT OR REPLACE INTO embeddings (id, chunk_id, vector, model_hash, dim, l2_norm)
438 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
439 params![
440 emb.id.as_bytes().as_slice(),
441 emb.chunk_id.as_bytes().as_slice(),
442 &vector_bytes,
443 emb.model_hash.as_slice(),
444 emb.dim as i32,
445 emb.l2_norm,
446 ],
447 )
448 .map_err(|e| CPError::Database(e.to_string()))?;
449
450 let vector_f32 = emb.to_f32();
452 self.hnsw.insert(emb.id, vector_f32)?;
453
454 self.hnsw.invalidate();
456
457 Ok(())
458 }
459
460 pub fn get_embedding_for_chunk(&self, chunk_id: Uuid) -> Result<Option<Embedding>> {
462 self.db
463 .query_row(
464 "SELECT id, chunk_id, vector, model_hash, dim, l2_norm, embedding_version
465 FROM embeddings WHERE chunk_id = ?1",
466 params![chunk_id.as_bytes().as_slice()],
467 |row| self.row_to_embedding(row),
468 )
469 .optional()
470 .map_err(|e| CPError::Database(e.to_string()))
471 }
472
473 pub fn get_embedding(&self, id: Uuid) -> Result<Option<Embedding>> {
475 self.db
476 .query_row(
477 "SELECT id, chunk_id, vector, model_hash, dim, l2_norm, embedding_version
478 FROM embeddings WHERE id = ?1",
479 params![id.as_bytes().as_slice()],
480 |row| self.row_to_embedding(row),
481 )
482 .optional()
483 .map_err(|e| CPError::Database(e.to_string()))
484 }
485
486 fn row_to_embedding(&self, row: &rusqlite::Row) -> rusqlite::Result<Embedding> {
487 let id_bytes: Vec<u8> = row.get(0)?;
488 let chunk_id_bytes: Vec<u8> = row.get(1)?;
489 let vector_bytes: Vec<u8> = row.get(2)?;
490 let model_hash_bytes: Vec<u8> = row.get(3)?;
491 let _dim: i32 = row.get(4)?;
492 let l2_norm: f32 = row.get(5).unwrap_or(0.0);
493 let embedding_version: u32 = row.get(6).unwrap_or(0);
494
495 let _id = uuid_from_bytes(&id_bytes)?;
496 let chunk_id = uuid_from_bytes(&chunk_id_bytes)?;
497 let mut model_hash = [0u8; 32];
498 model_hash.copy_from_slice(&model_hash_bytes);
499
500 let vector: Vec<i16> = vector_bytes
502 .chunks(2)
503 .map(|bytes| i16::from_le_bytes([bytes[0], bytes[1]]))
504 .collect();
505
506 Ok(Embedding::from_quantized_with_norm(
507 chunk_id,
508 vector,
509 model_hash,
510 l2_norm,
511 embedding_version,
512 ))
513 }
514
515 pub fn get_chunk_id_for_embedding(&self, embedding_id: Uuid) -> Result<Option<Uuid>> {
517 self.db
518 .query_row(
519 "SELECT chunk_id FROM embeddings WHERE id = ?1",
520 params![embedding_id.as_bytes().as_slice()],
521 |row| {
522 let id_bytes: Vec<u8> = row.get(0)?;
523 Ok(uuid_from_bytes(&id_bytes)?)
524 },
525 )
526 .optional()
527 .map_err(|e| CPError::Database(e.to_string()))
528 }
529
530 pub fn get_all_embeddings(&self) -> Result<Vec<Embedding>> {
532 let mut stmt = self
533 .db
534 .prepare("SELECT id, chunk_id, vector, model_hash, dim, l2_norm, embedding_version FROM embeddings")
535 .map_err(|e| CPError::Database(e.to_string()))?;
536
537 let embs = stmt
538 .query_map([], |row| self.row_to_embedding(row))
539 .map_err(|e| CPError::Database(e.to_string()))?
540 .collect::<std::result::Result<Vec<_>, _>>()
541 .map_err(|e| CPError::Database(e.to_string()))?;
542
543 Ok(embs)
544 }
545
546 pub fn checkpoint_index(&self) -> Result<()> {
548 let root = self.compute_merkle_root()?;
549 self.hnsw.checkpoint(root)
550 }
551
    /// Persists the vector index by delegating to `SharedPersistentIndex::save`.
    pub fn save_index(&self) -> Result<()> {
        self.hnsw.save()
    }
556
557 pub fn add_edge(&mut self, edge: &Edge) -> Result<()> {
561 self.db
562 .execute(
563 "INSERT OR REPLACE INTO edges (source, target, kind, weight, metadata) VALUES (?1, ?2, ?3, ?4, ?5)",
564 params![
565 edge.source.as_bytes().as_slice(),
566 edge.target.as_bytes().as_slice(),
567 edge.kind as i32,
568 edge.weight,
569 edge.metadata.as_deref(),
570 ],
571 )
572 .map_err(|e| CPError::Database(e.to_string()))?;
573
574 Ok(())
575 }
576
577 pub fn get_edges(&self, node_id: Uuid) -> Result<Vec<Edge>> {
579 let mut stmt = self
580 .db
581 .prepare("SELECT source, target, kind, weight, metadata FROM edges WHERE source = ?1")
582 .map_err(|e| CPError::Database(e.to_string()))?;
583
584 let edges = stmt
585 .query_map(params![node_id.as_bytes().as_slice()], |row| {
586 let source_bytes: Vec<u8> = row.get(0)?;
587 let target_bytes: Vec<u8> = row.get(1)?;
588 let kind: i32 = row.get(2)?;
589 let weight: Option<f64> = row.get(3)?;
590 let metadata: Option<String> = row.get(4)?;
591
592 let source = uuid_from_bytes(&source_bytes)?;
593 let target = uuid_from_bytes(&target_bytes)?;
594 let kind = EdgeKind::from_u8(kind as u8).ok_or_else(|| {
595 rusqlite::Error::FromSqlConversionFailure(
596 2, rusqlite::types::Type::Integer,
597 format!("unknown edge kind: {}", kind).into(),
598 )
599 })?;
600
601 Ok(Edge {
602 source,
603 target,
604 kind,
605 weight: weight.map(|w| w as f32),
606 metadata,
607 })
608 })
609 .map_err(|e| CPError::Database(e.to_string()))?
610 .collect::<std::result::Result<Vec<_>, _>>()
611 .map_err(|e| CPError::Database(e.to_string()))?;
612
613 Ok(edges)
614 }
615
616 pub fn edges_to(&self, node_id: Uuid) -> Result<Vec<Edge>> {
618 let mut stmt = self
619 .db
620 .prepare("SELECT source, target, kind, weight, metadata FROM edges WHERE target = ?1")
621 .map_err(|e| CPError::Database(e.to_string()))?;
622
623 let edges = stmt
624 .query_map(params![node_id.as_bytes().as_slice()], |row| {
625 let source_bytes: Vec<u8> = row.get(0)?;
626 let target_bytes: Vec<u8> = row.get(1)?;
627 let kind: i32 = row.get(2)?;
628 let weight: Option<f64> = row.get(3)?;
629 let metadata: Option<String> = row.get(4)?;
630
631 let source = uuid_from_bytes(&source_bytes)?;
632 let target = uuid_from_bytes(&target_bytes)?;
633 let kind = EdgeKind::from_u8(kind as u8).ok_or_else(|| {
634 rusqlite::Error::FromSqlConversionFailure(
635 2, rusqlite::types::Type::Integer,
636 format!("unknown edge kind: {}", kind).into(),
637 )
638 })?;
639
640 Ok(Edge {
641 source,
642 target,
643 kind,
644 weight: weight.map(|w| w as f32),
645 metadata,
646 })
647 })
648 .map_err(|e| CPError::Database(e.to_string()))?
649 .collect::<std::result::Result<Vec<_>, _>>()
650 .map_err(|e| CPError::Database(e.to_string()))?;
651
652 Ok(edges)
653 }
654
655 pub fn all_edges(&self) -> Result<Vec<Edge>> {
657 let mut stmt = self
658 .db
659 .prepare("SELECT source, target, kind, weight, metadata FROM edges")
660 .map_err(|e| CPError::Database(e.to_string()))?;
661
662 let edges = stmt
663 .query_map([], |row| {
664 let source_bytes: Vec<u8> = row.get(0)?;
665 let target_bytes: Vec<u8> = row.get(1)?;
666 let kind: i32 = row.get(2)?;
667 let weight: Option<f64> = row.get(3)?;
668 let metadata: Option<String> = row.get(4)?;
669
670 let source = uuid_from_bytes(&source_bytes)?;
671 let target = uuid_from_bytes(&target_bytes)?;
672 let kind = EdgeKind::from_u8(kind as u8).ok_or_else(|| {
673 rusqlite::Error::FromSqlConversionFailure(
674 2, rusqlite::types::Type::Integer,
675 format!("unknown edge kind: {}", kind).into(),
676 )
677 })?;
678
679 Ok(Edge {
680 source,
681 target,
682 kind,
683 weight: weight.map(|w| w as f32),
684 metadata,
685 })
686 })
687 .map_err(|e| CPError::Database(e.to_string()))?
688 .collect::<std::result::Result<Vec<_>, _>>()
689 .map_err(|e| CPError::Database(e.to_string()))?;
690
691 Ok(edges)
692 }
693
    /// Nearest-neighbour search over the vector index.
    ///
    /// Delegates to `SharedPersistentIndex::search(query_vec, k)` and wraps the
    /// result in `Ok`; this wrapper itself never returns `Err`. The returned
    /// `Uuid`s are the keys vectors were inserted under, i.e. embedding ids,
    /// not chunk ids. Map them with `get_chunk_id_for_embedding`. Whether the
    /// `f32` is a distance or a similarity is defined by the index; TODO
    /// confirm.
    pub fn search(&self, query_vec: &[f32], k: usize) -> Result<Vec<(Uuid, f32)>> {
        Ok(self.hnsw.search(query_vec, k))
    }
700
701 pub fn search_lexical(&self, query: &str, k: usize) -> Result<Vec<(Uuid, f32)>> {
703 let mut stmt = self
704 .db
705 .prepare(
706 "SELECT id, rank FROM fts_chunks
707 WHERE fts_chunks MATCH ?1
708 ORDER BY rank LIMIT ?2",
709 )
710 .map_err(|e| CPError::Database(e.to_string()))?;
711
712 let results = stmt
713 .query_map(params![query, k as i64], |row| {
714 let id_bytes: Vec<u8> = row.get(0)?;
715 let rank: f64 = row.get(1)?;
716
717 let id = uuid_from_bytes(&id_bytes)?;
718 Ok((id, -rank as f32))
722 })
723 .map_err(|e| CPError::Database(e.to_string()))?
724 .collect::<std::result::Result<Vec<_>, _>>()
725 .map_err(|e| CPError::Database(e.to_string()))?;
726
727 Ok(results)
728 }
729
730 pub fn get_sorted_chunk_hashes(&self) -> Result<Vec<([u8; 16], [u8; 32])>> {
736 let mut stmt = self
737 .db
738 .prepare("SELECT id, text_hash FROM chunks ORDER BY id")
739 .map_err(|e| CPError::Database(e.to_string()))?;
740
741 let results = stmt
742 .query_map([], |row| {
743 let id_bytes: Vec<u8> = row.get(0)?;
744 let hash_bytes: Vec<u8> = row.get(1)?;
745
746 let mut id = [0u8; 16];
747 id.copy_from_slice(&id_bytes);
748 let mut hash = [0u8; 32];
749 hash.copy_from_slice(&hash_bytes);
750
751 Ok((id, hash))
752 })
753 .map_err(|e| CPError::Database(e.to_string()))?
754 .collect::<std::result::Result<Vec<_>, _>>()
755 .map_err(|e| CPError::Database(e.to_string()))?;
756
757 Ok(results)
758 }
759
760 pub fn compute_chunk_tree_root(&self) -> Result<[u8; 32]> {
765 let sorted = self.get_sorted_chunk_hashes()?;
766 let hashes: Vec<[u8; 32]> = sorted.iter().map(|(_, h)| *h).collect();
767 Ok(cp_core::state::compute_merkle_root(&hashes))
768 }
769
    /// Computes a 32-byte BLAKE3 digest over the persisted graph state.
    ///
    /// Despite the name, this is not a Merkle tree. It is one sequential hash
    /// fed, in this order, with:
    /// 1. each document's `id`, `hash`, `hierarchical_hash`, sorted by `id`;
    /// 2. each chunk's `id` and `text_hash`, ordered by `id`;
    /// 3. each embedding's `id`, raw `vector` blob and `model_hash`, ordered by `id`;
    /// 4. each edge's `source`, `target`, `kind` (little-endian `i32`) and,
    ///    when present, `weight` (little-endian `f64`), ordered by
    ///    `(source, target, kind)`. Edge `metadata` is not included.
    ///
    /// `open_with_config` and `rebuild_hnsw_index` use this value as the
    /// vector-index checkpoint, so any change to the bytes fed to the hasher
    /// changes every root and invalidates existing checkpoints.
    ///
    /// NOTE(review): fields are concatenated without length prefixes or
    /// separators, and an absent edge weight contributes no bytes. Distinct
    /// states could therefore, in principle, produce the same byte stream.
    /// Fixing this changes every root; coordinate before touching.
    pub fn compute_merkle_root(&self) -> Result<[u8; 32]> {
        let mut hasher = blake3::Hasher::new();

        // Documents are sorted in Rust by `Uuid` ordering; presumably this
        // matches SQLite's byte-wise BLOB ordering used for the other tables.
        let mut docs = self.get_all_documents()?;
        docs.sort_by_key(|d| d.id);
        for doc in docs {
            hasher.update(doc.id.as_bytes());
            hasher.update(&doc.hash);
            hasher.update(&doc.hierarchical_hash);
        }

        // Chunks: raw id blob followed by text_hash blob.
        let mut stmt = self.db.prepare("SELECT id, text_hash FROM chunks ORDER BY id")
            .map_err(|e| CPError::Database(e.to_string()))?;
        let chunk_iter = stmt.query_map([], |row| {
            let id_bytes: Vec<u8> = row.get(0)?;
            let hash_bytes: Vec<u8> = row.get(1)?;
            Ok((id_bytes, hash_bytes))
        }).map_err(|e| CPError::Database(e.to_string()))?;

        for chunk in chunk_iter {
            let (id, hash) = chunk.map_err(|e| CPError::Database(e.to_string()))?;
            hasher.update(&id);
            hasher.update(&hash);
        }

        // Embeddings: raw id, the stored (quantized) vector blob, model hash.
        let mut stmt = self.db.prepare("SELECT id, vector, model_hash FROM embeddings ORDER BY id")
            .map_err(|e| CPError::Database(e.to_string()))?;
        let emb_iter = stmt.query_map([], |row| {
            let id_bytes: Vec<u8> = row.get(0)?;
            let vec_bytes: Vec<u8> = row.get(1)?;
            let model_hash: Vec<u8> = row.get(2)?;
            Ok((id_bytes, vec_bytes, model_hash))
        }).map_err(|e| CPError::Database(e.to_string()))?;

        for emb in emb_iter {
            let (id, vec, model) = emb.map_err(|e| CPError::Database(e.to_string()))?;
            hasher.update(&id);
            hasher.update(&vec);
            hasher.update(&model);
        }

        // Edges: endpoints, kind, and weight only when one is stored.
        let mut stmt = self.db.prepare("SELECT source, target, kind, weight FROM edges ORDER BY source, target, kind")
            .map_err(|e| CPError::Database(e.to_string()))?;
        let edge_iter = stmt.query_map([], |row| {
            let s: Vec<u8> = row.get(0)?;
            let t: Vec<u8> = row.get(1)?;
            let k: i32 = row.get(2)?;
            let w: Option<f64> = row.get(3)?;
            Ok((s, t, k, w))
        }).map_err(|e| CPError::Database(e.to_string()))?;

        for edge in edge_iter {
            let (s, t, k, w) = edge.map_err(|e| CPError::Database(e.to_string()))?;
            hasher.update(&s);
            hasher.update(&t);
            hasher.update(&k.to_le_bytes());
            if let Some(weight) = w {
                hasher.update(&weight.to_le_bytes());
            }
        }

        Ok(*hasher.finalize().as_bytes())
    }
846
847 pub fn get_state_root_by_hash(&self, hash: &[u8; 32]) -> Result<Option<cp_core::StateRoot>> {
849 self.db
850 .query_row(
851 "SELECT hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq
852 FROM state_roots WHERE hash = ?1",
853 params![hash.as_slice()],
854 |row| {
855 let hash_bytes: Vec<u8> = row.get(0)?;
856 let parent_bytes: Option<Vec<u8>> = row.get(1)?;
857 let hlc_wall_ms: i64 = row.get(2)?;
858 let hlc_counter: i32 = row.get(3)?;
859 let hlc_node_id_bytes: Vec<u8> = row.get(4)?;
860 let device_id_bytes: Vec<u8> = row.get(5)?;
861 let signature_bytes: Vec<u8> = row.get(6)?;
862 let seq: i64 = row.get(7)?;
863
864 let mut hash = [0u8; 32];
865 hash.copy_from_slice(&hash_bytes);
866
867 let parent = parent_bytes.map(|b| {
868 let mut p = [0u8; 32];
869 p.copy_from_slice(&b);
870 p
871 });
872
873 let mut hlc_node_id = [0u8; 16];
874 hlc_node_id.copy_from_slice(&hlc_node_id_bytes);
875 let hlc = cp_core::Hlc {
876 wall_ms: hlc_wall_ms as u64,
877 counter: hlc_counter as u16,
878 node_id: hlc_node_id,
879 };
880
881 let device_id = uuid_from_bytes(&device_id_bytes)?;
882
883 let mut signature = [0u8; 64];
884 signature.copy_from_slice(&signature_bytes);
885
886 Ok(cp_core::StateRoot {
887 hash,
888 parent,
889 hlc,
890 device_id,
891 signature,
892 seq: seq as u64,
893 })
894 },
895 )
896 .optional()
897 .map_err(|e| CPError::Database(e.to_string()))
898 }
899
900 pub fn get_latest_root(&self) -> Result<Option<cp_core::StateRoot>> {
902 use rusqlite::OptionalExtension;
903 self.db
904 .query_row(
905 "SELECT hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq
906 FROM state_roots ORDER BY seq DESC LIMIT 1",
907 [],
908 |row| {
909 let hash_bytes: Vec<u8> = row.get(0)?;
910 let parent_bytes: Option<Vec<u8>> = row.get(1)?;
911 let hlc_wall_ms: i64 = row.get(2)?;
912 let hlc_counter: i32 = row.get(3)?;
913 let hlc_node_id_bytes: Vec<u8> = row.get(4)?;
914 let device_id_bytes: Vec<u8> = row.get(5)?;
915 let signature_bytes: Vec<u8> = row.get(6)?;
916 let seq: i64 = row.get(7)?;
917
918 let mut hash = [0u8; 32];
919 hash.copy_from_slice(&hash_bytes);
920
921 let parent = parent_bytes.map(|b| {
922 let mut p = [0u8; 32];
923 p.copy_from_slice(&b);
924 p
925 });
926
927 let mut hlc_node_id = [0u8; 16];
928 hlc_node_id.copy_from_slice(&hlc_node_id_bytes);
929 let hlc = cp_core::Hlc {
930 wall_ms: hlc_wall_ms as u64,
931 counter: hlc_counter as u16,
932 node_id: hlc_node_id,
933 };
934
935 let device_id = uuid_from_bytes(&device_id_bytes)?;
936
937 let mut signature = [0u8; 64];
938 signature.copy_from_slice(&signature_bytes);
939
940 Ok(cp_core::StateRoot {
941 hash,
942 parent,
943 hlc,
944 device_id,
945 signature,
946 seq: seq as u64,
947 })
948 },
949 )
950 .optional()
951 .map_err(|e| CPError::Database(e.to_string()))
952 }
953
954 pub fn set_latest_root(&mut self, root: &cp_core::StateRoot) -> Result<()> {
956 self.db
957 .execute(
958 "INSERT OR REPLACE INTO state_roots (hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq)
959 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
960 params![
961 root.hash.as_slice(),
962 root.parent.as_ref().map(|p| p.as_slice()),
963 root.hlc.wall_ms as i64,
964 root.hlc.counter as i32,
965 root.hlc.node_id.as_slice(),
966 root.device_id.as_bytes().as_slice(),
967 root.signature.as_slice(),
968 root.seq as i64,
969 ],
970 )
971 .map_err(|e| CPError::Database(e.to_string()))?;
972
973 Ok(())
974 }
975
976 pub fn apply_diff(&mut self, diff: &cp_core::CognitiveDiff) -> Result<()> {
983 let latest_root = self.get_latest_root()?;
985
986 let (resolved_diff, has_conflict) = self.resolve_conflicts(diff, latest_root.as_ref())?;
988
989 if has_conflict {
990 info!("Conflict detected and resolved using LWW/HLC");
991 }
992
993 let tx = self.db.transaction().map_err(|e| CPError::Database(e.to_string()))?;
994
995 for id in &resolved_diff.removed_doc_ids {
997 tx.execute("DELETE FROM documents WHERE id = ?1", params![id.as_bytes().as_slice()])
998 .map_err(|e| CPError::Database(e.to_string()))?;
999 }
1000 for id in &resolved_diff.removed_chunk_ids {
1001 tx.execute("DELETE FROM chunks WHERE id = ?1", params![id.as_bytes().as_slice()])
1002 .map_err(|e| CPError::Database(e.to_string()))?;
1003 }
1004 for id in &resolved_diff.removed_embedding_ids {
1005 tx.execute("DELETE FROM embeddings WHERE id = ?1", params![id.as_bytes().as_slice()])
1006 .map_err(|e| CPError::Database(e.to_string()))?;
1007 }
1008
1009 if !resolved_diff.removed_embedding_ids.is_empty() {
1011 self.hnsw.invalidate();
1012 }
1013 for (source, target, kind) in &resolved_diff.removed_edges {
1014 tx.execute(
1015 "DELETE FROM edges WHERE source = ?1 AND target = ?2 AND kind = ?3",
1016 params![source.as_bytes().as_slice(), target.as_bytes().as_slice(), *kind as i32],
1017 )
1018 .map_err(|e| CPError::Database(e.to_string()))?;
1019 }
1020
1021 for doc in &resolved_diff.added_docs {
1023 tx.execute(
1024 "INSERT OR REPLACE INTO documents (id, path, hash, hierarchical_hash, mtime, size, mime_type, path_id, arweave_tx)
1025 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1026 params![
1027 doc.id.as_bytes().as_slice(),
1028 doc.path.to_string_lossy().as_ref(),
1029 doc.hash.as_slice(),
1030 doc.hierarchical_hash.as_slice(),
1031 doc.mtime,
1032 doc.size as i64,
1033 &doc.mime_type,
1034 doc.path_id.as_bytes().as_slice(),
1035 doc.arweave_tx.as_deref(),
1036 ],
1037 ).map_err(|e| CPError::Database(e.to_string()))?;
1038 }
1039
1040 for chunk in &resolved_diff.added_chunks {
1041 tx.execute(
1042 "INSERT OR REPLACE INTO chunks (id, doc_id, text, byte_offset, byte_length, sequence, text_hash)
1043 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1044 params![
1045 chunk.id.as_bytes().as_slice(),
1046 chunk.doc_id.as_bytes().as_slice(),
1047 &chunk.text,
1048 chunk.byte_offset as i64,
1049 chunk.byte_length as i64,
1050 chunk.sequence,
1051 chunk.text_hash.as_slice(),
1052 ],
1053 ).map_err(|e| CPError::Database(e.to_string()))?;
1054 }
1055
1056 for emb in &resolved_diff.added_embeddings {
1057 let vector_bytes: Vec<u8> = emb.vector.iter().flat_map(|f| f.to_le_bytes()).collect();
1058 tx.execute(
1059 "INSERT OR REPLACE INTO embeddings (id, chunk_id, vector, model_hash, dim, l2_norm)
1060 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1061 params![
1062 emb.id.as_bytes().as_slice(),
1063 emb.chunk_id.as_bytes().as_slice(),
1064 &vector_bytes,
1065 emb.model_hash.as_slice(),
1066 emb.dim as i32,
1067 emb.l2_norm,
1068 ],
1069 ).map_err(|e| CPError::Database(e.to_string()))?;
1070 }
1071
1072 for edge in &resolved_diff.added_edges {
1073 tx.execute(
1074 "INSERT OR REPLACE INTO edges (source, target, kind, weight) VALUES (?1, ?2, ?3, ?4)",
1075 params![
1076 edge.source.as_bytes().as_slice(),
1077 edge.target.as_bytes().as_slice(),
1078 edge.kind as i32,
1079 edge.weight,
1080 ],
1081 ).map_err(|e| CPError::Database(e.to_string()))?;
1082 }
1083
1084 tx.execute(
1086 "INSERT OR REPLACE INTO state_roots (hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq)
1087 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1088 params![
1089 resolved_diff.metadata.new_root.as_slice(),
1090 if resolved_diff.metadata.prev_root == [0u8; 32] { None } else { Some(resolved_diff.metadata.prev_root.as_slice()) },
1091 resolved_diff.metadata.hlc.wall_ms as i64,
1092 resolved_diff.metadata.hlc.counter as i32,
1093 resolved_diff.metadata.hlc.node_id.as_slice(),
1094 resolved_diff.metadata.device_id.as_bytes().as_slice(),
1095 [0u8; 64].as_slice(),
1096 resolved_diff.metadata.seq as i64,
1097 ],
1098 ).map_err(|e| CPError::Database(e.to_string()))?;
1099
1100 tx.commit().map_err(|e| CPError::Database(e.to_string()))?;
1101
1102 for emb in &resolved_diff.added_embeddings {
1104 let vec_f32 = emb.to_f32();
1105 let _ = self.hnsw.insert(emb.id, vec_f32);
1106 }
1107
1108 self.hnsw.invalidate();
1110
1111 Ok(())
1112 }
1113
1114 fn resolve_conflicts(
1122 &self,
1123 incoming_diff: &cp_core::CognitiveDiff,
1124 current_root: Option<&cp_core::StateRoot>,
1125 ) -> Result<(cp_core::CognitiveDiff, bool)> {
1126 let mut has_conflict = false;
1127
1128 let current_root = match current_root {
1130 Some(root) => root,
1131 None => return Ok((incoming_diff.clone(), false)),
1132 };
1133
1134 let incoming_hlc = &incoming_diff.metadata.hlc;
1136 let current_hlc = ¤t_root.hlc;
1137
1138 let incoming_is_newer = incoming_hlc > current_hlc;
1140
1141 if !incoming_is_newer && incoming_hlc != current_hlc {
1143 info!(
1144 "Conflict: incoming diff HLC {:?} is older than current {:?}",
1145 incoming_hlc, current_hlc
1146 );
1147 has_conflict = true;
1148 return Ok((cp_core::CognitiveDiff::empty(
1150 current_root.hash,
1151 incoming_diff.metadata.device_id,
1152 incoming_diff.metadata.seq,
1153 incoming_diff.metadata.hlc.clone(),
1154 ), has_conflict));
1155 }
1156
1157 if incoming_hlc == current_hlc {
1159 let incoming_hash = blake3::hash(&incoming_diff.metadata.new_root);
1160 let current_hash = blake3::hash(¤t_root.hash);
1161
1162 if incoming_hash.as_bytes() <= current_hash.as_bytes() {
1164 info!("Conflict: equal HLC, current hash wins (tiebreaker)");
1165 has_conflict = true;
1166 return Ok((cp_core::CognitiveDiff::empty(
1167 current_root.hash,
1168 incoming_diff.metadata.device_id,
1169 incoming_diff.metadata.seq,
1170 incoming_diff.metadata.hlc.clone(),
1171 ), has_conflict));
1172 }
1173 }
1174
1175 Ok((incoming_diff.clone(), has_conflict))
1177 }
1178
1179 pub fn clear_state_roots(&mut self) -> Result<()> {
1181 self.db.execute_batch("DELETE FROM state_roots;")
1182 .map_err(|e| CPError::Database(e.to_string()))?;
1183 info!("Cleared state roots");
1184 Ok(())
1185 }
1186
1187 pub fn clear_all(&mut self) -> Result<()> {
1189 self.db.execute_batch(
1190 r#"
1191 DELETE FROM embeddings;
1192 DELETE FROM chunks;
1193 DELETE FROM documents;
1194 DELETE FROM edges;
1195 DELETE FROM state_roots;
1196 "#,
1197 )
1198 .map_err(|e| CPError::Database(e.to_string()))?;
1199
1200 self.hnsw.clear()?;
1202
1203 info!("Cleared all data from graph store for fresh sync");
1204 Ok(())
1205 }
1206
1207 pub fn stats(&self) -> Result<GraphStats> {
1209 let doc_count: i64 = self
1210 .db
1211 .query_row("SELECT COUNT(*) FROM documents", [], |row| row.get(0))
1212 .map_err(|e| CPError::Database(e.to_string()))?;
1213
1214 let chunk_count: i64 = self
1215 .db
1216 .query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))
1217 .map_err(|e| CPError::Database(e.to_string()))?;
1218
1219 let embedding_count: i64 = self
1220 .db
1221 .query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))
1222 .map_err(|e| CPError::Database(e.to_string()))?;
1223
1224 let edge_count: i64 = self
1225 .db
1226 .query_row("SELECT COUNT(*) FROM edges", [], |row| row.get(0))
1227 .map_err(|e| CPError::Database(e.to_string()))?;
1228
1229 Ok(GraphStats {
1230 documents: doc_count as usize,
1231 chunks: chunk_count as usize,
1232 embeddings: embedding_count as usize,
1233 edges: edge_count as usize,
1234 })
1235 }
1236}
1237
/// Row counts for the main tables of a `GraphStore`, as returned by `GraphStore::stats`.
///
/// `Default` yields all-zero counts (an empty store); `PartialEq`/`Eq` allow whole
/// snapshots to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphStats {
    /// Number of rows in the `documents` table.
    pub documents: usize,
    /// Number of rows in the `chunks` table.
    pub chunks: usize,
    /// Number of rows in the `embeddings` table.
    pub embeddings: usize,
    /// Number of rows in the `edges` table.
    pub edges: usize,
}
1246
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Opens an empty in-memory store for a single test.
    fn fresh_store() -> GraphStore {
        GraphStore::in_memory().unwrap()
    }

    /// Builds a fixed `Hlc` value so state-root tests are deterministic.
    fn test_hlc() -> cp_core::Hlc {
        cp_core::Hlc::new(1234567890, [0u8; 16])
    }

    // ---- store construction ------------------------------------------------

    #[test]
    fn test_graph_store_new() {
        let store = GraphStore::in_memory().unwrap();
        assert!(store.stats().is_ok());
    }

    #[test]
    fn test_graph_store_in_memory() {
        let store = GraphStore::in_memory().unwrap();
        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 0);
        assert_eq!(stats.chunks, 0);
        assert_eq!(stats.embeddings, 0);
        assert_eq!(stats.edges, 0);
    }

    // ---- documents ---------------------------------------------------------

    #[test]
    fn test_graph_store_insert_document() {
        let mut store = fresh_store();
        let doc = Document::new(PathBuf::from("test.md"), b"Hello, world!", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap().unwrap();
        assert_eq!(retrieved.id, doc.id);
        assert_eq!(retrieved.path, doc.path);
        assert_eq!(retrieved.hash, doc.hash);
    }

    #[test]
    fn test_graph_store_get_document_by_id() {
        let mut store = fresh_store();
        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, doc.id);
    }

    #[test]
    fn test_graph_store_get_document_by_path() {
        let mut store = fresh_store();
        let path = PathBuf::from("test.md");
        let doc = Document::new(path.clone(), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document_by_path(&path).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().path, path);
    }

    #[test]
    fn test_graph_store_document_not_found() {
        let store = fresh_store();
        let non_existent_id = Uuid::new_v4();
        let retrieved = store.get_document(non_existent_id).unwrap();
        assert!(retrieved.is_none());

        let non_existent_path = PathBuf::from("non_existent.md");
        let retrieved_by_path = store.get_document_by_path(&non_existent_path).unwrap();
        assert!(retrieved_by_path.is_none());
    }

    #[test]
    fn test_graph_store_update_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Original content", 12345);
        let original_id = doc.id;
        store.insert_document(&doc).unwrap();

        // Re-inserting the same document must not create a second row.
        store.insert_document(&doc).unwrap();

        let docs = store.get_all_documents().unwrap();
        assert_eq!(docs.len(), 1);

        let retrieved = store.get_document(original_id).unwrap();
        assert!(retrieved.is_some());
    }

    #[test]
    fn test_graph_store_delete_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector: Vec<f32> = (0..1536).map(|i| i as f32 * 0.01).collect();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
        store.insert_embedding(&emb).unwrap();

        // Deleting the document must also remove its chunks and their embeddings.
        store.delete_document(doc.id).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_none());

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert!(chunks.is_empty());

        let embedding = store.get_embedding(emb.id).unwrap();
        assert!(embedding.is_none());
    }

    #[test]
    fn test_graph_store_all_documents() {
        let mut store = fresh_store();

        let doc1 = Document::new(PathBuf::from("test1.md"), b"Content 1", 12345);
        let doc2 = Document::new(PathBuf::from("test2.md"), b"Content 2", 12346);

        store.insert_document(&doc1).unwrap();
        store.insert_document(&doc2).unwrap();

        let all_docs = store.get_all_documents().unwrap();
        assert_eq!(all_docs.len(), 2);
    }

    // ---- chunks ------------------------------------------------------------

    #[test]
    fn test_graph_store_insert_chunk() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk text".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let retrieved = store.get_chunk(chunk.id).unwrap().unwrap();
        assert_eq!(retrieved.id, chunk.id);
        assert_eq!(retrieved.doc_id, doc.id);
    }

    #[test]
    fn test_graph_store_get_chunk() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let retrieved = store.get_chunk(chunk.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, chunk.id);
    }

    #[test]
    fn test_graph_store_chunks_for_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk1 = Chunk::new(doc.id, "Chunk 1".to_string(), 0, 10);
        let chunk2 = Chunk::new(doc.id, "Chunk 2".to_string(), 10, 20);

        store.insert_chunk(&chunk1).unwrap();
        store.insert_chunk(&chunk2).unwrap();

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert_eq!(chunks.len(), 2);
    }

    #[test]
    fn test_graph_store_delete_chunks_for_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.delete_document(doc.id).unwrap();

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert!(chunks.is_empty());
    }

    // ---- embeddings --------------------------------------------------------

    #[test]
    fn test_graph_store_insert_embedding() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector: Vec<f32> = (0..1536).map(|i| i as f32 * 0.01).collect();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let retrieved = store.get_embedding(emb.id).unwrap().unwrap();
        assert_eq!(retrieved.id, emb.id);
    }

    #[test]
    fn test_graph_store_get_embedding() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector: Vec<f32> = (0..1536).map(|i| i as f32 * 0.01).collect();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let retrieved = store.get_embedding(emb.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, emb.id);
    }

    #[test]
    fn test_graph_store_embeddings_for_chunk() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector: Vec<f32> = (0..1536).map(|i| i as f32 * 0.01).collect();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let retrieved = store.get_embedding_for_chunk(chunk.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().chunk_id, chunk.id);
    }

    #[test]
    fn test_graph_store_all_embeddings() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector: Vec<f32> = (0..1536).map(|i| i as f32 * 0.01).collect();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let all_embeddings = store.get_all_embeddings().unwrap();
        assert_eq!(all_embeddings.len(), 1);
    }

    // ---- edges -------------------------------------------------------------

    #[test]
    fn test_graph_store_insert_edge() {
        let mut store = fresh_store();

        let source = Uuid::new_v4();
        let target = Uuid::new_v4();
        let edge = Edge::new(source, target, EdgeKind::DocToChunk);

        store.add_edge(&edge).unwrap();

        let edges = store.get_edges(source).unwrap();
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].target, target);
    }

    #[test]
    fn test_graph_store_edges_from() {
        let mut store = fresh_store();

        let source = Uuid::new_v4();
        let target1 = Uuid::new_v4();
        let target2 = Uuid::new_v4();

        let edge1 = Edge::new(source, target1, EdgeKind::DocToChunk);
        let edge2 = Edge::new(source, target2, EdgeKind::ChunkToChunk);

        store.add_edge(&edge1).unwrap();
        store.add_edge(&edge2).unwrap();

        let edges = store.get_edges(source).unwrap();
        assert_eq!(edges.len(), 2);
    }

    #[test]
    fn test_graph_store_edges_to() {
        let mut store = fresh_store();

        let source1 = Uuid::new_v4();
        let source2 = Uuid::new_v4();
        let target = Uuid::new_v4();

        let edge1 = Edge::new(source1, target, EdgeKind::DocToChunk);
        let edge2 = Edge::new(source2, target, EdgeKind::ChunkToChunk);

        store.add_edge(&edge1).unwrap();
        store.add_edge(&edge2).unwrap();

        // NOTE(review): despite the name, this checks outgoing edges per source;
        // there is no incoming-edge lookup exercised here.
        let edges_from_source1 = store.get_edges(source1).unwrap();
        assert_eq!(edges_from_source1.len(), 1);

        let edges_from_source2 = store.get_edges(source2).unwrap();
        assert_eq!(edges_from_source2.len(), 1);
    }

    #[test]
    fn test_graph_store_all_edges() {
        let mut store = fresh_store();

        let edge1 = Edge::new(Uuid::new_v4(), Uuid::new_v4(), EdgeKind::DocToChunk);
        let edge2 = Edge::new(Uuid::new_v4(), Uuid::new_v4(), EdgeKind::ChunkToChunk);

        store.add_edge(&edge1).unwrap();
        store.add_edge(&edge2).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.edges, 2);
    }

    // ---- merkle / state roots ----------------------------------------------

    #[test]
    fn test_graph_store_compute_merkle_root() {
        let mut store = fresh_store();

        let root1 = store.compute_merkle_root().unwrap();

        let doc = Document::new(PathBuf::from("test.md"), b"Hello", 0);
        store.insert_document(&doc).unwrap();

        let root2 = store.compute_merkle_root().unwrap();

        // Adding content must change the root.
        assert_ne!(root1, root2);
    }

    #[test]
    fn test_graph_store_insert_state_root() {
        let mut store = fresh_store();

        let state_root =
            cp_core::StateRoot::new([1u8; 32], Some([2u8; 32]), test_hlc(), Uuid::new_v4(), 1);
        store.set_latest_root(&state_root).unwrap();

        let retrieved = store.get_latest_root().unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().hash, state_root.hash);
    }

    #[test]
    fn test_graph_store_get_latest_state_root() {
        let mut store = fresh_store();

        // A fresh store has no recorded root.
        let latest = store.get_latest_root().unwrap();
        assert!(latest.is_none());

        let state_root = cp_core::StateRoot::new([1u8; 32], None, test_hlc(), Uuid::new_v4(), 0);
        store.set_latest_root(&state_root).unwrap();

        let retrieved = store.get_latest_root().unwrap();
        assert!(retrieved.is_some());
    }

    // ---- stats -------------------------------------------------------------

    #[test]
    fn test_graph_store_stats() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 1);
        assert_eq!(stats.chunks, 0);
        assert_eq!(stats.embeddings, 0);
        assert_eq!(stats.edges, 0);
    }

    #[test]
    fn test_graph_store_stats_empty() {
        let store = fresh_store();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 0);
        assert_eq!(stats.chunks, 0);
        assert_eq!(stats.embeddings, 0);
        assert_eq!(stats.edges, 0);
    }

    // ---- "transaction" tests -----------------------------------------------
    //
    // NOTE(review): these tests never open an explicit transaction or force a
    // rollback; they only exercise single insert/get/stats calls. The names
    // overstate what is covered.

    #[test]
    fn test_transaction_begin() {
        let store = fresh_store();
        assert!(store.stats().is_ok());
    }

    #[test]
    fn test_transaction_commit() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_some());
    }

    #[test]
    fn test_transaction_rollback() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);

        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_some());
    }

    #[test]
    fn test_transaction_atomicity() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 1);
    }

    #[test]
    fn test_transaction_isolation() {
        let store = fresh_store();
        let stats1 = store.stats().unwrap();
        let stats2 = store.stats().unwrap();
        assert_eq!(stats1.documents, stats2.documents);
    }

    // ---- full-text search --------------------------------------------------
    //
    // These are smoke tests: they only require `search_lexical` to succeed and make
    // no assertion on the returned hits, so no delay is needed before querying.
    // NOTE(review): consider asserting on the hits once the expected FTS
    // tokenization is confirmed.

    #[test]
    fn test_fts_search_basic() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Some test content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Hello world this is a test".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.search_lexical("test", 10).unwrap();
    }

    #[test]
    fn test_fts_search_multiple_terms() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Hello world test Rust programming".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.search_lexical("hello world", 10).unwrap();
    }

    #[test]
    fn test_fts_search_no_results() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Some content here".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.search_lexical("nonexistentterm12345", 10).unwrap();
    }

    #[test]
    fn test_fts_search_ranking() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk1 = Chunk::new(doc.id, "test word test".to_string(), 0, 0);

        store.insert_chunk(&chunk1).unwrap();

        let results = store.search_lexical("test", 10);
        assert!(results.is_ok());
    }

    #[test]
    fn test_fts_search_unicode() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Hello 世界 unicode".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.search_lexical("世界", 10).unwrap();
    }

    #[test]
    fn test_fts_trigger_insert() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Trigger test content".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.search_lexical("trigger", 10).unwrap();
    }

    #[test]
    fn test_fts_trigger_delete() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Delete test content".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.delete_document(doc.id).unwrap();

        store.search_lexical("delete", 10).unwrap();
    }

    #[test]
    fn test_fts_trigger_update() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Original content".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        // Simulate an edit by deleting the document and re-inserting it with new text.
        store.delete_document(doc.id).unwrap();

        let updated_chunk = Chunk::new(doc.id, "Updated content".to_string(), 0, 0);
        store.insert_document(&doc).unwrap();
        store.insert_chunk(&updated_chunk).unwrap();

        store.search_lexical("updated", 10).unwrap();
    }

    // ---- round-trip scenarios ----------------------------------------------
    //
    // NOTE(review): several of these duplicate earlier tests (document insert,
    // edge insert, merkle root, stats); kept for coverage parity.

    #[test]
    fn test_document_roundtrip() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Hello, world!", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap().unwrap();
        assert_eq!(retrieved.id, doc.id);
        assert_eq!(retrieved.path, doc.path);
        assert_eq!(retrieved.hash, doc.hash);
    }

    #[test]
    fn test_chunk_operations() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk text".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert_eq!(chunks.len(), 1);
        // NOTE(review): expects a trailing "\n" on the stored text — presumably
        // added by `Chunk::new` or on insert; confirm.
        assert_eq!(chunks[0].text, "Test chunk text\n");
    }

    #[test]
    fn test_embedding_and_search() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test text".to_string(), 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector: Vec<f32> = (0..1536).map(|i| (i as f32) * 0.01).collect();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
        store.insert_embedding(&emb).unwrap();

        // Querying with the stored vector should return it with a score close to 1
        // (presumably a similarity score — confirm against `search`).
        let results = store.search(&vector, 1).unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].1 > 0.99);
    }

    #[test]
    fn test_edge_operations() {
        let mut store = fresh_store();

        let source = Uuid::new_v4();
        let target = Uuid::new_v4();
        let edge = Edge::new(source, target, EdgeKind::DocToChunk);

        store.add_edge(&edge).unwrap();

        let edges = store.get_edges(source).unwrap();
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].target, target);
    }

    #[test]
    fn test_merkle_root() {
        let mut store = fresh_store();

        let root1 = store.compute_merkle_root().unwrap();

        let doc = Document::new(PathBuf::from("test.md"), b"Hello", 0);
        store.insert_document(&doc).unwrap();

        let root2 = store.compute_merkle_root().unwrap();

        assert_ne!(root1, root2);
    }

    #[test]
    fn test_stats() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 1);
        assert_eq!(stats.chunks, 0);
    }
}