Skip to main content

semantic_memory/
documents.rs

1//! Document ingestion pipeline: chunk, embed, store, and queue sidecar updates.
2
3use crate::chunker;
4use crate::db;
5#[cfg(feature = "hnsw")]
6use crate::db::IndexOpKind;
7use crate::error::MemoryError;
8use crate::quantize::{self, Quantizer};
9use crate::types::{Document, SearchResult, SearchSource};
10use crate::{merge_trace_ctx, MemoryStore};
11use rusqlite::{params, Connection};
12use stack_ids::ScopeKey;
13use stack_ids::TraceCtx;
14use std::collections::{BTreeMap, BTreeSet};
15
16/// A single chunk to insert: `(content, embedding_bytes, q8_bytes, token_count_estimate)`.
17pub type ChunkRow = (String, Vec<u8>, Option<Vec<u8>>, usize);
18
19pub fn insert_document_with_chunks(
20    conn: &Connection,
21    doc_id: &str,
22    title: &str,
23    namespace: &str,
24    source_path: Option<&str>,
25    metadata: Option<&serde_json::Value>,
26    chunks: &[ChunkRow],
27) -> Result<Vec<String>, MemoryError> {
28    let chunk_ids: Vec<String> = (0..chunks.len())
29        .map(|_| uuid::Uuid::new_v4().to_string())
30        .collect();
31    insert_document_with_chunks_and_ids(
32        conn,
33        doc_id,
34        title,
35        namespace,
36        source_path,
37        metadata,
38        chunks,
39        &chunk_ids,
40    )?;
41    Ok(chunk_ids)
42}
43
44#[allow(clippy::too_many_arguments)]
45pub fn insert_document_with_chunks_and_ids(
46    conn: &Connection,
47    doc_id: &str,
48    title: &str,
49    namespace: &str,
50    source_path: Option<&str>,
51    metadata: Option<&serde_json::Value>,
52    chunks: &[ChunkRow],
53    chunk_ids: &[String],
54) -> Result<(), MemoryError> {
55    if chunks.len() != chunk_ids.len() {
56        return Err(MemoryError::Other(
57            "chunks and chunk_ids must have the same length".to_string(),
58        ));
59    }
60
61    let metadata_str = metadata.map(|value| value.to_string());
62    db::with_transaction(conn, |tx| {
63        tx.execute(
64            "INSERT INTO documents (id, title, source_path, namespace, metadata)
65             VALUES (?1, ?2, ?3, ?4, ?5)",
66            params![doc_id, title, source_path, namespace, metadata_str],
67        )?;
68
69        for (chunk_index, ((content, embedding_bytes, q8_bytes, token_count), chunk_id)) in
70            chunks.iter().zip(chunk_ids.iter()).enumerate()
71        {
72            tx.execute(
73                "INSERT INTO chunks (id, document_id, chunk_index, content, token_count, embedding, embedding_q8)
74                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
75                params![
76                    chunk_id,
77                    doc_id,
78                    chunk_index as i64,
79                    content,
80                    *token_count as i64,
81                    embedding_bytes,
82                    q8_bytes.as_deref()
83                ],
84            )?;
85
86            tx.execute(
87                "INSERT INTO chunks_rowid_map (chunk_id) VALUES (?1)",
88                params![chunk_id],
89            )?;
90            let fts_rowid = tx.last_insert_rowid();
91            tx.execute(
92                "INSERT INTO chunks_fts (rowid, content) VALUES (?1, ?2)",
93                params![fts_rowid, content],
94            )?;
95
96            #[cfg(feature = "hnsw")]
97            db::queue_pending_index_op(
98                tx,
99                &format!("chunk:{}", chunk_id),
100                "chunk",
101                IndexOpKind::Upsert,
102            )?;
103        }
104
105        Ok(())
106    })
107}
108
109pub fn delete_document_with_chunks(
110    conn: &Connection,
111    document_id: &str,
112) -> Result<Vec<String>, MemoryError> {
113    db::with_transaction(conn, |tx| {
114        let mut stmt = tx.prepare(
115            "SELECT c.id, c.content, cm.rowid
116             FROM chunks c
117             JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
118             WHERE c.document_id = ?1",
119        )?;
120        let chunk_rows: Vec<(String, String, i64)> = stmt
121            .query_map(params![document_id], |row| {
122                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
123            })?
124            .collect::<Result<Vec<_>, _>>()?;
125
126        let chunk_ids: Vec<String> = chunk_rows.iter().map(|(id, _, _)| id.clone()).collect();
127
128        for (chunk_id, content, fts_rowid) in &chunk_rows {
129            tx.execute(
130                "INSERT INTO chunks_fts (chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
131                params![fts_rowid, content],
132            )?;
133            tx.execute(
134                "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
135                params![chunk_id],
136            )?;
137            #[cfg(feature = "hnsw")]
138            db::queue_pending_index_op(
139                tx,
140                &format!("chunk:{}", chunk_id),
141                "chunk",
142                IndexOpKind::Delete,
143            )?;
144        }
145
146        tx.execute(
147            "DELETE FROM chunks WHERE document_id = ?1",
148            params![document_id],
149        )?;
150        let affected = tx.execute("DELETE FROM documents WHERE id = ?1", params![document_id])?;
151        if affected == 0 {
152            return Err(MemoryError::DocumentNotFound(document_id.to_string()));
153        }
154
155        Ok(chunk_ids)
156    })
157}
158
159pub fn count_chunks_for_document(
160    conn: &Connection,
161    document_id: &str,
162) -> Result<usize, MemoryError> {
163    let count: i64 = conn.query_row(
164        "SELECT COUNT(*) FROM chunks WHERE document_id = ?1",
165        params![document_id],
166        |row| row.get(0),
167    )?;
168    Ok(count as usize)
169}
170
171pub fn list_documents(
172    conn: &Connection,
173    namespace: &str,
174    limit: usize,
175    offset: usize,
176) -> Result<Vec<Document>, MemoryError> {
177    let mut stmt = conn.prepare(
178        "SELECT d.id, d.title, d.source_path, d.namespace, d.created_at, d.metadata,
179                (SELECT COUNT(*) FROM chunks c WHERE c.document_id = d.id) AS chunk_count
180         FROM documents d
181         WHERE d.namespace = ?1
182         ORDER BY d.created_at DESC
183         LIMIT ?2 OFFSET ?3",
184    )?;
185
186    let rows = stmt
187        .query_map(params![namespace, limit as i64, offset as i64], |row| {
188            Ok((
189                row.get::<_, String>(0)?,
190                row.get::<_, String>(1)?,
191                row.get::<_, Option<String>>(2)?,
192                row.get::<_, String>(3)?,
193                row.get::<_, String>(4)?,
194                row.get::<_, Option<String>>(5)?,
195                row.get::<_, i64>(6)? as u32,
196            ))
197        })?
198        .collect::<Result<Vec<_>, _>>()?;
199
200    rows.into_iter()
201        .map(
202            |(id, title, source_path, namespace, created_at, metadata_raw, chunk_count)| {
203                Ok(Document {
204                    metadata: db::parse_optional_json(
205                        "documents",
206                        &id,
207                        "metadata",
208                        metadata_raw.as_deref(),
209                    )?,
210                    id,
211                    title,
212                    source_path,
213                    namespace,
214                    created_at,
215                    chunk_count,
216                })
217            },
218        )
219        .collect()
220}
221
222fn document_scope_keys_for_ids(
223    conn: &Connection,
224    document_ids: &[String],
225) -> Result<BTreeMap<String, ScopeKey>, MemoryError> {
226    if document_ids.is_empty() {
227        return Ok(BTreeMap::new());
228    }
229
230    let placeholders = (0..document_ids.len())
231        .map(|_| "?")
232        .collect::<Vec<_>>()
233        .join(", ");
234    let sql = format!("SELECT id, namespace, metadata FROM documents WHERE id IN ({placeholders})");
235    let params: Vec<&str> = document_ids.iter().map(|id| id.as_str()).collect();
236    let mut stmt = conn.prepare(&sql)?;
237    let rows = stmt
238        .query_map(rusqlite::params_from_iter(&params), |row| {
239            Ok((
240                row.get::<_, String>(0)?,
241                row.get::<_, String>(1)?,
242                row.get::<_, Option<String>>(2)?,
243            ))
244        })?
245        .collect::<Result<Vec<_>, _>>()?;
246
247    let mut by_id = BTreeMap::new();
248    for (id, namespace, metadata_raw) in rows {
249        let metadata =
250            db::parse_optional_json("documents", &id, "metadata", metadata_raw.as_deref())?;
251        let scope_key = ScopeKey {
252            namespace,
253            domain: metadata
254                .as_ref()
255                .and_then(|value| value.get("scope_domain"))
256                .and_then(|value| value.as_str())
257                .map(str::to_string),
258            workspace_id: metadata
259                .as_ref()
260                .and_then(|value| value.get("scope_workspace_id"))
261                .and_then(|value| value.as_str())
262                .map(str::to_string),
263            repo_id: metadata
264                .as_ref()
265                .and_then(|value| value.get("scope_repo_id"))
266                .and_then(|value| value.as_str())
267                .map(str::to_string),
268        };
269        by_id.insert(id, scope_key);
270    }
271
272    Ok(by_id)
273}
274
275impl MemoryStore {
276    /// Ingest a document: chunk, embed all chunks, store everything.
277    pub async fn ingest_document(
278        &self,
279        title: &str,
280        content: &str,
281        namespace: &str,
282        source_path: Option<&str>,
283        metadata: Option<serde_json::Value>,
284    ) -> Result<String, MemoryError> {
285        self.ingest_document_with_trace(title, content, namespace, source_path, metadata, None)
286            .await
287    }
288
289    /// Ingest a document with optional trace metadata.
290    pub async fn ingest_document_with_trace(
291        &self,
292        title: &str,
293        content: &str,
294        namespace: &str,
295        source_path: Option<&str>,
296        metadata: Option<serde_json::Value>,
297        trace_ctx: Option<&TraceCtx>,
298    ) -> Result<String, MemoryError> {
299        self.validate_content("document.content", content)?;
300
301        let text_chunks = chunker::chunk_text(
302            content,
303            &self.inner.config.chunking,
304            self.inner.token_counter.as_ref(),
305        );
306
307        let max_chunks = self.inner.config.limits.max_chunks_per_document;
308        if text_chunks.len() > max_chunks {
309            return Err(MemoryError::ContentTooLarge {
310                size: text_chunks.len(),
311                limit: max_chunks,
312            });
313        }
314
315        let chunk_texts: Vec<String> = text_chunks.iter().map(|c| c.content.clone()).collect();
316        let embeddings = self.embed_batch_internal(chunk_texts).await?;
317        for embedding in &embeddings {
318            self.validate_embedding_dimensions(embedding)?;
319        }
320
321        let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
322        let chunks: Vec<ChunkRow> = text_chunks
323            .iter()
324            .zip(embeddings.iter())
325            .map(|(tc, emb)| {
326                // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
327                let q8 = quantizer
328                    .quantize(emb)
329                    .map(|qv| quantize::pack_quantized(&qv))
330                    .ok();
331                (
332                    tc.content.clone(),
333                    db::embedding_to_bytes(emb),
334                    q8,
335                    tc.token_count_estimate,
336                )
337            })
338            .collect();
339
340        let doc_id = uuid::Uuid::new_v4().to_string();
341
342        let did = doc_id.clone();
343        let t = title.to_string();
344        let ns = namespace.to_string();
345        let sp = source_path.map(|s| s.to_string());
346        let meta = merge_trace_ctx(metadata, trace_ctx);
347
348        self.with_write_conn(move |conn| {
349            insert_document_with_chunks(conn, &did, &t, &ns, sp.as_deref(), meta.as_ref(), &chunks)
350        })
351        .await?;
352
353        #[cfg(feature = "hnsw")]
354        self.sync_pending_hnsw_ops_best_effort("ingest_document")
355            .await;
356
357        Ok(doc_id)
358    }
359
360    /// Delete a document and all its chunks.
361    pub async fn delete_document(&self, document_id: &str) -> Result<(), MemoryError> {
362        let did = document_id.to_string();
363        self.with_write_conn(move |conn| delete_document_with_chunks(conn, &did))
364            .await?;
365
366        #[cfg(feature = "hnsw")]
367        self.sync_pending_hnsw_ops_best_effort("delete_document")
368            .await;
369
370        Ok(())
371    }
372
373    /// List documents in a namespace.
374    pub async fn list_documents(
375        &self,
376        namespace: &str,
377        limit: usize,
378        offset: usize,
379    ) -> Result<Vec<Document>, MemoryError> {
380        let ns = namespace.to_string();
381        self.with_read_conn(move |conn| list_documents(conn, &ns, limit, offset))
382            .await
383    }
384
385    /// Count the number of chunks for a document.
386    pub async fn count_chunks_for_document(&self, document_id: &str) -> Result<usize, MemoryError> {
387        let did = document_id.to_string();
388        self.with_read_conn(move |conn| count_chunks_for_document(conn, &did))
389            .await
390    }
391
392    /// Filter search results to those whose source scope exactly matches the requested scope.
393    ///
394    /// Only source families that carry or can be joined to full scope metadata are retained:
395    /// chunks, episodes, and imported projection rows. Facts and messages are excluded because
396    /// they do not carry domain/workspace/repo provenance.
397    pub async fn filter_search_results_by_scope(
398        &self,
399        results: Vec<SearchResult>,
400        scope: &ScopeKey,
401    ) -> Result<Vec<SearchResult>, MemoryError> {
402        let mut document_ids = BTreeSet::new();
403        for result in &results {
404            match &result.source {
405                SearchSource::Chunk { document_id, .. }
406                | SearchSource::Episode { document_id, .. } => {
407                    document_ids.insert(document_id.clone());
408                }
409                _ => {}
410            }
411        }
412
413        let document_ids = document_ids.into_iter().collect::<Vec<_>>();
414        let scope_by_document = self
415            .with_read_conn(move |conn| document_scope_keys_for_ids(conn, &document_ids))
416            .await?;
417        let requested = scope.clone();
418
419        Ok(results
420            .into_iter()
421            .filter(|result| match &result.source {
422                SearchSource::Chunk { document_id, .. }
423                | SearchSource::Episode { document_id, .. } => scope_by_document
424                    .get(document_id)
425                    .map(|scope_key| scope_key == &requested)
426                    .unwrap_or(false),
427                SearchSource::Projection { scope_key, .. } => scope_key == &requested,
428                SearchSource::Fact { .. } | SearchSource::Message { .. } => false,
429            })
430            .collect())
431    }
432}