1use crate::db;
6use crate::db::{bytes_to_embedding, parse_optional_json, with_transaction};
7#[cfg(feature = "hnsw")]
8use crate::db::{enqueue_pending_index_op, PendingIndexOpKind};
9#[cfg(feature = "hnsw")]
10use crate::episodes;
11use crate::error::MemoryError;
12use crate::quantize::{self, Quantizer};
13use crate::types::{Fact, NamespaceDeleteReport};
14use crate::{merge_trace_ctx, MemoryStore};
15use rusqlite::{params, Connection};
16use stack_ids::TraceCtx;
17
18#[allow(dead_code)]
20pub fn insert_fact_with_fts(
21 conn: &Connection,
22 fact_id: &str,
23 namespace: &str,
24 content: &str,
25 embedding_bytes: &[u8],
26 source: Option<&str>,
27 metadata: Option<&serde_json::Value>,
28) -> Result<(), MemoryError> {
29 insert_fact_with_fts_q8(
30 conn,
31 fact_id,
32 namespace,
33 content,
34 embedding_bytes,
35 None,
36 source,
37 metadata,
38 )
39}
40
41#[allow(clippy::too_many_arguments)]
43pub fn insert_fact_with_fts_q8(
44 conn: &Connection,
45 fact_id: &str,
46 namespace: &str,
47 content: &str,
48 embedding_bytes: &[u8],
49 q8_bytes: Option<&[u8]>,
50 source: Option<&str>,
51 metadata: Option<&serde_json::Value>,
52) -> Result<(), MemoryError> {
53 let metadata_str = metadata.map(|m| m.to_string());
54 with_transaction(conn, |tx| {
55 tx.execute(
56 "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
57 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
58 params![
59 fact_id,
60 namespace,
61 content,
62 source,
63 embedding_bytes,
64 q8_bytes,
65 metadata_str
66 ],
67 )?;
68
69 tx.execute(
70 "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
71 params![fact_id],
72 )?;
73 let fts_rowid = tx.last_insert_rowid();
74
75 tx.execute(
76 "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
77 params![fts_rowid, content],
78 )?;
79
80 #[cfg(feature = "hnsw")]
81 enqueue_pending_index_op(
82 tx,
83 &format!("fact:{}", fact_id),
84 "fact",
85 PendingIndexOpKind::Upsert,
86 )?;
87 db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
88
89 Ok(())
90 })
91}
92
93#[allow(clippy::too_many_arguments)]
97pub fn insert_fact_in_tx(
98 tx: &rusqlite::Transaction<'_>,
99 fact_id: &str,
100 namespace: &str,
101 content: &str,
102 embedding_bytes: &[u8],
103 q8_bytes: Option<&[u8]>,
104 source: Option<&str>,
105 metadata: Option<&serde_json::Value>,
106) -> Result<(), MemoryError> {
107 let metadata_str = metadata.map(|m| m.to_string());
108 tx.execute(
109 "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
110 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
111 params![
112 fact_id,
113 namespace,
114 content,
115 source,
116 embedding_bytes,
117 q8_bytes,
118 metadata_str
119 ],
120 )?;
121
122 tx.execute(
123 "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
124 params![fact_id],
125 )?;
126 let fts_rowid = tx.last_insert_rowid();
127
128 tx.execute(
129 "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
130 params![fts_rowid, content],
131 )?;
132
133 #[cfg(feature = "hnsw")]
134 enqueue_pending_index_op(
135 tx,
136 &format!("fact:{}", fact_id),
137 "fact",
138 PendingIndexOpKind::Upsert,
139 )?;
140 db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
141
142 Ok(())
143}
144
145pub fn delete_fact_with_fts(conn: &Connection, fact_id: &str) -> Result<(), MemoryError> {
147 with_transaction(conn, |tx| {
148 let fts_rowid: i64 = tx
149 .query_row(
150 "SELECT rowid FROM facts_rowid_map WHERE fact_id = ?1",
151 params![fact_id],
152 |row| row.get(0),
153 )
154 .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
155
156 let content: String = tx
157 .query_row(
158 "SELECT content FROM facts WHERE id = ?1",
159 params![fact_id],
160 |row| row.get(0),
161 )
162 .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
163
164 tx.execute(
165 "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
166 params![fts_rowid, content],
167 )?;
168 tx.execute(
169 "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
170 params![fact_id],
171 )?;
172 tx.execute(
173 "DELETE FROM episode_causes WHERE cause_node_id IN (?1, ?2)",
174 params![fact_id, format!("fact:{fact_id}")],
175 )?;
176 tx.execute(
177 "DELETE FROM derivation_edges
178 WHERE (source_kind = 'fact' AND source_id = ?1)
179 OR (target_kind = 'fact' AND target_id = ?1)",
180 params![fact_id],
181 )?;
182 tx.execute("DELETE FROM facts WHERE id = ?1", params![fact_id])?;
183
184 #[cfg(feature = "hnsw")]
185 enqueue_pending_index_op(
186 tx,
187 &format!("fact:{}", fact_id),
188 "fact",
189 PendingIndexOpKind::Delete,
190 )?;
191 db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
192
193 Ok(())
194 })
195}
196
197pub fn update_fact_with_fts(
199 conn: &Connection,
200 fact_id: &str,
201 new_content: &str,
202 new_embedding_bytes: &[u8],
203 new_q8_bytes: Option<&[u8]>,
204) -> Result<(), MemoryError> {
205 with_transaction(conn, |tx| {
206 let (fts_rowid, old_content): (i64, String) = tx
207 .query_row(
208 "SELECT fm.rowid, f.content
209 FROM facts f
210 JOIN facts_rowid_map fm ON fm.fact_id = f.id
211 WHERE f.id = ?1",
212 params![fact_id],
213 |row| Ok((row.get(0)?, row.get(1)?)),
214 )
215 .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
216
217 tx.execute(
218 "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
219 params![fts_rowid, old_content],
220 )?;
221
222 tx.execute(
223 "UPDATE facts
224 SET content = ?1,
225 embedding = ?2,
226 embedding_q8 = ?3,
227 updated_at = datetime('now')
228 WHERE id = ?4",
229 params![new_content, new_embedding_bytes, new_q8_bytes, fact_id],
230 )?;
231
232 tx.execute(
233 "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
234 params![fts_rowid, new_content],
235 )?;
236 tx.execute(
237 "DELETE FROM derivation_edges
238 WHERE (source_kind = 'fact' AND source_id = ?1)
239 OR (target_kind = 'fact' AND target_id = ?1)",
240 params![fact_id],
241 )?;
242
243 #[cfg(feature = "hnsw")]
244 enqueue_pending_index_op(
245 tx,
246 &format!("fact:{}", fact_id),
247 "fact",
248 PendingIndexOpKind::Upsert,
249 )?;
250 db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
251
252 Ok(())
253 })
254}
255
256pub fn delete_namespace(
258 conn: &Connection,
259 namespace: &str,
260) -> Result<NamespaceDeleteReport, MemoryError> {
261 with_transaction(conn, |tx| {
262 let mut report = NamespaceDeleteReport::default();
263 let delete_session = |session_id: &str| -> Result<(usize, usize), MemoryError> {
264 let message_data: Vec<(i64, String, i64, bool)> = {
265 let mut stmt = tx.prepare(
266 "SELECT m.id, m.content, mm.rowid, m.embedding IS NOT NULL
267 FROM messages m
268 JOIN messages_rowid_map mm ON mm.message_id = m.id
269 WHERE m.session_id = ?1",
270 )?;
271 let rows = stmt.query_map(params![session_id], |row| {
272 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
273 })?;
274 rows.collect::<Result<Vec<_>, _>>()?
275 };
276
277 for (message_id, content, fts_rowid, has_embedding) in &message_data {
278 #[cfg(not(feature = "hnsw"))]
279 let _ = (message_id, has_embedding);
280 tx.execute(
281 "INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', ?1, ?2)",
282 params![fts_rowid, content],
283 )?;
284 #[cfg(feature = "hnsw")]
285 if *has_embedding {
286 enqueue_pending_index_op(
287 tx,
288 &format!("msg:{}", message_id),
289 "message",
290 PendingIndexOpKind::Delete,
291 )?;
292 }
293 }
294
295 let affected = tx.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
296 if affected == 0 {
297 return Err(MemoryError::SessionNotFound(session_id.to_string()));
298 }
299 let hnsw_ops = message_data
300 .iter()
301 .filter(|(_, _, _, has_embedding)| *has_embedding)
302 .count();
303 Ok((message_data.len(), hnsw_ops))
304 };
305
306 let document_ids: Vec<String> = {
307 let mut stmt = tx.prepare("SELECT id FROM documents WHERE namespace = ?1")?;
308 let ids = stmt
309 .query_map(params![namespace], |row| row.get(0))?
310 .collect::<Result<Vec<_>, _>>()?;
311 ids
312 };
313
314 let session_ids: Vec<String> = {
315 let mut stmt = tx.prepare("SELECT id, metadata FROM sessions")?;
316 let rows = stmt.query_map([], |row| {
317 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
318 })?;
319 let mut ids = Vec::new();
320 for row in rows {
321 let (session_id, metadata_raw) = row?;
322 let metadata = parse_optional_json(
323 "sessions",
324 &session_id,
325 "metadata",
326 metadata_raw.as_deref(),
327 )?;
328 let namespace_matches = metadata
329 .as_ref()
330 .and_then(|value| {
331 value
332 .get("namespace")
333 .or_else(|| value.get("scope_namespace"))
334 })
335 .and_then(|value| value.as_str())
336 == Some(namespace);
337 if namespace_matches {
338 ids.push(session_id);
339 }
340 }
341 ids
342 };
343
344 for session_id in &session_ids {
345 let (messages, hnsw_ops) = delete_session(session_id)?;
346 report.messages += messages;
347 report.hnsw_ops += hnsw_ops;
348 }
349 report.sessions = session_ids.len();
350
351 let delete_derivation_edges_for_id = |kind: &str, id: &str| -> Result<(), MemoryError> {
352 tx.execute(
353 "DELETE FROM derivation_edges
354 WHERE (source_kind = ?1 AND source_id = ?2)
355 OR (target_kind = ?1 AND target_id = ?2)",
356 params![kind, id],
357 )?;
358 Ok(())
359 };
360
361 let delete_derivation_edges_for_ids =
362 |kind: &str, ids: &[String]| -> Result<(), MemoryError> {
363 for id in ids {
364 delete_derivation_edges_for_id(kind, id)?;
365 }
366 Ok(())
367 };
368
369 let facts: Vec<(String, i64, String)> = {
370 let mut stmt = tx.prepare(
371 "SELECT f.id, fm.rowid, f.content
372 FROM facts f
373 JOIN facts_rowid_map fm ON fm.fact_id = f.id
374 WHERE f.namespace = ?1",
375 )?;
376 let facts = stmt
377 .query_map(params![namespace], |row| {
378 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
379 })?
380 .collect::<Result<Vec<_>, _>>()?;
381 facts
382 };
383
384 for (fact_id, fts_rowid, content) in &facts {
385 tx.execute(
386 "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
387 params![fts_rowid, content],
388 )?;
389 tx.execute(
390 "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
391 params![fact_id],
392 )?;
393
394 #[cfg(feature = "hnsw")]
395 enqueue_pending_index_op(
396 tx,
397 &format!("fact:{}", fact_id),
398 "fact",
399 PendingIndexOpKind::Delete,
400 )?;
401 #[cfg(feature = "hnsw")]
402 {
403 report.hnsw_ops += 1;
404 }
405 }
406 tx.execute("DELETE FROM facts WHERE namespace = ?1", params![namespace])?;
407 report.facts = facts.len();
408
409 for doc_id in &document_ids {
410 let mut stmt = tx.prepare(
411 "SELECT c.id, c.content, cm.rowid
412 FROM chunks c
413 JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
414 WHERE c.document_id = ?1",
415 )?;
416 let chunk_rows: Vec<(String, String, i64)> = stmt
417 .query_map(params![doc_id], |row| {
418 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
419 })?
420 .collect::<Result<Vec<_>, _>>()?;
421 report.chunks += chunk_rows.len();
422
423 for (chunk_id, content, fts_rowid) in &chunk_rows {
424 tx.execute(
425 "INSERT INTO chunks_fts(chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
426 params![fts_rowid, content],
427 )?;
428 tx.execute(
429 "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
430 params![chunk_id],
431 )?;
432 #[cfg(feature = "hnsw")]
433 enqueue_pending_index_op(
434 tx,
435 &format!("chunk:{}", chunk_id),
436 "chunk",
437 PendingIndexOpKind::Delete,
438 )?;
439 #[cfg(feature = "hnsw")]
440 {
441 report.hnsw_ops += 1;
442 }
443 }
444
445 tx.execute("DELETE FROM chunks WHERE document_id = ?1", params![doc_id])?;
446 }
447
448 for doc_id in &document_ids {
449 let mut stmt = tx.prepare(
450 "SELECT e.episode_id, e.search_text, erm.rowid
451 FROM episodes e
452 JOIN episodes_rowid_map erm ON erm.episode_id = e.episode_id
453 WHERE e.document_id = ?1",
454 )?;
455 let episode_rows: Vec<(String, String, i64)> = stmt
456 .query_map(params![doc_id], |row| {
457 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
458 })?
459 .collect::<Result<Vec<_>, _>>()?;
460 report.episodes += episode_rows.len();
461
462 for (episode_id, search_text, fts_rowid) in &episode_rows {
463 tx.execute(
464 "INSERT INTO episodes_fts(episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
465 params![fts_rowid, search_text],
466 )?;
467 tx.execute(
468 "DELETE FROM episodes_rowid_map WHERE episode_id = ?1",
469 params![episode_id],
470 )?;
471 tx.execute(
472 "DELETE FROM episode_causes WHERE episode_id = ?1",
473 params![episode_id],
474 )?;
475 #[cfg(feature = "hnsw")]
476 enqueue_pending_index_op(
477 tx,
478 &episodes::episode_item_key(episode_id),
479 "episode",
480 PendingIndexOpKind::Delete,
481 )?;
482 #[cfg(feature = "hnsw")]
483 {
484 report.hnsw_ops += 1;
485 }
486 }
487
488 tx.execute(
489 "DELETE FROM episodes WHERE document_id = ?1",
490 params![doc_id],
491 )?;
492 tx.execute("DELETE FROM documents WHERE id = ?1", params![doc_id])?;
493 }
494 report.documents = document_ids.len();
495
496 let claim_ids: Vec<String> = {
497 let mut stmt =
498 tx.prepare("SELECT claim_id FROM claim_versions WHERE scope_namespace = ?1")?;
499 let ids = stmt
500 .query_map(params![namespace], |row| row.get(0))?
501 .collect::<Result<Vec<_>, _>>()?;
502 ids
503 };
504
505 let claim_version_ids: Vec<String> = {
506 let mut stmt = tx.prepare(
507 "SELECT claim_version_id FROM claim_versions WHERE scope_namespace = ?1",
508 )?;
509 let ids = stmt
510 .query_map(params![namespace], |row| row.get(0))?
511 .collect::<Result<Vec<_>, _>>()?;
512 ids
513 };
514
515 let relation_version_ids: Vec<String> = {
516 let mut stmt = tx.prepare(
517 "SELECT relation_version_id FROM relation_versions WHERE scope_namespace = ?1",
518 )?;
519 let ids = stmt
520 .query_map(params![namespace], |row| row.get(0))?
521 .collect::<Result<Vec<_>, _>>()?;
522 ids
523 };
524
525 let alias_entity_ids: Vec<String> = {
526 let mut stmt = tx.prepare(
527 "SELECT canonical_entity_id FROM entity_aliases WHERE scope_namespace = ?1",
528 )?;
529 let ids = stmt
530 .query_map(params![namespace], |row| row.get(0))?
531 .collect::<Result<Vec<_>, _>>()?;
532 ids
533 };
534
535 let evidence_handles: Vec<String> = {
536 let mut stmt = tx.prepare(
537 "SELECT er.fetch_handle FROM evidence_refs er
538 JOIN projection_import_log pil ON er.source_envelope_id = pil.source_envelope_id
539 WHERE pil.scope_namespace = ?1",
540 )?;
541 let handles = stmt
542 .query_map(params![namespace], |row| row.get(0))?
543 .collect::<Result<Vec<_>, _>>()?;
544 handles
545 };
546
547 let episode_ids: Vec<String> = {
548 let mut stmt = tx.prepare(
549 "SELECT episode_id FROM episode_links
550 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
551 )?;
552 let ids = stmt
553 .query_map(params![namespace], |row| row.get(0))?
554 .collect::<Result<Vec<_>, _>>()?;
555 ids
556 };
557
558 delete_derivation_edges_for_ids("claim", &claim_ids)?;
559 delete_derivation_edges_for_ids("claim_version", &claim_version_ids)?;
560 delete_derivation_edges_for_ids("relation_version", &relation_version_ids)?;
561 delete_derivation_edges_for_ids("entity", &alias_entity_ids)?;
562 delete_derivation_edges_for_ids("evidence_ref", &evidence_handles)?;
563 delete_derivation_edges_for_ids("episode", &episode_ids)?;
564
565 report.projection_rows += tx.execute(
566 "DELETE FROM claim_versions WHERE scope_namespace = ?1",
567 params![namespace],
568 )?;
569 report.projection_rows += tx.execute(
570 "DELETE FROM relation_versions WHERE scope_namespace = ?1",
571 params![namespace],
572 )?;
573 report.projection_rows += tx.execute(
574 "DELETE FROM entity_aliases WHERE scope_namespace = ?1",
575 params![namespace],
576 )?;
577 report.projection_rows += tx.execute(
578 "DELETE FROM evidence_refs
579 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
580 params![namespace],
581 )?;
582 report.projection_rows += tx.execute(
583 "DELETE FROM episode_links
584 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
585 params![namespace],
586 )?;
587 report.projection_rows += tx.execute(
588 "DELETE FROM projection_import_failures WHERE scope_namespace = ?1",
589 params![namespace],
590 )?;
591 report.projection_rows += tx.execute(
592 "DELETE FROM projection_import_log WHERE scope_namespace = ?1",
593 params![namespace],
594 )?;
595
596 Ok(report)
597 })
598}
599
600pub fn get_fact(conn: &Connection, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
602 let result = conn.query_row(
603 "SELECT id, namespace, content, source, created_at, updated_at, metadata
604 FROM facts WHERE id = ?1",
605 params![fact_id],
606 |row| {
607 Ok((
608 row.get::<_, String>(0)?,
609 row.get::<_, String>(1)?,
610 row.get::<_, String>(2)?,
611 row.get::<_, Option<String>>(3)?,
612 row.get::<_, String>(4)?,
613 row.get::<_, String>(5)?,
614 row.get::<_, Option<String>>(6)?,
615 ))
616 },
617 );
618
619 match result {
620 Ok((id, namespace, content, source, created_at, updated_at, metadata_raw)) => {
621 Ok(Some(Fact {
622 metadata: parse_optional_json("facts", &id, "metadata", metadata_raw.as_deref())?,
623 id,
624 namespace,
625 content,
626 source,
627 created_at,
628 updated_at,
629 }))
630 }
631 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
632 Err(err) => Err(MemoryError::Database(err)),
633 }
634}
635
636pub fn get_fact_embedding(
638 conn: &Connection,
639 fact_id: &str,
640) -> Result<Option<Vec<f32>>, MemoryError> {
641 let result: Result<Option<Vec<u8>>, _> = conn.query_row(
642 "SELECT embedding FROM facts WHERE id = ?1",
643 params![fact_id],
644 |row| row.get(0),
645 );
646
647 match result {
648 Ok(Some(bytes)) => Ok(Some(bytes_to_embedding(&bytes)?)),
649 Ok(None) => Ok(None),
650 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
651 Err(err) => Err(MemoryError::Database(err)),
652 }
653}
654
655pub fn list_facts(
657 conn: &Connection,
658 namespace: &str,
659 limit: usize,
660 offset: usize,
661) -> Result<Vec<Fact>, MemoryError> {
662 let mut stmt = conn.prepare(
663 "SELECT id, namespace, content, source, created_at, updated_at, metadata
664 FROM facts
665 WHERE namespace = ?1
666 ORDER BY updated_at DESC
667 LIMIT ?2 OFFSET ?3",
668 )?;
669
670 let facts = stmt
671 .query_map(params![namespace, limit as i64, offset as i64], |row| {
672 Ok((
673 row.get::<_, String>(0)?,
674 row.get::<_, String>(1)?,
675 row.get::<_, String>(2)?,
676 row.get::<_, Option<String>>(3)?,
677 row.get::<_, String>(4)?,
678 row.get::<_, String>(5)?,
679 row.get::<_, Option<String>>(6)?,
680 ))
681 })?
682 .collect::<Result<Vec<_>, _>>()?
683 .into_iter()
684 .map(
685 |(id, namespace, content, source, created_at, updated_at, metadata_raw)| {
686 Ok(Fact {
687 metadata: parse_optional_json(
688 "facts",
689 &id,
690 "metadata",
691 metadata_raw.as_deref(),
692 )?,
693 id,
694 namespace,
695 content,
696 source,
697 created_at,
698 updated_at,
699 })
700 },
701 )
702 .collect::<Result<Vec<_>, MemoryError>>()?;
703
704 Ok(facts)
705}
706
707impl MemoryStore {
708 pub async fn add_fact(
710 &self,
711 namespace: &str,
712 content: &str,
713 source: Option<&str>,
714 metadata: Option<serde_json::Value>,
715 ) -> Result<String, MemoryError> {
716 self.add_fact_with_trace(namespace, content, source, metadata, None)
717 .await
718 }
719
720 pub async fn add_fact_with_trace(
722 &self,
723 namespace: &str,
724 content: &str,
725 source: Option<&str>,
726 metadata: Option<serde_json::Value>,
727 trace_ctx: Option<&TraceCtx>,
728 ) -> Result<String, MemoryError> {
729 self.validate_content("fact.content", content)?;
730
731 let embedding = self.embed_text_internal(content).await?;
732 self.validate_embedding_dimensions(&embedding)?;
733 let embedding_bytes = db::embedding_to_bytes(&embedding);
734 let fact_id = uuid::Uuid::new_v4().to_string();
735 let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
736
737 let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
738 let q8_bytes = quantizer
740 .quantize(&embedding)
741 .map(|qv| quantize::pack_quantized(&qv))
742 .ok();
743
744 let ns = namespace.to_string();
745 let ct = content.to_string();
746 let fid = fact_id.clone();
747 let src = source.map(|s| s.to_string());
748 let meta = merge_trace_ctx(metadata, trace_ctx);
749 self.with_write_conn(move |conn| {
750 let current_count: usize = conn.query_row(
751 "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
752 rusqlite::params![&ns],
753 |row| row.get(0),
754 )?;
755 if current_count >= max_facts_per_namespace {
756 return Err(MemoryError::NamespaceFull {
757 namespace: ns.clone(),
758 count: current_count,
759 limit: max_facts_per_namespace,
760 });
761 }
762 insert_fact_with_fts_q8(
763 conn,
764 &fid,
765 &ns,
766 &ct,
767 &embedding_bytes,
768 q8_bytes.as_deref(),
769 src.as_deref(),
770 meta.as_ref(),
771 )
772 })
773 .await?;
774
775 #[cfg(feature = "hnsw")]
776 self.sync_pending_hnsw_ops_best_effort("add_fact").await;
777
778 Ok(fact_id)
779 }
780
781 pub async fn add_fact_with_embedding(
783 &self,
784 namespace: &str,
785 content: &str,
786 embedding: &[f32],
787 source: Option<&str>,
788 metadata: Option<serde_json::Value>,
789 ) -> Result<String, MemoryError> {
790 self.add_fact_with_embedding_and_trace(
791 namespace, content, embedding, source, metadata, None,
792 )
793 .await
794 }
795
796 pub async fn add_fact_with_embedding_and_trace(
798 &self,
799 namespace: &str,
800 content: &str,
801 embedding: &[f32],
802 source: Option<&str>,
803 metadata: Option<serde_json::Value>,
804 trace_ctx: Option<&TraceCtx>,
805 ) -> Result<String, MemoryError> {
806 self.validate_content("fact.content", content)?;
807 self.validate_embedding_dimensions(embedding)?;
808 let embedding_bytes = db::embedding_to_bytes(embedding);
809 let fact_id = uuid::Uuid::new_v4().to_string();
810 let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
811
812 let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
813 let q8_bytes = quantizer
815 .quantize(embedding)
816 .map(|qv| quantize::pack_quantized(&qv))
817 .ok();
818
819 let ns = namespace.to_string();
820 let ct = content.to_string();
821 let fid = fact_id.clone();
822 let src = source.map(|s| s.to_string());
823 let meta = merge_trace_ctx(metadata, trace_ctx);
824 self.with_write_conn(move |conn| {
825 let current_count: usize = conn.query_row(
826 "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
827 rusqlite::params![&ns],
828 |row| row.get(0),
829 )?;
830 if current_count >= max_facts_per_namespace {
831 return Err(MemoryError::NamespaceFull {
832 namespace: ns.clone(),
833 count: current_count,
834 limit: max_facts_per_namespace,
835 });
836 }
837 insert_fact_with_fts_q8(
838 conn,
839 &fid,
840 &ns,
841 &ct,
842 &embedding_bytes,
843 q8_bytes.as_deref(),
844 src.as_deref(),
845 meta.as_ref(),
846 )
847 })
848 .await?;
849
850 #[cfg(feature = "hnsw")]
851 self.sync_pending_hnsw_ops_best_effort("add_fact_with_embedding")
852 .await;
853
854 Ok(fact_id)
855 }
856
857 pub async fn update_fact(&self, fact_id: &str, content: &str) -> Result<(), MemoryError> {
859 self.validate_content("fact.content", content)?;
860 let embedding = self.embed_text_internal(content).await?;
861 self.validate_embedding_dimensions(&embedding)?;
862 let embedding_bytes = db::embedding_to_bytes(&embedding);
863 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
865 .quantize(&embedding)
866 .map(|qv| quantize::pack_quantized(&qv))
867 .ok();
868
869 let fid = fact_id.to_string();
870 let ct = content.to_string();
871 self.with_write_conn(move |conn| {
872 update_fact_with_fts(conn, &fid, &ct, &embedding_bytes, q8_bytes.as_deref())
873 })
874 .await?;
875
876 #[cfg(feature = "hnsw")]
877 self.sync_pending_hnsw_ops_best_effort("update_fact").await;
878
879 Ok(())
880 }
881
882 pub async fn delete_fact(&self, fact_id: &str) -> Result<(), MemoryError> {
884 let fid = fact_id.to_string();
885 self.with_write_conn(move |conn| delete_fact_with_fts(conn, &fid))
886 .await?;
887
888 #[cfg(feature = "hnsw")]
889 self.sync_pending_hnsw_ops_best_effort("delete_fact").await;
890
891 Ok(())
892 }
893
894 pub async fn delete_namespace(
896 &self,
897 namespace: &str,
898 ) -> Result<NamespaceDeleteReport, MemoryError> {
899 let ns = namespace.to_string();
900 let count = self
901 .with_write_conn(move |conn| delete_namespace(conn, &ns))
902 .await?;
903
904 #[cfg(feature = "hnsw")]
905 self.sync_pending_hnsw_ops_best_effort("delete_namespace")
906 .await;
907
908 Ok(count)
909 }
910
911 pub async fn get_fact(&self, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
913 let fid = fact_id.to_string();
914 self.with_read_conn(move |conn| get_fact(conn, &fid)).await
915 }
916
917 pub async fn get_fact_embedding(&self, fact_id: &str) -> Result<Option<Vec<f32>>, MemoryError> {
919 let fid = fact_id.to_string();
920 self.with_read_conn(move |conn| get_fact_embedding(conn, &fid))
921 .await
922 }
923
924 pub async fn list_facts(
926 &self,
927 namespace: &str,
928 limit: usize,
929 offset: usize,
930 ) -> Result<Vec<Fact>, MemoryError> {
931 let ns = namespace.to_string();
932 self.with_read_conn(move |conn| list_facts(conn, &ns, limit, offset))
933 .await
934 }
935}