Skip to main content

semantic_memory/
knowledge.rs

//! Fact CRUD with FTS5 synchronization.
//!
//! Every fact operation that touches `facts_fts` is transactional.
4
5use crate::db;
6use crate::db::{bytes_to_embedding, parse_optional_json, with_transaction};
7#[cfg(feature = "hnsw")]
8use crate::db::{enqueue_pending_index_op, PendingIndexOpKind};
9use crate::episodes;
10use crate::error::MemoryError;
11use crate::quantize::{self, Quantizer};
12use crate::types::Fact;
13use crate::{merge_trace_ctx, MemoryStore};
14use rusqlite::{params, Connection};
15use stack_ids::TraceCtx;
16
17/// Insert a fact and its FTS entry in a transaction.
18#[allow(dead_code)]
19pub fn insert_fact_with_fts(
20    conn: &Connection,
21    fact_id: &str,
22    namespace: &str,
23    content: &str,
24    embedding_bytes: &[u8],
25    source: Option<&str>,
26    metadata: Option<&serde_json::Value>,
27) -> Result<(), MemoryError> {
28    insert_fact_with_fts_q8(
29        conn,
30        fact_id,
31        namespace,
32        content,
33        embedding_bytes,
34        None,
35        source,
36        metadata,
37    )
38}
39
40/// Insert a fact with both f32 and quantized embeddings.
41#[allow(clippy::too_many_arguments)]
42pub fn insert_fact_with_fts_q8(
43    conn: &Connection,
44    fact_id: &str,
45    namespace: &str,
46    content: &str,
47    embedding_bytes: &[u8],
48    q8_bytes: Option<&[u8]>,
49    source: Option<&str>,
50    metadata: Option<&serde_json::Value>,
51) -> Result<(), MemoryError> {
52    let metadata_str = metadata.map(|m| m.to_string());
53    with_transaction(conn, |tx| {
54        tx.execute(
55            "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
56             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
57            params![
58                fact_id,
59                namespace,
60                content,
61                source,
62                embedding_bytes,
63                q8_bytes,
64                metadata_str
65            ],
66        )?;
67
68        tx.execute(
69            "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
70            params![fact_id],
71        )?;
72        let fts_rowid = tx.last_insert_rowid();
73
74        tx.execute(
75            "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
76            params![fts_rowid, content],
77        )?;
78
79        #[cfg(feature = "hnsw")]
80        enqueue_pending_index_op(
81            tx,
82            &format!("fact:{}", fact_id),
83            "fact",
84            PendingIndexOpKind::Upsert,
85        )?;
86
87        Ok(())
88    })
89}
90
91/// Insert a fact within an existing transaction (no nested transaction).
92///
93/// Used by the import boundary where the outer transaction is already active.
94#[allow(clippy::too_many_arguments)]
95pub fn insert_fact_in_tx(
96    tx: &rusqlite::Transaction<'_>,
97    fact_id: &str,
98    namespace: &str,
99    content: &str,
100    embedding_bytes: &[u8],
101    q8_bytes: Option<&[u8]>,
102    source: Option<&str>,
103    metadata: Option<&serde_json::Value>,
104) -> Result<(), MemoryError> {
105    let metadata_str = metadata.map(|m| m.to_string());
106    tx.execute(
107        "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
108         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
109        params![
110            fact_id,
111            namespace,
112            content,
113            source,
114            embedding_bytes,
115            q8_bytes,
116            metadata_str
117        ],
118    )?;
119
120    tx.execute(
121        "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
122        params![fact_id],
123    )?;
124    let fts_rowid = tx.last_insert_rowid();
125
126    tx.execute(
127        "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
128        params![fts_rowid, content],
129    )?;
130
131    #[cfg(feature = "hnsw")]
132    enqueue_pending_index_op(
133        tx,
134        &format!("fact:{}", fact_id),
135        "fact",
136        PendingIndexOpKind::Upsert,
137    )?;
138
139    Ok(())
140}
141
142/// Delete a fact and its FTS entry in a transaction.
143pub fn delete_fact_with_fts(conn: &Connection, fact_id: &str) -> Result<(), MemoryError> {
144    with_transaction(conn, |tx| {
145        let fts_rowid: i64 = tx
146            .query_row(
147                "SELECT rowid FROM facts_rowid_map WHERE fact_id = ?1",
148                params![fact_id],
149                |row| row.get(0),
150            )
151            .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
152
153        let content: String = tx
154            .query_row(
155                "SELECT content FROM facts WHERE id = ?1",
156                params![fact_id],
157                |row| row.get(0),
158            )
159            .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
160
161        tx.execute(
162            "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
163            params![fts_rowid, content],
164        )?;
165        tx.execute(
166            "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
167            params![fact_id],
168        )?;
169        tx.execute("DELETE FROM facts WHERE id = ?1", params![fact_id])?;
170
171        #[cfg(feature = "hnsw")]
172        enqueue_pending_index_op(
173            tx,
174            &format!("fact:{}", fact_id),
175            "fact",
176            PendingIndexOpKind::Delete,
177        )?;
178
179        Ok(())
180    })
181}
182
183/// Update a fact's content and embeddings, with FTS synchronization.
184pub fn update_fact_with_fts(
185    conn: &Connection,
186    fact_id: &str,
187    new_content: &str,
188    new_embedding_bytes: &[u8],
189    new_q8_bytes: Option<&[u8]>,
190) -> Result<(), MemoryError> {
191    with_transaction(conn, |tx| {
192        let (fts_rowid, old_content): (i64, String) = tx
193            .query_row(
194                "SELECT fm.rowid, f.content
195                 FROM facts f
196                 JOIN facts_rowid_map fm ON fm.fact_id = f.id
197                 WHERE f.id = ?1",
198                params![fact_id],
199                |row| Ok((row.get(0)?, row.get(1)?)),
200            )
201            .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
202
203        tx.execute(
204            "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
205            params![fts_rowid, old_content],
206        )?;
207
208        tx.execute(
209            "UPDATE facts
210             SET content = ?1,
211                 embedding = ?2,
212                 embedding_q8 = ?3,
213                 updated_at = datetime('now')
214             WHERE id = ?4",
215            params![new_content, new_embedding_bytes, new_q8_bytes, fact_id],
216        )?;
217
218        tx.execute(
219            "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
220            params![fts_rowid, new_content],
221        )?;
222
223        #[cfg(feature = "hnsw")]
224        enqueue_pending_index_op(
225            tx,
226            &format!("fact:{}", fact_id),
227            "fact",
228            PendingIndexOpKind::Upsert,
229        )?;
230
231        Ok(())
232    })
233}
234
235/// Delete all facts in a namespace atomically. Returns the deleted count.
236pub fn delete_namespace(conn: &Connection, namespace: &str) -> Result<usize, MemoryError> {
237    with_transaction(conn, |tx| {
238        let delete_session = |session_id: &str| -> Result<(), MemoryError> {
239            let message_data: Vec<(i64, String, i64, bool)> = {
240                let mut stmt = tx.prepare(
241                    "SELECT m.id, m.content, mm.rowid, m.embedding IS NOT NULL
242                     FROM messages m
243                     JOIN messages_rowid_map mm ON mm.message_id = m.id
244                     WHERE m.session_id = ?1",
245                )?;
246                let rows = stmt.query_map(params![session_id], |row| {
247                    Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
248                })?;
249                rows.collect::<Result<Vec<_>, _>>()?
250            };
251
252            for (message_id, content, fts_rowid, has_embedding) in &message_data {
253                tx.execute(
254                    "INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', ?1, ?2)",
255                    params![fts_rowid, content],
256                )?;
257                #[cfg(feature = "hnsw")]
258                if *has_embedding {
259                    enqueue_pending_index_op(
260                        tx,
261                        &format!("msg:{}", message_id),
262                        "message",
263                        PendingIndexOpKind::Delete,
264                    )?;
265                }
266            }
267
268            let affected = tx.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
269            if affected == 0 {
270                return Err(MemoryError::SessionNotFound(session_id.to_string()));
271            }
272            Ok(())
273        };
274
275        let document_ids: Vec<String> = {
276            let mut stmt = tx.prepare("SELECT id FROM documents WHERE namespace = ?1")?;
277            let ids = stmt
278                .query_map(params![namespace], |row| row.get(0))?
279                .collect::<Result<Vec<_>, _>>()?;
280            ids
281        };
282
283        let session_ids: Vec<String> = {
284            let mut stmt = tx.prepare("SELECT id, metadata FROM sessions")?;
285            let rows = stmt.query_map([], |row| {
286                Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
287            })?;
288            let mut ids = Vec::new();
289            for row in rows {
290                let (session_id, metadata_raw) = row?;
291                let metadata = parse_optional_json(
292                    "sessions",
293                    &session_id,
294                    "metadata",
295                    metadata_raw.as_deref(),
296                )?;
297                let namespace_matches = metadata
298                    .as_ref()
299                    .and_then(|value| {
300                        value
301                            .get("namespace")
302                            .or_else(|| value.get("scope_namespace"))
303                    })
304                    .and_then(|value| value.as_str())
305                    == Some(namespace);
306                if namespace_matches {
307                    ids.push(session_id);
308                }
309            }
310            ids
311        };
312
313        for session_id in &session_ids {
314            delete_session(session_id)?;
315        }
316
317        let delete_derivation_edges_for_id = |kind: &str, id: &str| -> Result<(), MemoryError> {
318            tx.execute(
319                "DELETE FROM derivation_edges
320                 WHERE (source_kind = ?1 AND source_id = ?2)
321                    OR (target_kind = ?1 AND target_id = ?2)",
322                params![kind, id],
323            )?;
324            Ok(())
325        };
326
327        let delete_derivation_edges_for_ids =
328            |kind: &str, ids: &[String]| -> Result<(), MemoryError> {
329                for id in ids {
330                    delete_derivation_edges_for_id(kind, id)?;
331                }
332                Ok(())
333            };
334
335        let facts: Vec<(String, i64, String)> = {
336            let mut stmt = tx.prepare(
337                "SELECT f.id, fm.rowid, f.content
338                 FROM facts f
339                 JOIN facts_rowid_map fm ON fm.fact_id = f.id
340                 WHERE f.namespace = ?1",
341            )?;
342            let facts = stmt
343                .query_map(params![namespace], |row| {
344                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
345                })?
346                .collect::<Result<Vec<_>, _>>()?;
347            facts
348        };
349
350        for (fact_id, fts_rowid, content) in &facts {
351            tx.execute(
352                "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
353                params![fts_rowid, content],
354            )?;
355            tx.execute(
356                "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
357                params![fact_id],
358            )?;
359
360            #[cfg(feature = "hnsw")]
361            enqueue_pending_index_op(
362                tx,
363                &format!("fact:{}", fact_id),
364                "fact",
365                PendingIndexOpKind::Delete,
366            )?;
367        }
368        tx.execute("DELETE FROM facts WHERE namespace = ?1", params![namespace])?;
369
370        for doc_id in &document_ids {
371            let mut stmt = tx.prepare(
372                "SELECT c.id, c.content, cm.rowid
373                 FROM chunks c
374                 JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
375                 WHERE c.document_id = ?1",
376            )?;
377            let chunk_rows: Vec<(String, String, i64)> = stmt
378                .query_map(params![doc_id], |row| {
379                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
380                })?
381                .collect::<Result<Vec<_>, _>>()?;
382
383            for (chunk_id, content, fts_rowid) in &chunk_rows {
384                tx.execute(
385                    "INSERT INTO chunks_fts(chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
386                    params![fts_rowid, content],
387                )?;
388                tx.execute(
389                    "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
390                    params![chunk_id],
391                )?;
392                #[cfg(feature = "hnsw")]
393                enqueue_pending_index_op(
394                    tx,
395                    &format!("chunk:{}", chunk_id),
396                    "chunk",
397                    PendingIndexOpKind::Delete,
398                )?;
399            }
400
401            tx.execute("DELETE FROM chunks WHERE document_id = ?1", params![doc_id])?;
402        }
403
404        for doc_id in &document_ids {
405            let mut stmt = tx.prepare(
406                "SELECT e.episode_id, e.search_text, erm.rowid
407                 FROM episodes e
408                 JOIN episodes_rowid_map erm ON erm.episode_id = e.episode_id
409                 WHERE e.document_id = ?1",
410            )?;
411            let episode_rows: Vec<(String, String, i64)> = stmt
412                .query_map(params![doc_id], |row| {
413                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
414                })?
415                .collect::<Result<Vec<_>, _>>()?;
416
417            for (episode_id, search_text, fts_rowid) in &episode_rows {
418                tx.execute(
419                    "INSERT INTO episodes_fts(episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
420                    params![fts_rowid, search_text],
421                )?;
422                tx.execute(
423                    "DELETE FROM episodes_rowid_map WHERE episode_id = ?1",
424                    params![episode_id],
425                )?;
426                tx.execute(
427                    "DELETE FROM episode_causes WHERE episode_id = ?1",
428                    params![episode_id],
429                )?;
430                #[cfg(feature = "hnsw")]
431                enqueue_pending_index_op(
432                    tx,
433                    &episodes::episode_item_key(episode_id),
434                    "episode",
435                    PendingIndexOpKind::Delete,
436                )?;
437            }
438
439            tx.execute(
440                "DELETE FROM episodes WHERE document_id = ?1",
441                params![doc_id],
442            )?;
443            tx.execute("DELETE FROM documents WHERE id = ?1", params![doc_id])?;
444        }
445
446        let claim_ids: Vec<String> = {
447            let mut stmt =
448                tx.prepare("SELECT claim_id FROM claim_versions WHERE scope_namespace = ?1")?;
449            let ids = stmt
450                .query_map(params![namespace], |row| row.get(0))?
451                .collect::<Result<Vec<_>, _>>()?;
452            ids
453        };
454
455        let claim_version_ids: Vec<String> = {
456            let mut stmt = tx.prepare(
457                "SELECT claim_version_id FROM claim_versions WHERE scope_namespace = ?1",
458            )?;
459            let ids = stmt
460                .query_map(params![namespace], |row| row.get(0))?
461                .collect::<Result<Vec<_>, _>>()?;
462            ids
463        };
464
465        let relation_version_ids: Vec<String> = {
466            let mut stmt = tx.prepare(
467                "SELECT relation_version_id FROM relation_versions WHERE scope_namespace = ?1",
468            )?;
469            let ids = stmt
470                .query_map(params![namespace], |row| row.get(0))?
471                .collect::<Result<Vec<_>, _>>()?;
472            ids
473        };
474
475        let alias_entity_ids: Vec<String> = {
476            let mut stmt = tx.prepare(
477                "SELECT canonical_entity_id FROM entity_aliases WHERE scope_namespace = ?1",
478            )?;
479            let ids = stmt
480                .query_map(params![namespace], |row| row.get(0))?
481                .collect::<Result<Vec<_>, _>>()?;
482            ids
483        };
484
485        let evidence_handles: Vec<String> = {
486            let mut stmt = tx.prepare(
487                "SELECT er.fetch_handle FROM evidence_refs er
488                 JOIN projection_import_log pil ON er.source_envelope_id = pil.source_envelope_id
489                 WHERE pil.scope_namespace = ?1",
490            )?;
491            let handles = stmt
492                .query_map(params![namespace], |row| row.get(0))?
493                .collect::<Result<Vec<_>, _>>()?;
494            handles
495        };
496
497        let episode_ids: Vec<String> = {
498            let mut stmt = tx.prepare(
499                "SELECT episode_id FROM episode_links
500                 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
501            )?;
502            let ids = stmt
503                .query_map(params![namespace], |row| row.get(0))?
504                .collect::<Result<Vec<_>, _>>()?;
505            ids
506        };
507
508        delete_derivation_edges_for_ids("claim", &claim_ids)?;
509        delete_derivation_edges_for_ids("claim_version", &claim_version_ids)?;
510        delete_derivation_edges_for_ids("relation_version", &relation_version_ids)?;
511        delete_derivation_edges_for_ids("entity", &alias_entity_ids)?;
512        delete_derivation_edges_for_ids("evidence_ref", &evidence_handles)?;
513        delete_derivation_edges_for_ids("episode", &episode_ids)?;
514
515        tx.execute(
516            "DELETE FROM claim_versions WHERE scope_namespace = ?1",
517            params![namespace],
518        )?;
519        tx.execute(
520            "DELETE FROM relation_versions WHERE scope_namespace = ?1",
521            params![namespace],
522        )?;
523        tx.execute(
524            "DELETE FROM entity_aliases WHERE scope_namespace = ?1",
525            params![namespace],
526        )?;
527        tx.execute(
528            "DELETE FROM evidence_refs
529             WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
530            params![namespace],
531        )?;
532        tx.execute(
533            "DELETE FROM episode_links
534             WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
535            params![namespace],
536        )?;
537        tx.execute(
538            "DELETE FROM projection_import_failures WHERE scope_namespace = ?1",
539            params![namespace],
540        )?;
541        tx.execute(
542            "DELETE FROM projection_import_log WHERE scope_namespace = ?1",
543            params![namespace],
544        )?;
545
546        Ok(facts.len())
547    })
548}
549
550/// Get a fact by ID.
551pub fn get_fact(conn: &Connection, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
552    let result = conn.query_row(
553        "SELECT id, namespace, content, source, created_at, updated_at, metadata
554         FROM facts WHERE id = ?1",
555        params![fact_id],
556        |row| {
557            Ok((
558                row.get::<_, String>(0)?,
559                row.get::<_, String>(1)?,
560                row.get::<_, String>(2)?,
561                row.get::<_, Option<String>>(3)?,
562                row.get::<_, String>(4)?,
563                row.get::<_, String>(5)?,
564                row.get::<_, Option<String>>(6)?,
565            ))
566        },
567    );
568
569    match result {
570        Ok((id, namespace, content, source, created_at, updated_at, metadata_raw)) => {
571            Ok(Some(Fact {
572                metadata: parse_optional_json("facts", &id, "metadata", metadata_raw.as_deref())?,
573                id,
574                namespace,
575                content,
576                source,
577                created_at,
578                updated_at,
579            }))
580        }
581        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
582        Err(err) => Err(MemoryError::Database(err)),
583    }
584}
585
586/// Get a fact embedding vector.
587pub fn get_fact_embedding(
588    conn: &Connection,
589    fact_id: &str,
590) -> Result<Option<Vec<f32>>, MemoryError> {
591    let result: Result<Option<Vec<u8>>, _> = conn.query_row(
592        "SELECT embedding FROM facts WHERE id = ?1",
593        params![fact_id],
594        |row| row.get(0),
595    );
596
597    match result {
598        Ok(Some(bytes)) => Ok(Some(bytes_to_embedding(&bytes)?)),
599        Ok(None) => Ok(None),
600        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
601        Err(err) => Err(MemoryError::Database(err)),
602    }
603}
604
605/// List facts within a namespace.
606pub fn list_facts(
607    conn: &Connection,
608    namespace: &str,
609    limit: usize,
610    offset: usize,
611) -> Result<Vec<Fact>, MemoryError> {
612    let mut stmt = conn.prepare(
613        "SELECT id, namespace, content, source, created_at, updated_at, metadata
614         FROM facts
615         WHERE namespace = ?1
616         ORDER BY updated_at DESC
617         LIMIT ?2 OFFSET ?3",
618    )?;
619
620    let facts = stmt
621        .query_map(params![namespace, limit as i64, offset as i64], |row| {
622            Ok((
623                row.get::<_, String>(0)?,
624                row.get::<_, String>(1)?,
625                row.get::<_, String>(2)?,
626                row.get::<_, Option<String>>(3)?,
627                row.get::<_, String>(4)?,
628                row.get::<_, String>(5)?,
629                row.get::<_, Option<String>>(6)?,
630            ))
631        })?
632        .collect::<Result<Vec<_>, _>>()?
633        .into_iter()
634        .map(
635            |(id, namespace, content, source, created_at, updated_at, metadata_raw)| {
636                Ok(Fact {
637                    metadata: parse_optional_json(
638                        "facts",
639                        &id,
640                        "metadata",
641                        metadata_raw.as_deref(),
642                    )?,
643                    id,
644                    namespace,
645                    content,
646                    source,
647                    created_at,
648                    updated_at,
649                })
650            },
651        )
652        .collect::<Result<Vec<_>, MemoryError>>()?;
653
654    Ok(facts)
655}
656
657impl MemoryStore {
658    /// Store a fact with automatic embedding. Returns the fact ID (UUID v4).
659    pub async fn add_fact(
660        &self,
661        namespace: &str,
662        content: &str,
663        source: Option<&str>,
664        metadata: Option<serde_json::Value>,
665    ) -> Result<String, MemoryError> {
666        self.add_fact_with_trace(namespace, content, source, metadata, None)
667            .await
668    }
669
670    /// Store a fact with automatic embedding and optional trace metadata.
671    pub async fn add_fact_with_trace(
672        &self,
673        namespace: &str,
674        content: &str,
675        source: Option<&str>,
676        metadata: Option<serde_json::Value>,
677        trace_ctx: Option<&TraceCtx>,
678    ) -> Result<String, MemoryError> {
679        self.validate_content("fact.content", content)?;
680
681        let embedding = self.embed_text_internal(content).await?;
682        self.validate_embedding_dimensions(&embedding)?;
683        let embedding_bytes = db::embedding_to_bytes(&embedding);
684        let fact_id = uuid::Uuid::new_v4().to_string();
685        let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
686
687        let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
688        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
689        let q8_bytes = quantizer
690            .quantize(&embedding)
691            .map(|qv| quantize::pack_quantized(&qv))
692            .ok();
693
694        let ns = namespace.to_string();
695        let ct = content.to_string();
696        let fid = fact_id.clone();
697        let src = source.map(|s| s.to_string());
698        let meta = merge_trace_ctx(metadata, trace_ctx);
699        self.with_write_conn(move |conn| {
700            let current_count: usize = conn.query_row(
701                "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
702                rusqlite::params![&ns],
703                |row| row.get(0),
704            )?;
705            if current_count >= max_facts_per_namespace {
706                return Err(MemoryError::NamespaceFull {
707                    namespace: ns.clone(),
708                    count: current_count,
709                    limit: max_facts_per_namespace,
710                });
711            }
712            insert_fact_with_fts_q8(
713                conn,
714                &fid,
715                &ns,
716                &ct,
717                &embedding_bytes,
718                q8_bytes.as_deref(),
719                src.as_deref(),
720                meta.as_ref(),
721            )
722        })
723        .await?;
724
725        #[cfg(feature = "hnsw")]
726        self.sync_pending_hnsw_ops_best_effort("add_fact").await;
727
728        Ok(fact_id)
729    }
730
731    /// Store a fact with a pre-computed embedding.
732    pub async fn add_fact_with_embedding(
733        &self,
734        namespace: &str,
735        content: &str,
736        embedding: &[f32],
737        source: Option<&str>,
738        metadata: Option<serde_json::Value>,
739    ) -> Result<String, MemoryError> {
740        self.add_fact_with_embedding_and_trace(
741            namespace, content, embedding, source, metadata, None,
742        )
743        .await
744    }
745
746    /// Store a fact with a pre-computed embedding and optional trace metadata.
747    pub async fn add_fact_with_embedding_and_trace(
748        &self,
749        namespace: &str,
750        content: &str,
751        embedding: &[f32],
752        source: Option<&str>,
753        metadata: Option<serde_json::Value>,
754        trace_ctx: Option<&TraceCtx>,
755    ) -> Result<String, MemoryError> {
756        self.validate_content("fact.content", content)?;
757        self.validate_embedding_dimensions(embedding)?;
758        let embedding_bytes = db::embedding_to_bytes(embedding);
759        let fact_id = uuid::Uuid::new_v4().to_string();
760        let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
761
762        let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
763        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
764        let q8_bytes = quantizer
765            .quantize(embedding)
766            .map(|qv| quantize::pack_quantized(&qv))
767            .ok();
768
769        let ns = namespace.to_string();
770        let ct = content.to_string();
771        let fid = fact_id.clone();
772        let src = source.map(|s| s.to_string());
773        let meta = merge_trace_ctx(metadata, trace_ctx);
774        self.with_write_conn(move |conn| {
775            let current_count: usize = conn.query_row(
776                "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
777                rusqlite::params![&ns],
778                |row| row.get(0),
779            )?;
780            if current_count >= max_facts_per_namespace {
781                return Err(MemoryError::NamespaceFull {
782                    namespace: ns.clone(),
783                    count: current_count,
784                    limit: max_facts_per_namespace,
785                });
786            }
787            insert_fact_with_fts_q8(
788                conn,
789                &fid,
790                &ns,
791                &ct,
792                &embedding_bytes,
793                q8_bytes.as_deref(),
794                src.as_deref(),
795                meta.as_ref(),
796            )
797        })
798        .await?;
799
800        #[cfg(feature = "hnsw")]
801        self.sync_pending_hnsw_ops_best_effort("add_fact_with_embedding")
802            .await;
803
804        Ok(fact_id)
805    }
806
807    /// Update a fact's content. Re-embeds automatically.
808    pub async fn update_fact(&self, fact_id: &str, content: &str) -> Result<(), MemoryError> {
809        self.validate_content("fact.content", content)?;
810        let embedding = self.embed_text_internal(content).await?;
811        self.validate_embedding_dimensions(&embedding)?;
812        let embedding_bytes = db::embedding_to_bytes(&embedding);
813        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
814        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
815            .quantize(&embedding)
816            .map(|qv| quantize::pack_quantized(&qv))
817            .ok();
818
819        let fid = fact_id.to_string();
820        let ct = content.to_string();
821        self.with_write_conn(move |conn| {
822            update_fact_with_fts(conn, &fid, &ct, &embedding_bytes, q8_bytes.as_deref())
823        })
824        .await?;
825
826        #[cfg(feature = "hnsw")]
827        self.sync_pending_hnsw_ops_best_effort("update_fact").await;
828
829        Ok(())
830    }
831
832    /// Delete a fact by ID.
833    pub async fn delete_fact(&self, fact_id: &str) -> Result<(), MemoryError> {
834        let fid = fact_id.to_string();
835        self.with_write_conn(move |conn| delete_fact_with_fts(conn, &fid))
836            .await?;
837
838        #[cfg(feature = "hnsw")]
839        self.sync_pending_hnsw_ops_best_effort("delete_fact").await;
840
841        Ok(())
842    }
843
844    /// Delete all facts in a namespace. Returns the count of deleted facts.
845    pub async fn delete_namespace(&self, namespace: &str) -> Result<usize, MemoryError> {
846        let ns = namespace.to_string();
847        let count = self
848            .with_write_conn(move |conn| delete_namespace(conn, &ns))
849            .await?;
850
851        #[cfg(feature = "hnsw")]
852        self.sync_pending_hnsw_ops_best_effort("delete_namespace")
853            .await;
854
855        Ok(count)
856    }
857
858    /// Get a fact by ID.
859    pub async fn get_fact(&self, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
860        let fid = fact_id.to_string();
861        self.with_read_conn(move |conn| get_fact(conn, &fid)).await
862    }
863
864    /// Get a fact's embedding vector.
865    pub async fn get_fact_embedding(&self, fact_id: &str) -> Result<Option<Vec<f32>>, MemoryError> {
866        let fid = fact_id.to_string();
867        self.with_read_conn(move |conn| get_fact_embedding(conn, &fid))
868            .await
869    }
870
871    /// List all facts in a namespace.
872    pub async fn list_facts(
873        &self,
874        namespace: &str,
875        limit: usize,
876        offset: usize,
877    ) -> Result<Vec<Fact>, MemoryError> {
878        let ns = namespace.to_string();
879        self.with_read_conn(move |conn| list_facts(conn, &ns, limit, offset))
880            .await
881    }
882}