Skip to main content

semantic_memory/
knowledge.rs

1//! Fact CRUD with FTS5 synchronization.
2//!
3//! Every fact operation that touches `facts_fts` is transactional.
4
5use crate::db;
6use crate::db::{bytes_to_embedding, parse_optional_json, with_transaction};
7#[cfg(feature = "hnsw")]
8use crate::db::{enqueue_pending_index_op, PendingIndexOpKind};
9#[cfg(feature = "hnsw")]
10use crate::episodes;
11use crate::error::MemoryError;
12use crate::quantize::{self, Quantizer};
13use crate::types::{Fact, NamespaceDeleteReport};
14use crate::{merge_trace_ctx, MemoryStore};
15use rusqlite::{params, Connection};
16use stack_ids::TraceCtx;
17
18/// Insert a fact and its FTS entry in a transaction.
19#[allow(dead_code)]
20pub fn insert_fact_with_fts(
21    conn: &Connection,
22    fact_id: &str,
23    namespace: &str,
24    content: &str,
25    embedding_bytes: &[u8],
26    source: Option<&str>,
27    metadata: Option<&serde_json::Value>,
28) -> Result<(), MemoryError> {
29    insert_fact_with_fts_q8(
30        conn,
31        fact_id,
32        namespace,
33        content,
34        embedding_bytes,
35        None,
36        source,
37        metadata,
38    )
39}
40
41/// Insert a fact with both f32 and quantized embeddings.
42#[allow(clippy::too_many_arguments)]
43pub fn insert_fact_with_fts_q8(
44    conn: &Connection,
45    fact_id: &str,
46    namespace: &str,
47    content: &str,
48    embedding_bytes: &[u8],
49    q8_bytes: Option<&[u8]>,
50    source: Option<&str>,
51    metadata: Option<&serde_json::Value>,
52) -> Result<(), MemoryError> {
53    let metadata_str = metadata.map(|m| m.to_string());
54    with_transaction(conn, |tx| {
55        tx.execute(
56            "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
57             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
58            params![
59                fact_id,
60                namespace,
61                content,
62                source,
63                embedding_bytes,
64                q8_bytes,
65                metadata_str
66            ],
67        )?;
68
69        tx.execute(
70            "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
71            params![fact_id],
72        )?;
73        let fts_rowid = tx.last_insert_rowid();
74
75        tx.execute(
76            "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
77            params![fts_rowid, content],
78        )?;
79
80        #[cfg(feature = "hnsw")]
81        enqueue_pending_index_op(
82            tx,
83            &format!("fact:{}", fact_id),
84            "fact",
85            PendingIndexOpKind::Upsert,
86        )?;
87        db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
88
89        Ok(())
90    })
91}
92
93/// Insert a fact within an existing transaction (no nested transaction).
94///
95/// Used by the import boundary where the outer transaction is already active.
96#[allow(clippy::too_many_arguments)]
97pub fn insert_fact_in_tx(
98    tx: &rusqlite::Transaction<'_>,
99    fact_id: &str,
100    namespace: &str,
101    content: &str,
102    embedding_bytes: &[u8],
103    q8_bytes: Option<&[u8]>,
104    source: Option<&str>,
105    metadata: Option<&serde_json::Value>,
106) -> Result<(), MemoryError> {
107    let metadata_str = metadata.map(|m| m.to_string());
108    tx.execute(
109        "INSERT INTO facts (id, namespace, content, source, embedding, embedding_q8, metadata)
110         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
111        params![
112            fact_id,
113            namespace,
114            content,
115            source,
116            embedding_bytes,
117            q8_bytes,
118            metadata_str
119        ],
120    )?;
121
122    tx.execute(
123        "INSERT INTO facts_rowid_map (fact_id) VALUES (?1)",
124        params![fact_id],
125    )?;
126    let fts_rowid = tx.last_insert_rowid();
127
128    tx.execute(
129        "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
130        params![fts_rowid, content],
131    )?;
132
133    #[cfg(feature = "hnsw")]
134    enqueue_pending_index_op(
135        tx,
136        &format!("fact:{}", fact_id),
137        "fact",
138        PendingIndexOpKind::Upsert,
139    )?;
140    db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
141
142    Ok(())
143}
144
145/// Delete a fact and its FTS entry in a transaction.
146pub fn delete_fact_with_fts(conn: &Connection, fact_id: &str) -> Result<(), MemoryError> {
147    with_transaction(conn, |tx| {
148        let fts_rowid: i64 = tx
149            .query_row(
150                "SELECT rowid FROM facts_rowid_map WHERE fact_id = ?1",
151                params![fact_id],
152                |row| row.get(0),
153            )
154            .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
155
156        let content: String = tx
157            .query_row(
158                "SELECT content FROM facts WHERE id = ?1",
159                params![fact_id],
160                |row| row.get(0),
161            )
162            .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
163
164        tx.execute(
165            "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
166            params![fts_rowid, content],
167        )?;
168        tx.execute(
169            "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
170            params![fact_id],
171        )?;
172        tx.execute(
173            "DELETE FROM episode_causes WHERE cause_node_id IN (?1, ?2)",
174            params![fact_id, format!("fact:{fact_id}")],
175        )?;
176        tx.execute(
177            "DELETE FROM derivation_edges
178             WHERE (source_kind = 'fact' AND source_id = ?1)
179                OR (target_kind = 'fact' AND target_id = ?1)",
180            params![fact_id],
181        )?;
182        tx.execute("DELETE FROM facts WHERE id = ?1", params![fact_id])?;
183
184        #[cfg(feature = "hnsw")]
185        enqueue_pending_index_op(
186            tx,
187            &format!("fact:{}", fact_id),
188            "fact",
189            PendingIndexOpKind::Delete,
190        )?;
191        db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
192
193        Ok(())
194    })
195}
196
197/// Update a fact's content and embeddings, with FTS synchronization.
198pub fn update_fact_with_fts(
199    conn: &Connection,
200    fact_id: &str,
201    new_content: &str,
202    new_embedding_bytes: &[u8],
203    new_q8_bytes: Option<&[u8]>,
204) -> Result<(), MemoryError> {
205    with_transaction(conn, |tx| {
206        let (fts_rowid, old_content): (i64, String) = tx
207            .query_row(
208                "SELECT fm.rowid, f.content
209                 FROM facts f
210                 JOIN facts_rowid_map fm ON fm.fact_id = f.id
211                 WHERE f.id = ?1",
212                params![fact_id],
213                |row| Ok((row.get(0)?, row.get(1)?)),
214            )
215            .map_err(|e| MemoryError::FactNotFound(format!("{}: {e}", fact_id)))?;
216
217        tx.execute(
218            "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
219            params![fts_rowid, old_content],
220        )?;
221
222        tx.execute(
223            "UPDATE facts
224             SET content = ?1,
225                 embedding = ?2,
226                 embedding_q8 = ?3,
227                 updated_at = datetime('now')
228             WHERE id = ?4",
229            params![new_content, new_embedding_bytes, new_q8_bytes, fact_id],
230        )?;
231
232        tx.execute(
233            "INSERT INTO facts_fts(rowid, content) VALUES (?1, ?2)",
234            params![fts_rowid, new_content],
235        )?;
236        tx.execute(
237            "DELETE FROM derivation_edges
238             WHERE (source_kind = 'fact' AND source_id = ?1)
239                OR (target_kind = 'fact' AND target_id = ?1)",
240            params![fact_id],
241        )?;
242
243        #[cfg(feature = "hnsw")]
244        enqueue_pending_index_op(
245            tx,
246            &format!("fact:{}", fact_id),
247            "fact",
248            PendingIndexOpKind::Upsert,
249        )?;
250        db::invalidate_derived_vector_artifact(tx, &format!("fact:{fact_id}"))?;
251
252        Ok(())
253    })
254}
255
256/// Delete all namespace-scoped memory atomically and report every affected surface.
257pub fn delete_namespace(
258    conn: &Connection,
259    namespace: &str,
260) -> Result<NamespaceDeleteReport, MemoryError> {
261    with_transaction(conn, |tx| {
262        let mut report = NamespaceDeleteReport::default();
263        let delete_session = |session_id: &str| -> Result<(usize, usize), MemoryError> {
264            let message_data: Vec<(i64, String, i64, bool)> = {
265                let mut stmt = tx.prepare(
266                    "SELECT m.id, m.content, mm.rowid, m.embedding IS NOT NULL
267                     FROM messages m
268                     JOIN messages_rowid_map mm ON mm.message_id = m.id
269                     WHERE m.session_id = ?1",
270                )?;
271                let rows = stmt.query_map(params![session_id], |row| {
272                    Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
273                })?;
274                rows.collect::<Result<Vec<_>, _>>()?
275            };
276
277            for (message_id, content, fts_rowid, has_embedding) in &message_data {
278                #[cfg(not(feature = "hnsw"))]
279                let _ = (message_id, has_embedding);
280                tx.execute(
281                    "INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', ?1, ?2)",
282                    params![fts_rowid, content],
283                )?;
284                #[cfg(feature = "hnsw")]
285                if *has_embedding {
286                    enqueue_pending_index_op(
287                        tx,
288                        &format!("msg:{}", message_id),
289                        "message",
290                        PendingIndexOpKind::Delete,
291                    )?;
292                }
293            }
294
295            let affected = tx.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
296            if affected == 0 {
297                return Err(MemoryError::SessionNotFound(session_id.to_string()));
298            }
299            let hnsw_ops = message_data
300                .iter()
301                .filter(|(_, _, _, has_embedding)| *has_embedding)
302                .count();
303            Ok((message_data.len(), hnsw_ops))
304        };
305
306        let document_ids: Vec<String> = {
307            let mut stmt = tx.prepare("SELECT id FROM documents WHERE namespace = ?1")?;
308            let ids = stmt
309                .query_map(params![namespace], |row| row.get(0))?
310                .collect::<Result<Vec<_>, _>>()?;
311            ids
312        };
313
314        let session_ids: Vec<String> = {
315            let mut stmt = tx.prepare("SELECT id, metadata FROM sessions")?;
316            let rows = stmt.query_map([], |row| {
317                Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
318            })?;
319            let mut ids = Vec::new();
320            for row in rows {
321                let (session_id, metadata_raw) = row?;
322                let metadata = parse_optional_json(
323                    "sessions",
324                    &session_id,
325                    "metadata",
326                    metadata_raw.as_deref(),
327                )?;
328                let namespace_matches = metadata
329                    .as_ref()
330                    .and_then(|value| {
331                        value
332                            .get("namespace")
333                            .or_else(|| value.get("scope_namespace"))
334                    })
335                    .and_then(|value| value.as_str())
336                    == Some(namespace);
337                if namespace_matches {
338                    ids.push(session_id);
339                }
340            }
341            ids
342        };
343
344        for session_id in &session_ids {
345            let (messages, hnsw_ops) = delete_session(session_id)?;
346            report.messages += messages;
347            report.hnsw_ops += hnsw_ops;
348        }
349        report.sessions = session_ids.len();
350
351        let delete_derivation_edges_for_id = |kind: &str, id: &str| -> Result<(), MemoryError> {
352            tx.execute(
353                "DELETE FROM derivation_edges
354                 WHERE (source_kind = ?1 AND source_id = ?2)
355                    OR (target_kind = ?1 AND target_id = ?2)",
356                params![kind, id],
357            )?;
358            Ok(())
359        };
360
361        let delete_derivation_edges_for_ids =
362            |kind: &str, ids: &[String]| -> Result<(), MemoryError> {
363                for id in ids {
364                    delete_derivation_edges_for_id(kind, id)?;
365                }
366                Ok(())
367            };
368
369        let facts: Vec<(String, i64, String)> = {
370            let mut stmt = tx.prepare(
371                "SELECT f.id, fm.rowid, f.content
372                 FROM facts f
373                 JOIN facts_rowid_map fm ON fm.fact_id = f.id
374                 WHERE f.namespace = ?1",
375            )?;
376            let facts = stmt
377                .query_map(params![namespace], |row| {
378                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
379                })?
380                .collect::<Result<Vec<_>, _>>()?;
381            facts
382        };
383
384        for (fact_id, fts_rowid, content) in &facts {
385            tx.execute(
386                "INSERT INTO facts_fts(facts_fts, rowid, content) VALUES('delete', ?1, ?2)",
387                params![fts_rowid, content],
388            )?;
389            tx.execute(
390                "DELETE FROM facts_rowid_map WHERE fact_id = ?1",
391                params![fact_id],
392            )?;
393
394            #[cfg(feature = "hnsw")]
395            enqueue_pending_index_op(
396                tx,
397                &format!("fact:{}", fact_id),
398                "fact",
399                PendingIndexOpKind::Delete,
400            )?;
401            #[cfg(feature = "hnsw")]
402            {
403                report.hnsw_ops += 1;
404            }
405        }
406        tx.execute("DELETE FROM facts WHERE namespace = ?1", params![namespace])?;
407        report.facts = facts.len();
408
409        for doc_id in &document_ids {
410            let mut stmt = tx.prepare(
411                "SELECT c.id, c.content, cm.rowid
412                 FROM chunks c
413                 JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
414                 WHERE c.document_id = ?1",
415            )?;
416            let chunk_rows: Vec<(String, String, i64)> = stmt
417                .query_map(params![doc_id], |row| {
418                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
419                })?
420                .collect::<Result<Vec<_>, _>>()?;
421            report.chunks += chunk_rows.len();
422
423            for (chunk_id, content, fts_rowid) in &chunk_rows {
424                tx.execute(
425                    "INSERT INTO chunks_fts(chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
426                    params![fts_rowid, content],
427                )?;
428                tx.execute(
429                    "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
430                    params![chunk_id],
431                )?;
432                #[cfg(feature = "hnsw")]
433                enqueue_pending_index_op(
434                    tx,
435                    &format!("chunk:{}", chunk_id),
436                    "chunk",
437                    PendingIndexOpKind::Delete,
438                )?;
439                #[cfg(feature = "hnsw")]
440                {
441                    report.hnsw_ops += 1;
442                }
443            }
444
445            tx.execute("DELETE FROM chunks WHERE document_id = ?1", params![doc_id])?;
446        }
447
448        for doc_id in &document_ids {
449            let mut stmt = tx.prepare(
450                "SELECT e.episode_id, e.search_text, erm.rowid
451                 FROM episodes e
452                 JOIN episodes_rowid_map erm ON erm.episode_id = e.episode_id
453                 WHERE e.document_id = ?1",
454            )?;
455            let episode_rows: Vec<(String, String, i64)> = stmt
456                .query_map(params![doc_id], |row| {
457                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
458                })?
459                .collect::<Result<Vec<_>, _>>()?;
460            report.episodes += episode_rows.len();
461
462            for (episode_id, search_text, fts_rowid) in &episode_rows {
463                tx.execute(
464                    "INSERT INTO episodes_fts(episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
465                    params![fts_rowid, search_text],
466                )?;
467                tx.execute(
468                    "DELETE FROM episodes_rowid_map WHERE episode_id = ?1",
469                    params![episode_id],
470                )?;
471                tx.execute(
472                    "DELETE FROM episode_causes WHERE episode_id = ?1",
473                    params![episode_id],
474                )?;
475                #[cfg(feature = "hnsw")]
476                enqueue_pending_index_op(
477                    tx,
478                    &episodes::episode_item_key(episode_id),
479                    "episode",
480                    PendingIndexOpKind::Delete,
481                )?;
482                #[cfg(feature = "hnsw")]
483                {
484                    report.hnsw_ops += 1;
485                }
486            }
487
488            tx.execute(
489                "DELETE FROM episodes WHERE document_id = ?1",
490                params![doc_id],
491            )?;
492            tx.execute("DELETE FROM documents WHERE id = ?1", params![doc_id])?;
493        }
494        report.documents = document_ids.len();
495
496        let claim_ids: Vec<String> = {
497            let mut stmt =
498                tx.prepare("SELECT claim_id FROM claim_versions WHERE scope_namespace = ?1")?;
499            let ids = stmt
500                .query_map(params![namespace], |row| row.get(0))?
501                .collect::<Result<Vec<_>, _>>()?;
502            ids
503        };
504
505        let claim_version_ids: Vec<String> = {
506            let mut stmt = tx.prepare(
507                "SELECT claim_version_id FROM claim_versions WHERE scope_namespace = ?1",
508            )?;
509            let ids = stmt
510                .query_map(params![namespace], |row| row.get(0))?
511                .collect::<Result<Vec<_>, _>>()?;
512            ids
513        };
514
515        let relation_version_ids: Vec<String> = {
516            let mut stmt = tx.prepare(
517                "SELECT relation_version_id FROM relation_versions WHERE scope_namespace = ?1",
518            )?;
519            let ids = stmt
520                .query_map(params![namespace], |row| row.get(0))?
521                .collect::<Result<Vec<_>, _>>()?;
522            ids
523        };
524
525        let alias_entity_ids: Vec<String> = {
526            let mut stmt = tx.prepare(
527                "SELECT canonical_entity_id FROM entity_aliases WHERE scope_namespace = ?1",
528            )?;
529            let ids = stmt
530                .query_map(params![namespace], |row| row.get(0))?
531                .collect::<Result<Vec<_>, _>>()?;
532            ids
533        };
534
535        let evidence_handles: Vec<String> = {
536            let mut stmt = tx.prepare(
537                "SELECT er.fetch_handle FROM evidence_refs er
538                 JOIN projection_import_log pil ON er.source_envelope_id = pil.source_envelope_id
539                 WHERE pil.scope_namespace = ?1",
540            )?;
541            let handles = stmt
542                .query_map(params![namespace], |row| row.get(0))?
543                .collect::<Result<Vec<_>, _>>()?;
544            handles
545        };
546
547        let episode_ids: Vec<String> = {
548            let mut stmt = tx.prepare(
549                "SELECT episode_id FROM episode_links
550                 WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
551            )?;
552            let ids = stmt
553                .query_map(params![namespace], |row| row.get(0))?
554                .collect::<Result<Vec<_>, _>>()?;
555            ids
556        };
557
558        delete_derivation_edges_for_ids("claim", &claim_ids)?;
559        delete_derivation_edges_for_ids("claim_version", &claim_version_ids)?;
560        delete_derivation_edges_for_ids("relation_version", &relation_version_ids)?;
561        delete_derivation_edges_for_ids("entity", &alias_entity_ids)?;
562        delete_derivation_edges_for_ids("evidence_ref", &evidence_handles)?;
563        delete_derivation_edges_for_ids("episode", &episode_ids)?;
564
565        report.projection_rows += tx.execute(
566            "DELETE FROM claim_versions WHERE scope_namespace = ?1",
567            params![namespace],
568        )?;
569        report.projection_rows += tx.execute(
570            "DELETE FROM relation_versions WHERE scope_namespace = ?1",
571            params![namespace],
572        )?;
573        report.projection_rows += tx.execute(
574            "DELETE FROM entity_aliases WHERE scope_namespace = ?1",
575            params![namespace],
576        )?;
577        report.projection_rows += tx.execute(
578            "DELETE FROM evidence_refs
579             WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
580            params![namespace],
581        )?;
582        report.projection_rows += tx.execute(
583            "DELETE FROM episode_links
584             WHERE source_envelope_id IN (SELECT source_envelope_id FROM projection_import_log WHERE scope_namespace = ?1)",
585            params![namespace],
586        )?;
587        report.projection_rows += tx.execute(
588            "DELETE FROM projection_import_failures WHERE scope_namespace = ?1",
589            params![namespace],
590        )?;
591        report.projection_rows += tx.execute(
592            "DELETE FROM projection_import_log WHERE scope_namespace = ?1",
593            params![namespace],
594        )?;
595
596        Ok(report)
597    })
598}
599
600/// Get a fact by ID.
601pub fn get_fact(conn: &Connection, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
602    let result = conn.query_row(
603        "SELECT id, namespace, content, source, created_at, updated_at, metadata
604         FROM facts WHERE id = ?1",
605        params![fact_id],
606        |row| {
607            Ok((
608                row.get::<_, String>(0)?,
609                row.get::<_, String>(1)?,
610                row.get::<_, String>(2)?,
611                row.get::<_, Option<String>>(3)?,
612                row.get::<_, String>(4)?,
613                row.get::<_, String>(5)?,
614                row.get::<_, Option<String>>(6)?,
615            ))
616        },
617    );
618
619    match result {
620        Ok((id, namespace, content, source, created_at, updated_at, metadata_raw)) => {
621            Ok(Some(Fact {
622                metadata: parse_optional_json("facts", &id, "metadata", metadata_raw.as_deref())?,
623                id,
624                namespace,
625                content,
626                source,
627                created_at,
628                updated_at,
629            }))
630        }
631        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
632        Err(err) => Err(MemoryError::Database(err)),
633    }
634}
635
636/// Get a fact embedding vector.
637pub fn get_fact_embedding(
638    conn: &Connection,
639    fact_id: &str,
640) -> Result<Option<Vec<f32>>, MemoryError> {
641    let result: Result<Option<Vec<u8>>, _> = conn.query_row(
642        "SELECT embedding FROM facts WHERE id = ?1",
643        params![fact_id],
644        |row| row.get(0),
645    );
646
647    match result {
648        Ok(Some(bytes)) => Ok(Some(bytes_to_embedding(&bytes)?)),
649        Ok(None) => Ok(None),
650        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
651        Err(err) => Err(MemoryError::Database(err)),
652    }
653}
654
655/// List facts within a namespace.
656pub fn list_facts(
657    conn: &Connection,
658    namespace: &str,
659    limit: usize,
660    offset: usize,
661) -> Result<Vec<Fact>, MemoryError> {
662    let mut stmt = conn.prepare(
663        "SELECT id, namespace, content, source, created_at, updated_at, metadata
664         FROM facts
665         WHERE namespace = ?1
666         ORDER BY updated_at DESC
667         LIMIT ?2 OFFSET ?3",
668    )?;
669
670    let facts = stmt
671        .query_map(params![namespace, limit as i64, offset as i64], |row| {
672            Ok((
673                row.get::<_, String>(0)?,
674                row.get::<_, String>(1)?,
675                row.get::<_, String>(2)?,
676                row.get::<_, Option<String>>(3)?,
677                row.get::<_, String>(4)?,
678                row.get::<_, String>(5)?,
679                row.get::<_, Option<String>>(6)?,
680            ))
681        })?
682        .collect::<Result<Vec<_>, _>>()?
683        .into_iter()
684        .map(
685            |(id, namespace, content, source, created_at, updated_at, metadata_raw)| {
686                Ok(Fact {
687                    metadata: parse_optional_json(
688                        "facts",
689                        &id,
690                        "metadata",
691                        metadata_raw.as_deref(),
692                    )?,
693                    id,
694                    namespace,
695                    content,
696                    source,
697                    created_at,
698                    updated_at,
699                })
700            },
701        )
702        .collect::<Result<Vec<_>, MemoryError>>()?;
703
704    Ok(facts)
705}
706
707impl MemoryStore {
708    /// Store a fact with automatic embedding. Returns the fact ID (UUID v4).
709    pub async fn add_fact(
710        &self,
711        namespace: &str,
712        content: &str,
713        source: Option<&str>,
714        metadata: Option<serde_json::Value>,
715    ) -> Result<String, MemoryError> {
716        self.add_fact_with_trace(namespace, content, source, metadata, None)
717            .await
718    }
719
720    /// Store a fact with automatic embedding and optional trace metadata.
721    pub async fn add_fact_with_trace(
722        &self,
723        namespace: &str,
724        content: &str,
725        source: Option<&str>,
726        metadata: Option<serde_json::Value>,
727        trace_ctx: Option<&TraceCtx>,
728    ) -> Result<String, MemoryError> {
729        self.validate_content("fact.content", content)?;
730
731        let embedding = self.embed_text_internal(content).await?;
732        self.validate_embedding_dimensions(&embedding)?;
733        let embedding_bytes = db::embedding_to_bytes(&embedding);
734        let fact_id = uuid::Uuid::new_v4().to_string();
735        let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
736
737        let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
738        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
739        let q8_bytes = quantizer
740            .quantize(&embedding)
741            .map(|qv| quantize::pack_quantized(&qv))
742            .ok();
743
744        let ns = namespace.to_string();
745        let ct = content.to_string();
746        let fid = fact_id.clone();
747        let src = source.map(|s| s.to_string());
748        let meta = merge_trace_ctx(metadata, trace_ctx);
749        self.with_write_conn(move |conn| {
750            let current_count: usize = conn.query_row(
751                "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
752                rusqlite::params![&ns],
753                |row| row.get(0),
754            )?;
755            if current_count >= max_facts_per_namespace {
756                return Err(MemoryError::NamespaceFull {
757                    namespace: ns.clone(),
758                    count: current_count,
759                    limit: max_facts_per_namespace,
760                });
761            }
762            insert_fact_with_fts_q8(
763                conn,
764                &fid,
765                &ns,
766                &ct,
767                &embedding_bytes,
768                q8_bytes.as_deref(),
769                src.as_deref(),
770                meta.as_ref(),
771            )
772        })
773        .await?;
774
775        #[cfg(feature = "hnsw")]
776        self.sync_pending_hnsw_ops_best_effort("add_fact").await;
777
778        Ok(fact_id)
779    }
780
781    /// Store a fact with a pre-computed embedding.
782    pub async fn add_fact_with_embedding(
783        &self,
784        namespace: &str,
785        content: &str,
786        embedding: &[f32],
787        source: Option<&str>,
788        metadata: Option<serde_json::Value>,
789    ) -> Result<String, MemoryError> {
790        self.add_fact_with_embedding_and_trace(
791            namespace, content, embedding, source, metadata, None,
792        )
793        .await
794    }
795
796    /// Store a fact with a pre-computed embedding and optional trace metadata.
797    pub async fn add_fact_with_embedding_and_trace(
798        &self,
799        namespace: &str,
800        content: &str,
801        embedding: &[f32],
802        source: Option<&str>,
803        metadata: Option<serde_json::Value>,
804        trace_ctx: Option<&TraceCtx>,
805    ) -> Result<String, MemoryError> {
806        self.validate_content("fact.content", content)?;
807        self.validate_embedding_dimensions(embedding)?;
808        let embedding_bytes = db::embedding_to_bytes(embedding);
809        let fact_id = uuid::Uuid::new_v4().to_string();
810        let max_facts_per_namespace = self.inner.config.limits.max_facts_per_namespace;
811
812        let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
813        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
814        let q8_bytes = quantizer
815            .quantize(embedding)
816            .map(|qv| quantize::pack_quantized(&qv))
817            .ok();
818
819        let ns = namespace.to_string();
820        let ct = content.to_string();
821        let fid = fact_id.clone();
822        let src = source.map(|s| s.to_string());
823        let meta = merge_trace_ctx(metadata, trace_ctx);
824        self.with_write_conn(move |conn| {
825            let current_count: usize = conn.query_row(
826                "SELECT COUNT(*) FROM facts WHERE namespace = ?1",
827                rusqlite::params![&ns],
828                |row| row.get(0),
829            )?;
830            if current_count >= max_facts_per_namespace {
831                return Err(MemoryError::NamespaceFull {
832                    namespace: ns.clone(),
833                    count: current_count,
834                    limit: max_facts_per_namespace,
835                });
836            }
837            insert_fact_with_fts_q8(
838                conn,
839                &fid,
840                &ns,
841                &ct,
842                &embedding_bytes,
843                q8_bytes.as_deref(),
844                src.as_deref(),
845                meta.as_ref(),
846            )
847        })
848        .await?;
849
850        #[cfg(feature = "hnsw")]
851        self.sync_pending_hnsw_ops_best_effort("add_fact_with_embedding")
852            .await;
853
854        Ok(fact_id)
855    }
856
857    /// Update a fact's content. Re-embeds automatically.
858    pub async fn update_fact(&self, fact_id: &str, content: &str) -> Result<(), MemoryError> {
859        self.validate_content("fact.content", content)?;
860        let embedding = self.embed_text_internal(content).await?;
861        self.validate_embedding_dimensions(&embedding)?;
862        let embedding_bytes = db::embedding_to_bytes(&embedding);
863        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
864        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
865            .quantize(&embedding)
866            .map(|qv| quantize::pack_quantized(&qv))
867            .ok();
868
869        let fid = fact_id.to_string();
870        let ct = content.to_string();
871        self.with_write_conn(move |conn| {
872            update_fact_with_fts(conn, &fid, &ct, &embedding_bytes, q8_bytes.as_deref())
873        })
874        .await?;
875
876        #[cfg(feature = "hnsw")]
877        self.sync_pending_hnsw_ops_best_effort("update_fact").await;
878
879        Ok(())
880    }
881
882    /// Delete a fact by ID.
883    pub async fn delete_fact(&self, fact_id: &str) -> Result<(), MemoryError> {
884        let fid = fact_id.to_string();
885        self.with_write_conn(move |conn| delete_fact_with_fts(conn, &fid))
886            .await?;
887
888        #[cfg(feature = "hnsw")]
889        self.sync_pending_hnsw_ops_best_effort("delete_fact").await;
890
891        Ok(())
892    }
893
894    /// Delete all memory in a namespace and return a per-surface report.
895    pub async fn delete_namespace(
896        &self,
897        namespace: &str,
898    ) -> Result<NamespaceDeleteReport, MemoryError> {
899        let ns = namespace.to_string();
900        let count = self
901            .with_write_conn(move |conn| delete_namespace(conn, &ns))
902            .await?;
903
904        #[cfg(feature = "hnsw")]
905        self.sync_pending_hnsw_ops_best_effort("delete_namespace")
906            .await;
907
908        Ok(count)
909    }
910
911    /// Get a fact by ID.
912    pub async fn get_fact(&self, fact_id: &str) -> Result<Option<Fact>, MemoryError> {
913        let fid = fact_id.to_string();
914        self.with_read_conn(move |conn| get_fact(conn, &fid)).await
915    }
916
917    /// Get a fact's embedding vector.
918    pub async fn get_fact_embedding(&self, fact_id: &str) -> Result<Option<Vec<f32>>, MemoryError> {
919        let fid = fact_id.to_string();
920        self.with_read_conn(move |conn| get_fact_embedding(conn, &fid))
921            .await
922    }
923
924    /// List all facts in a namespace.
925    pub async fn list_facts(
926        &self,
927        namespace: &str,
928        limit: usize,
929        offset: usize,
930    ) -> Result<Vec<Fact>, MemoryError> {
931        let ns = namespace.to_string();
932        self.with_read_conn(move |conn| list_facts(conn, &ns, limit, offset))
933            .await
934    }
935}