1use crate::db;
6use crate::db::{bytes_to_embedding, parse_optional_json, with_transaction};
7#[cfg(feature = "hnsw")]
8use crate::db::{enqueue_pending_index_op, PendingIndexOpKind};
9use crate::episodes;
10use crate::error::MemoryError;
11use crate::quantize::{self, Quantizer};
12use crate::types::Fact;
13use crate::{merge_trace_ctx, MemoryStore};
14use rusqlite::{params, Connection};
15use stack_ids::TraceCtx;
16
17#[allow(dead_code)]
19pub fn insert_fact_with_fts(
20 conn: &Connection,
21 fact_id: &str,
22 namespace: &str,
23 content: &str,
24 embedding_bytes: &[u8],
25 source: Option<&str>,
26 metadata: Option<&serde_json::Value>,
27) -> Result<(), MemoryError> {
28 insert_fact_with_fts_q8(
29 conn,
30 fact_id,
31 namespace,
32 content,
33 embedding_bytes,
34 None,
35 source,
36 metadata,
37 )
38}
39
40#[allow(clippy::too_many_arguments)]
42pub fn insert_fact_with_fts_q8(
43 conn: &Connection,
44 fact_id: &str,
45 namespace: &str,
46 content: &str,
47 embedding_bytes: &[u8],
48 q8_bytes: Option<&[u8]>,
49 source: Option<&str>,
50 metadata: Option<&serde_json::Value>,
51) -> Result<(), MemoryError> {
52 let metadata_str = metadata.map(|m| m.to_string());
53 with_transaction(conn, |tx| {
54 tx.execute(
55 "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
56 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
57 params![
58 fact_id,
59 namespace,
60 content,
61 source,
62 embedding_bytes,
63 q8_bytes,
64 metadata_str
65 ],
66 )?;
67
68 tx.execute(
69 "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
70 params![fact_id],
71 )?;
72 let fts_rowid = tx.last_insert_rowid();
73
74 tx.execute(
75 "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
76 params![fts_rowid, content],
77 )?;
78
79 #[cfg(feature = "hnsw")]
80 enqueue_pending_index_op(
81 tx,
82 &format!("fact:{}", fact_id),
83 "fact",
84 PendingIndexOpKind::Upsert,
85 )?;
86
87 Ok(())
88 })
89}
90
91#[allow(clippy::too_many_arguments)]
95pub fn insert_fact_in_tx(
96 tx: &rusqlite::Transaction<'_>,
97 fact_id: &str,
98 namespace: &str,
99 content: &str,
100 embedding_bytes: &[u8],
101 q8_bytes: Option<&[u8]>,
102 source: Option<&str>,
103 metadata: Option<&serde_json::Value>,
104) -> Result<(), MemoryError> {
105 let metadata_str = metadata.map(|m| m.to_string());
106 tx.execute(
107 "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
108 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
109 params![
110 fact_id,
111 namespace,
112 content,
113 source,
114 embedding_bytes,
115 q8_bytes,
116 metadata_str
117 ],
118 )?;
119
120 tx.execute(
121 "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
122 params![fact_id],
123 )?;
124 let fts_rowid = tx.last_insert_rowid();
125
126 tx.execute(
127 "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
128 params![fts_rowid, content],
129 )?;
130
131 #[cfg(feature = "hnsw")]
132 enqueue_pending_index_op(
133 tx,
134 &format!("fact:{}", fact_id),
135 "fact",
136 PendingIndexOpKind::Upsert,
137 )?;
138
139 Ok(())
140}
141
142pub fn delete_fact_with_fts(conn: &Connection, fact_id: &str) -> Result<(), MemoryError> {
144 with_transaction(conn, |tx| {
145 let fts_rowid: i64 = tx
146 .query_row(
147 "SELECT rowid FROM facts_rowid_map WHERE fact_id = ?1",
148 params![fact_id],
149 |row| row.get(0),
150 )
151 .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
152
153 let content: String = tx
154 .query_row(
155 "SELECT content FROM facts WHERE id = ?1",
156 params![fact_id],
157 |row| row.get(0),
158 )
159 .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
160
161 tx.execute(
162 "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
163 params![fts_rowid, content],
164 )?;
165 tx.execute(
166 "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
167 params![fact_id],
168 )?;
169 tx.execute("DELETE FROM facts WHERE id = ?1", params![fact_id])?;
170
171 #[cfg(feature = "hnsw")]
172 enqueue_pending_index_op(
173 tx,
174 &format!("fact:{}", fact_id),
175 "fact",
176 PendingIndexOpKind::Delete,
177 )?;
178
179 Ok(())
180 })
181}
182
183pub fn update_fact_with_fts(
185 conn: &Connection,
186 fact_id: &str,
187 new_content: &str,
188 new_embedding_bytes: &[u8],
189 new_q8_bytes: Option<&[u8]>,
190) -> Result<(), MemoryError> {
191 with_transaction(conn, |tx| {
192 let (fts_rowid, old_content): (i64, String) = tx
193 .query_row(
194 "SELECT fm.rowid, f.content
195 FROM facts f
196 JOIN facts_rowid_map fm ON fm.fact_id = f.id
197 WHERE f.id = ?1",
198 params![fact_id],
199 |row| Ok((row.get(0)?, row.get(1)?)),
200 )
201 .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
202
203 tx.execute(
204 "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
205 params![fts_rowid, old_content],
206 )?;
207
208 tx.execute(
209 "UPDATE facts
210 SET content = ?1,
211 embedding = ?2,
212 embedding_q8 = ?3,
213 updated_at = datetime('now')
214 WHERE id = ?4",
215 params![new_content, new_embedding_bytes, new_q8_bytes, fact_id],
216 )?;
217
218 tx.execute(
219 "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
220 params![fts_rowid, new_content],
221 )?;
222
223 #[cfg(feature = "hnsw")]
224 enqueue_pending_index_op(
225 tx,
226 &format!("fact:{}", fact_id),
227 "fact",
228 PendingIndexOpKind::Upsert,
229 )?;
230
231 Ok(())
232 })
233}
234
235pub fn delete_namespace(conn: &Connection, namespace: &str) -> Result<usize, MemoryError> {
237 with_transaction(conn, |tx| {
238 let delete_session = |session_id: &str| -> Result<(), MemoryError> {
239 let message_data: Vec<(i64, String, i64, bool)> = {
240 let mut stmt = tx.prepare(
241 "SELECT m.id, m.content, mm.rowid, m.embedding IS NOT NULL
242 FROM messages m
243 JOIN messages_rowid_map mm ON mm.message_id = m.id
244 WHERE m.session_id = ?1",
245 )?;
246 let rows = stmt.query_map(params![session_id], |row| {
247 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
248 })?;
249 rows.collect::<Result<Vec<_>, _>>()?
250 };
251
252 for (message_id, content, fts_rowid, has_embedding) in &message_data {
253 tx.execute(
254 "INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', ?1, ?2)",
255 params![fts_rowid, content],
256 )?;
257 #[cfg(feature = "hnsw")]
258 if *has_embedding {
259 enqueue_pending_index_op(
260 tx,
261 &format!("msg:{}", message_id),
262 "message",
263 PendingIndexOpKind::Delete,
264 )?;
265 }
266 }
267
268 let affected = tx.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
269 if affected == 0 {
270 return Err(MemoryError::SessionNotFound(session_id.to_string()));
271 }
272 Ok(())
273 };
274
275 let document_ids: Vec<String> = {
276 let mut stmt = tx.prepare("SELECT id FROM documents WHERE namespace = ?1")?;
277 let ids = stmt
278 .query_map(params![namespace], |row| row.get(0))?
279 .collect::<Result<Vec<_>, _>>()?;
280 ids
281 };
282
283 let session_ids: Vec<String> = {
284 let mut stmt = tx.prepare("SELECT id, metadata FROM sessions")?;
285 let rows = stmt.query_map([], |row| {
286 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
287 })?;
288 let mut ids = Vec::new();
289 for row in rows {
290 let (session_id, metadata_raw) = row?;
291 let metadata = parse_optional_json(
292 "sessions",
293 &session_id,
294 "metadata",
295 metadata_raw.as_deref(),
296 )?;
297 let namespace_matches = metadata
298 .as_ref()
299 .and_then(|value| {
300 value
301 .get("namespace")
302 .or_else(|| value.get("scope_namespace"))
303 })
304 .and_then(|value| value.as_str())
305 == Some(namespace);
306 if namespace_matches {
307 ids.push(session_id);
308 }
309 }
310 ids
311 };
312
313 for session_id in &session_ids {
314 delete_session(session_id)?;
315 }
316
317 let delete_derivation_edges_for_id = |kind: &str, id: &str| -> Result<(), MemoryError> {
318 tx.execute(
319 "DELETE FROM derivation_edges
320 WHERE (source_kind = ?1 AND source_id = ?2)
321 OR (target_kind = ?1 AND target_id = ?2)",
322 params![kind, id],
323 )?;
324 Ok(())
325 };
326
327 let delete_derivation_edges_for_ids =
328 |kind: &str, ids: &[String]| -> Result<(), MemoryError> {
329 for id in ids {
330 delete_derivation_edges_for_id(kind, id)?;
331 }
332 Ok(())
333 };
334
335 let facts: Vec<(String, i64, String)> = {
336 let mut stmt = tx.prepare(
337 "SELECT f.id, fm.rowid, f.content
338 FROM facts f
339 JOIN facts_rowid_map fm ON fm.fact_id = f.id
340 WHERE f.namespace = ?1",
341 )?;
342 let facts = stmt
343 .query_map(params![namespace], |row| {
344 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
345 })?
346 .collect::<Result<Vec<_>, _>>()?;
347 facts
348 };
349
350 for (fact_id, fts_rowid, content) in &facts {
351 tx.execute(
352 "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
353 params![fts_rowid, content],
354 )?;
355 tx.execute(
356 "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
357 params![fact_id],
358 )?;
359
360 #[cfg(feature = "hnsw")]
361 enqueue_pending_index_op(
362 tx,
363 &format!("fact:{}", fact_id),
364 "fact",
365 PendingIndexOpKind::Delete,
366 )?;
367 }
368 tx.execute("DELETE FROM facts WHERE namespace = ?1", params![namespace])?;
369
370 for doc_id in &document_ids {
371 let mut stmt = tx.prepare(
372 "SELECT c.id, c.content, cm.rowid
373 FROM chunks c
374 JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
375 WHERE c.document_id = ?1",
376 )?;
377 let chunk_rows: Vec<(String, String, i64)> = stmt
378 .query_map(params![doc_id], |row| {
379 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
380 })?
381 .collect::<Result<Vec<_>, _>>()?;
382
383 for (chunk_id, content, fts_rowid) in &chunk_rows {
384 tx.execute(
385 "INSERT INTO chunks_fts(chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
386 params![fts_rowid, content],
387 )?;
388 tx.execute(
389 "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
390 params![chunk_id],
391 )?;
392 #[cfg(feature = "hnsw")]
393 enqueue_pending_index_op(
394 tx,
395 &format!("chunk:{}", chunk_id),
396 "chunk",
397 PendingIndexOpKind::Delete,
398 )?;
399 }
400
401 tx.execute("DELETE FROM chunks WHERE document_id = ?1", params![doc_id])?;
402 }
403
404 for doc_id in &document_ids {
405 let mut stmt = tx.prepare(
406 "SELECT e.episode_id, e.search_text, erm.rowid
407 FROM episodes e
408 JOIN episodes_rowid_map erm ON erm.episode_id = e.episode_id
409 WHERE e.document_id = ?1",
410 )?;
411 let episode_rows: Vec<(String, String, i64)> = stmt
412 .query_map(params![doc_id], |row| {
413 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
414 })?
415 .collect::<Result<Vec<_>, _>>()?;
416
417 for (episode_id, search_text, fts_rowid) in &episode_rows {
418 tx.execute(
419 "INSERT INTO episodes_fts(episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
420 params![fts_rowid, search_text],
421 )?;
422 tx.execute(
423 "DELETE FROM episodes_rowid_map WHERE episode_id = ?1",
424 params![episode_id],
425 )?;
426 tx.execute(
427 "DELETE FROM episode_causes WHERE episode_id = ?1",
428 params![episode_id],
429 )?;
430 #[cfg(feature = "hnsw")]
431 enqueue_pending_index_op(
432 tx,
433 &episodes::episode_item_key(episode_id),
434 "episode",
435 PendingIndexOpKind::Delete,
436 )?;
437 }
438
439 tx.execute(
440 "DELETE FROM episodes WHERE document_id = ?1",
441 params![doc_id],
442 )?;
443 tx.execute("DELETE FROM documents WHERE id = ?1", params![doc_id])?;
444 }
445
446 let claim_ids: Vec<String> = {
447 let mut stmt =
448 tx.prepare("SELECT claim_id FROM claim_versions WHERE scope_namespace = ?1")?;
449 let ids = stmt
450 .query_map(params![namespace], |row| row.get(0))?
451 .collect::<Result<Vec<_>, _>>()?;
452 ids
453 };
454
455 let claim_version_ids: Vec<String> = {
456 let mut stmt = tx.prepare(
457 "SELECT claim_version_id FROM claim_versions WHERE scope_namespace = ?1",
458 )?;
459 let ids = stmt
460 .query_map(params![namespace], |row| row.get(0))?
461 .collect::<Result<Vec<_>, _>>()?;
462 ids
463 };
464
465 let relation_version_ids: Vec<String> = {
466 let mut stmt = tx.prepare(
467 "SELECT relation_version_id FROM relation_versions WHERE scope_namespace = ?1",
468 )?;
469 let ids = stmt
470 .query_map(params![namespace], |row| row.get(0))?
471 .collect::<Result<Vec<_>, _>>()?;
472 ids
473 };
474
475 let alias_entity_ids: Vec<String> = {
476 let mut stmt = tx.prepare(
477 "SELECT canonical_entity_id FROM entity_aliases WHERE scope_namespace = ?1",
478 )?;
479 let ids = stmt
480 .query_map(params![namespace], |row| row.get(0))?
481 .collect::<Result<Vec<_>, _>>()?;
482 ids
483 };
484
485 let evidence_handles: Vec<String> = {
486 let mut stmt = tx.prepare(
487 "SELECT er.fetch_handle FROM evidence_refs er
488 JOIN projection_import_log pil ON er.source_envelope_id = pil.source_envelope_id
489 WHERE pil.scope_namespace = ?1",
490 )?;
491 let handles = stmt
492 .query_map(params![namespace], |row| row.get(0))?
493 .collect::<Result<Vec<_>, _>>()?;
494 handles
495 };
496
497 let episode_ids: Vec<String> = {
498 let mut stmt = tx.prepare(
499 "SELECT episode_id FROM episode_links
500 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
501 )?;
502 let ids = stmt
503 .query_map(params![namespace], |row| row.get(0))?
504 .collect::<Result<Vec<_>, _>>()?;
505 ids
506 };
507
508 delete_derivation_edges_for_ids("claim", &claim_ids)?;
509 delete_derivation_edges_for_ids("claim_version", &claim_version_ids)?;
510 delete_derivation_edges_for_ids("relation_version", &relation_version_ids)?;
511 delete_derivation_edges_for_ids("entity", &alias_entity_ids)?;
512 delete_derivation_edges_for_ids("evidence_ref", &evidence_handles)?;
513 delete_derivation_edges_for_ids("episode", &episode_ids)?;
514
515 tx.execute(
516 "DELETE FROM claim_versions WHERE scope_namespace = ?1",
517 params![namespace],
518 )?;
519 tx.execute(
520 "DELETE FROM relation_versions WHERE scope_namespace = ?1",
521 params![namespace],
522 )?;
523 tx.execute(
524 "DELETE FROM entity_aliases WHERE scope_namespace = ?1",
525 params![namespace],
526 )?;
527 tx.execute(
528 "DELETE FROM evidence_refs
529 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
530 params![namespace],
531 )?;
532 tx.execute(
533 "DELETE FROM episode_links
534 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
535 params![namespace],
536 )?;
537 tx.execute(
538 "DELETE FROM projection_import_failures WHERE scope_namespace = ?1",
539 params![namespace],
540 )?;
541 tx.execute(
542 "DELETE FROM projection_import_log WHERE scope_namespace = ?1",
543 params![namespace],
544 )?;
545
546 Ok(facts.len())
547 })
548}
549
550pub fn get_fact(conn: &Connection, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
552 let result = conn.query_row(
553 "SELECT id, namespace, content, source, created_at, updated_at, metadata
554 FROM facts WHERE id = ?1",
555 params![fact_id],
556 |row| {
557 Ok((
558 row.get::<_, String>(0)?,
559 row.get::<_, String>(1)?,
560 row.get::<_, String>(2)?,
561 row.get::<_, Option<String>>(3)?,
562 row.get::<_, String>(4)?,
563 row.get::<_, String>(5)?,
564 row.get::<_, Option<String>>(6)?,
565 ))
566 },
567 );
568
569 match result {
570 Ok((id, namespace, content, source, created_at, updated_at, metadata_raw)) => {
571 Ok(Some(Fact {
572 metadata: parse_optional_json("facts", &id, "metadata", metadata_raw.as_deref())?,
573 id,
574 namespace,
575 content,
576 source,
577 created_at,
578 updated_at,
579 }))
580 }
581 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
582 Err(err) => Err(MemoryError::Database(err)),
583 }
584}
585
586pub fn get_fact_embedding(
588 conn: &Connection,
589 fact_id: &str,
590) -> Result<Option<Vec<f32>>, MemoryError> {
591 let result: Result<Option<Vec<u8>>, _> = conn.query_row(
592 "SELECT embedding FROM facts WHERE id = ?1",
593 params![fact_id],
594 |row| row.get(0),
595 );
596
597 match result {
598 Ok(Some(bytes)) => Ok(Some(bytes_to_embedding(&bytes)?)),
599 Ok(None) => Ok(None),
600 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
601 Err(err) => Err(MemoryError::Database(err)),
602 }
603}
604
605pub fn list_facts(
607 conn: &Connection,
608 namespace: &str,
609 limit: usize,
610 offset: usize,
611) -> Result<Vec<Fact>, MemoryError> {
612 let mut stmt = conn.prepare(
613 "SELECT id, namespace, content, source, created_at, updated_at, metadata
614 FROM facts
615 WHERE namespace = ?1
616 ORDER BY updated_at DESC
617 LIMIT ?2 OFFSET ?3",
618 )?;
619
620 let facts = stmt
621 .query_map(params![namespace, limit as i64, offset as i64], |row| {
622 Ok((
623 row.get::<_, String>(0)?,
624 row.get::<_, String>(1)?,
625 row.get::<_, String>(2)?,
626 row.get::<_, Option<String>>(3)?,
627 row.get::<_, String>(4)?,
628 row.get::<_, String>(5)?,
629 row.get::<_, Option<String>>(6)?,
630 ))
631 })?
632 .collect::<Result<Vec<_>, _>>()?
633 .into_iter()
634 .map(
635 |(id, namespace, content, source, created_at, updated_at, metadata_raw)| {
636 Ok(Fact {
637 metadata: parse_optional_json(
638 "facts",
639 &id,
640 "metadata",
641 metadata_raw.as_deref(),
642 )?,
643 id,
644 namespace,
645 content,
646 source,
647 created_at,
648 updated_at,
649 })
650 },
651 )
652 .collect::<Result<Vec<_>, MemoryError>>()?;
653
654 Ok(facts)
655}
656
657impl MemoryStore {
658 pub async fn add_fact(
660 &self,
661 namespace: &str,
662 content: &str,
663 source: Option<&str>,
664 metadata: Option<serde_json::Value>,
665 ) -> Result<String, MemoryError> {
666 self.add_fact_with_trace(namespace, content, source, metadata, None)
667 .await
668 }
669
670 pub async fn add_fact_with_trace(
672 &self,
673 namespace: &str,
674 content: &str,
675 source: Option<&str>,
676 metadata: Option<serde_json::Value>,
677 trace_ctx: Option<&TraceCtx>,
678 ) -> Result<String, MemoryError> {
679 self.validate_content("fact.content", content)?;
680
681 let embedding = self.embed_text_internal(content).await?;
682 self.validate_embedding_dimensions(&embedding)?;
683 let embedding_bytes = db::embedding_to_bytes(&embedding);
684 let fact_id = uuid::Uuid::new_v4().to_string();
685 let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
686
687 let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
688 let q8_bytes = quantizer
690 .quantize(&embedding)
691 .map(|qv| quantize::pack_quantized(&qv))
692 .ok();
693
694 let ns = namespace.to_string();
695 let ct = content.to_string();
696 let fid = fact_id.clone();
697 let src = source.map(|s| s.to_string());
698 let meta = merge_trace_ctx(metadata, trace_ctx);
699 self.with_write_conn(move |conn| {
700 let current_count: usize = conn.query_row(
701 "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
702 rusqlite::params![&ns],
703 |row| row.get(0),
704 )?;
705 if current_count >= max_facts_per_namespace {
706 return Err(MemoryError::NamespaceFull {
707 namespace: ns.clone(),
708 count: current_count,
709 limit: max_facts_per_namespace,
710 });
711 }
712 insert_fact_with_fts_q8(
713 conn,
714 &fid,
715 &ns,
716 &ct,
717 &embedding_bytes,
718 q8_bytes.as_deref(),
719 src.as_deref(),
720 meta.as_ref(),
721 )
722 })
723 .await?;
724
725 #[cfg(feature = "hnsw")]
726 self.sync_pending_hnsw_ops_best_effort("add_fact").await;
727
728 Ok(fact_id)
729 }
730
731 pub async fn add_fact_with_embedding(
733 &self,
734 namespace: &str,
735 content: &str,
736 embedding: &[f32],
737 source: Option<&str>,
738 metadata: Option<serde_json::Value>,
739 ) -> Result<String, MemoryError> {
740 self.add_fact_with_embedding_and_trace(
741 namespace, content, embedding, source, metadata, None,
742 )
743 .await
744 }
745
746 pub async fn add_fact_with_embedding_and_trace(
748 &self,
749 namespace: &str,
750 content: &str,
751 embedding: &[f32],
752 source: Option<&str>,
753 metadata: Option<serde_json::Value>,
754 trace_ctx: Option<&TraceCtx>,
755 ) -> Result<String, MemoryError> {
756 self.validate_content("fact.content", content)?;
757 self.validate_embedding_dimensions(embedding)?;
758 let embedding_bytes = db::embedding_to_bytes(embedding);
759 let fact_id = uuid::Uuid::new_v4().to_string();
760 let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
761
762 let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
763 let q8_bytes = quantizer
765 .quantize(embedding)
766 .map(|qv| quantize::pack_quantized(&qv))
767 .ok();
768
769 let ns = namespace.to_string();
770 let ct = content.to_string();
771 let fid = fact_id.clone();
772 let src = source.map(|s| s.to_string());
773 let meta = merge_trace_ctx(metadata, trace_ctx);
774 self.with_write_conn(move |conn| {
775 let current_count: usize = conn.query_row(
776 "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
777 rusqlite::params![&ns],
778 |row| row.get(0),
779 )?;
780 if current_count >= max_facts_per_namespace {
781 return Err(MemoryError::NamespaceFull {
782 namespace: ns.clone(),
783 count: current_count,
784 limit: max_facts_per_namespace,
785 });
786 }
787 insert_fact_with_fts_q8(
788 conn,
789 &fid,
790 &ns,
791 &ct,
792 &embedding_bytes,
793 q8_bytes.as_deref(),
794 src.as_deref(),
795 meta.as_ref(),
796 )
797 })
798 .await?;
799
800 #[cfg(feature = "hnsw")]
801 self.sync_pending_hnsw_ops_best_effort("add_fact_with_embedding")
802 .await;
803
804 Ok(fact_id)
805 }
806
807 pub async fn update_fact(&self, fact_id: &str, content: &str) -> Result<(), MemoryError> {
809 self.validate_content("fact.content", content)?;
810 let embedding = self.embed_text_internal(content).await?;
811 self.validate_embedding_dimensions(&embedding)?;
812 let embedding_bytes = db::embedding_to_bytes(&embedding);
813 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
815 .quantize(&embedding)
816 .map(|qv| quantize::pack_quantized(&qv))
817 .ok();
818
819 let fid = fact_id.to_string();
820 let ct = content.to_string();
821 self.with_write_conn(move |conn| {
822 update_fact_with_fts(conn, &fid, &ct, &embedding_bytes, q8_bytes.as_deref())
823 })
824 .await?;
825
826 #[cfg(feature = "hnsw")]
827 self.sync_pending_hnsw_ops_best_effort("update_fact").await;
828
829 Ok(())
830 }
831
832 pub async fn delete_fact(&self, fact_id: &str) -> Result<(), MemoryError> {
834 let fid = fact_id.to_string();
835 self.with_write_conn(move |conn| delete_fact_with_fts(conn, &fid))
836 .await?;
837
838 #[cfg(feature = "hnsw")]
839 self.sync_pending_hnsw_ops_best_effort("delete_fact").await;
840
841 Ok(())
842 }
843
844 pub async fn delete_namespace(&self, namespace: &str) -> Result<usize, MemoryError> {
846 let ns = namespace.to_string();
847 let count = self
848 .with_write_conn(move |conn| delete_namespace(conn, &ns))
849 .await?;
850
851 #[cfg(feature = "hnsw")]
852 self.sync_pending_hnsw_ops_best_effort("delete_namespace")
853 .await;
854
855 Ok(count)
856 }
857
858 pub async fn get_fact(&self, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
860 let fid = fact_id.to_string();
861 self.with_read_conn(move |conn| get_fact(conn, &fid)).await
862 }
863
864 pub async fn get_fact_embedding(&self, fact_id: &str) -> Result<Option<Vec<f32>>, MemoryError> {
866 let fid = fact_id.to_string();
867 self.with_read_conn(move |conn| get_fact_embedding(conn, &fid))
868 .await
869 }
870
871 pub async fn list_facts(
873 &self,
874 namespace: &str,
875 limit: usize,
876 offset: usize,
877 ) -> Result<Vec<Fact>, MemoryError> {
878 let ns = namespace.to_string();
879 self.with_read_conn(move |conn| list_facts(conn, &ns, limit, offset))
880 .await
881 }
882}