Skip to main content

semantic_memory/
documents.rs

1//! Document ingestion pipeline: chunk, embed, store, and queue sidecar updates.
2
3use crate::chunker;
4use crate::db;
5#[cfg(feature = "hnsw")]
6use crate::db::IndexOpKind;
7use crate::error::MemoryError;
8use crate::quantize::{self, Quantizer};
9use crate::types::{
10    ChunkManifestChunkMapping, ChunkManifestEntry, ChunkManifestIngestOptions,
11    ChunkManifestIngestResult, Document, SearchResult, SearchSource,
12};
13use crate::{merge_trace_ctx, MemoryStore};
14use rusqlite::{params, Connection};
15use stack_ids::ScopeKey;
16use stack_ids::TraceCtx;
17use std::collections::{BTreeMap, BTreeSet};
18
/// A single chunk to insert: `(content, embedding_bytes, q8_bytes, token_count_estimate)`.
///
/// As consumed by `insert_document_with_chunks_and_ids`:
/// - `content`: chunk text, written to `chunks.content` and indexed in `chunks_fts`;
/// - `embedding_bytes`: serialized embedding, written to `chunks.embedding`;
/// - `q8_bytes`: optional packed quantized embedding, written to `chunks.embedding_q8`
///   (`None` is bound as SQL NULL);
/// - `token_count_estimate`: written to `chunks.token_count`.
pub type ChunkRow = (String, Vec<u8>, Option<Vec<u8>>, usize);
21
22pub fn insert_document_with_chunks(
23    conn: &Connection,
24    doc_id: &str,
25    title: &str,
26    namespace: &str,
27    source_path: Option<&str>,
28    metadata: Option<&serde_json::Value>,
29    chunks: &[ChunkRow],
30) -> Result<Vec<String>, MemoryError> {
31    let chunk_ids: Vec<String> = (0..chunks.len())
32        .map(|_| uuid::Uuid::new_v4().to_string())
33        .collect();
34    insert_document_with_chunks_and_ids(
35        conn,
36        doc_id,
37        title,
38        namespace,
39        source_path,
40        metadata,
41        chunks,
42        &chunk_ids,
43    )?;
44    Ok(chunk_ids)
45}
46
47#[allow(clippy::too_many_arguments)]
48pub fn insert_document_with_chunks_and_ids(
49    conn: &Connection,
50    doc_id: &str,
51    title: &str,
52    namespace: &str,
53    source_path: Option<&str>,
54    metadata: Option<&serde_json::Value>,
55    chunks: &[ChunkRow],
56    chunk_ids: &[String],
57) -> Result<(), MemoryError> {
58    if chunks.len() != chunk_ids.len() {
59        return Err(MemoryError::Other(
60            "chunks and chunk_ids must have the same length".to_string(),
61        ));
62    }
63
64    let metadata_str = metadata.map(|value| value.to_string());
65    db::with_transaction(conn, |tx| {
66        tx.execute(
67            "INSERT INTO documents (id, title, source_path, namespace, metadata)
68             VALUES (?1, ?2, ?3, ?4, ?5)",
69            params![doc_id, title, source_path, namespace, metadata_str],
70        )?;
71
72        for (chunk_index, ((content, embedding_bytes, q8_bytes, token_count), chunk_id)) in
73            chunks.iter().zip(chunk_ids.iter()).enumerate()
74        {
75            tx.execute(
76                "INSERT INTO chunks (id, document_id, chunk_index, content, token_count, embedding, embedding_q8)
77                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
78                params![
79                    chunk_id,
80                    doc_id,
81                    chunk_index as i64,
82                    content,
83                    *token_count as i64,
84                    embedding_bytes,
85                    q8_bytes.as_deref()
86                ],
87            )?;
88
89            tx.execute(
90                "INSERT INTO chunks_rowid_map (chunk_id) VALUES (?1)",
91                params![chunk_id],
92            )?;
93            let fts_rowid = tx.last_insert_rowid();
94            tx.execute(
95                "INSERT INTO chunks_fts (rowid, content) VALUES (?1, ?2)",
96                params![fts_rowid, content],
97            )?;
98
99            #[cfg(feature = "hnsw")]
100            db::queue_pending_index_op(
101                tx,
102                &format!("chunk:{}", chunk_id),
103                "chunk",
104                IndexOpKind::Upsert,
105            )?;
106            db::invalidate_derived_vector_artifact(tx, &format!("chunk:{chunk_id}"))?;
107        }
108
109        Ok(())
110    })
111}
112
113pub fn delete_document_with_chunks(
114    conn: &Connection,
115    document_id: &str,
116) -> Result<Vec<String>, MemoryError> {
117    db::with_transaction(conn, |tx| {
118        let episode_rows: Vec<(String, String, i64)> = {
119            let mut stmt = tx.prepare(
120                "SELECT e.episode_id, e.search_text, erm.rowid
121                 FROM episodes e
122                 JOIN episodes_rowid_map erm ON erm.episode_id = e.episode_id
123                 WHERE e.document_id = ?1",
124            )?;
125            let rows = stmt.query_map(params![document_id], |row| {
126                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
127            })?;
128            rows.collect::<Result<Vec<_>, _>>()?
129        };
130
131        for (episode_id, search_text, fts_rowid) in &episode_rows {
132            tx.execute(
133                "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
134                params![fts_rowid, search_text],
135            )?;
136            tx.execute(
137                "DELETE FROM episodes_rowid_map WHERE episode_id = ?1",
138                params![episode_id],
139            )?;
140            tx.execute(
141                "DELETE FROM episode_causes WHERE episode_id = ?1",
142                params![episode_id],
143            )?;
144            #[cfg(feature = "hnsw")]
145            db::queue_pending_index_op(
146                tx,
147                &crate::episodes::episode_item_key(episode_id),
148                "episode",
149                IndexOpKind::Delete,
150            )?;
151            db::invalidate_derived_vector_artifact(
152                tx,
153                &crate::episodes::episode_item_key(episode_id),
154            )?;
155        }
156        tx.execute(
157            "DELETE FROM episodes WHERE document_id = ?1",
158            params![document_id],
159        )?;
160
161        let mut stmt = tx.prepare(
162            "SELECT c.id, c.content, cm.rowid
163             FROM chunks c
164             JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
165             WHERE c.document_id = ?1",
166        )?;
167        let chunk_rows: Vec<(String, String, i64)> = stmt
168            .query_map(params![document_id], |row| {
169                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
170            })?
171            .collect::<Result<Vec<_>, _>>()?;
172
173        let chunk_ids: Vec<String> = chunk_rows.iter().map(|(id, _, _)| id.clone()).collect();
174
175        for (chunk_id, content, fts_rowid) in &chunk_rows {
176            tx.execute(
177                "INSERT INTO chunks_fts (chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
178                params![fts_rowid, content],
179            )?;
180            tx.execute(
181                "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
182                params![chunk_id],
183            )?;
184            #[cfg(feature = "hnsw")]
185            db::queue_pending_index_op(
186                tx,
187                &format!("chunk:{}", chunk_id),
188                "chunk",
189                IndexOpKind::Delete,
190            )?;
191            db::invalidate_derived_vector_artifact(tx, &format!("chunk:{chunk_id}"))?;
192        }
193
194        tx.execute(
195            "DELETE FROM chunks WHERE document_id = ?1",
196            params![document_id],
197        )?;
198        let affected = tx.execute("DELETE FROM documents WHERE id = ?1", params![document_id])?;
199        if affected == 0 {
200            return Err(MemoryError::DocumentNotFound(document_id.to_string()));
201        }
202
203        Ok(chunk_ids)
204    })
205}
206
207pub fn count_chunks_for_document(
208    conn: &Connection,
209    document_id: &str,
210) -> Result<usize, MemoryError> {
211    let count: i64 = conn.query_row(
212        "SELECT COUNT(*) FROM chunks WHERE document_id = ?1",
213        params![document_id],
214        |row| row.get(0),
215    )?;
216    Ok(count as usize)
217}
218
219pub fn list_documents(
220    conn: &Connection,
221    namespace: &str,
222    limit: usize,
223    offset: usize,
224) -> Result<Vec<Document>, MemoryError> {
225    let mut stmt = conn.prepare(
226        "SELECT d.id, d.title, d.source_path, d.namespace, d.created_at, d.metadata,
227                (SELECT COUNT(*) FROM chunks c WHERE c.document_id = d.id) AS chunk_count
228         FROM documents d
229         WHERE d.namespace = ?1
230         ORDER BY d.created_at DESC
231         LIMIT ?2 OFFSET ?3",
232    )?;
233
234    let rows = stmt
235        .query_map(params![namespace, limit as i64, offset as i64], |row| {
236            Ok((
237                row.get::<_, String>(0)?,
238                row.get::<_, String>(1)?,
239                row.get::<_, Option<String>>(2)?,
240                row.get::<_, String>(3)?,
241                row.get::<_, String>(4)?,
242                row.get::<_, Option<String>>(5)?,
243                row.get::<_, i64>(6)? as u32,
244            ))
245        })?
246        .collect::<Result<Vec<_>, _>>()?;
247
248    rows.into_iter()
249        .map(
250            |(id, title, source_path, namespace, created_at, metadata_raw, chunk_count)| {
251                Ok(Document {
252                    metadata: db::parse_optional_json(
253                        "documents",
254                        &id,
255                        "metadata",
256                        metadata_raw.as_deref(),
257                    )?,
258                    id,
259                    title,
260                    source_path,
261                    namespace,
262                    created_at,
263                    chunk_count,
264                })
265            },
266        )
267        .collect()
268}
269
270fn document_scope_keys_for_ids(
271    conn: &Connection,
272    document_ids: &[String],
273) -> Result<BTreeMap<String, ScopeKey>, MemoryError> {
274    if document_ids.is_empty() {
275        return Ok(BTreeMap::new());
276    }
277
278    let placeholders = (0..document_ids.len())
279        .map(|_| "?")
280        .collect::<Vec<_>>()
281        .join(", ");
282    let sql = format!("SELECT id, namespace, metadata FROM documents WHERE id IN ({placeholders})");
283    let params: Vec<&str> = document_ids.iter().map(|id| id.as_str()).collect();
284    let mut stmt = conn.prepare(&sql)?;
285    let rows = stmt
286        .query_map(rusqlite::params_from_iter(&params), |row| {
287            Ok((
288                row.get::<_, String>(0)?,
289                row.get::<_, String>(1)?,
290                row.get::<_, Option<String>>(2)?,
291            ))
292        })?
293        .collect::<Result<Vec<_>, _>>()?;
294
295    let mut by_id = BTreeMap::new();
296    for (id, namespace, metadata_raw) in rows {
297        let metadata =
298            db::parse_optional_json("documents", &id, "metadata", metadata_raw.as_deref())?;
299        let scope_key = ScopeKey {
300            namespace,
301            domain: metadata
302                .as_ref()
303                .and_then(|value| value.get("scope_domain"))
304                .and_then(|value| value.as_str())
305                .map(str::to_string),
306            workspace_id: metadata
307                .as_ref()
308                .and_then(|value| value.get("scope_workspace_id"))
309                .and_then(|value| value.as_str())
310                .map(str::to_string),
311            repo_id: metadata
312                .as_ref()
313                .and_then(|value| value.get("scope_repo_id"))
314                .and_then(|value| value.as_str())
315                .map(str::to_string),
316        };
317        by_id.insert(id, scope_key);
318    }
319
320    Ok(by_id)
321}
322
323impl MemoryStore {
324    /// Ingest a document: chunk, embed all chunks, store everything.
325    pub async fn ingest_document(
326        &self,
327        title: &str,
328        content: &str,
329        namespace: &str,
330        source_path: Option<&str>,
331        metadata: Option<serde_json::Value>,
332    ) -> Result<String, MemoryError> {
333        self.ingest_document_with_trace(title, content, namespace, source_path, metadata, None)
334            .await
335    }
336
337    /// Ingest a document with optional trace metadata.
338    pub async fn ingest_document_with_trace(
339        &self,
340        title: &str,
341        content: &str,
342        namespace: &str,
343        source_path: Option<&str>,
344        metadata: Option<serde_json::Value>,
345        trace_ctx: Option<&TraceCtx>,
346    ) -> Result<String, MemoryError> {
347        self.validate_content("document.content", content)?;
348
349        let text_chunks = chunker::chunk_text(
350            content,
351            &self.inner.config.chunking,
352            self.inner.token_counter.as_ref(),
353        );
354
355        let max_chunks = self.inner.config.limits.max_chunks_per_document;
356        if text_chunks.len() > max_chunks {
357            return Err(MemoryError::ContentTooLarge {
358                size: text_chunks.len(),
359                limit: max_chunks,
360            });
361        }
362
363        let chunk_texts: Vec<String> = text_chunks.iter().map(|c| c.content.clone()).collect();
364        let embeddings = self.embed_batch_internal(chunk_texts).await?;
365        for embedding in &embeddings {
366            self.validate_embedding_dimensions(embedding)?;
367        }
368
369        let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
370        let chunks: Vec<ChunkRow> = text_chunks
371            .iter()
372            .zip(embeddings.iter())
373            .map(|(tc, emb)| {
374                // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
375                let q8 = quantizer
376                    .quantize(emb)
377                    .map(|qv| quantize::pack_quantized(&qv))
378                    .ok();
379                (
380                    tc.content.clone(),
381                    db::embedding_to_bytes(emb),
382                    q8,
383                    tc.token_count_estimate,
384                )
385            })
386            .collect();
387
388        let doc_id = uuid::Uuid::new_v4().to_string();
389
390        let did = doc_id.clone();
391        let t = title.to_string();
392        let ns = namespace.to_string();
393        let sp = source_path.map(|s| s.to_string());
394        let meta = merge_trace_ctx(metadata, trace_ctx);
395
396        self.with_write_conn(move |conn| {
397            insert_document_with_chunks(conn, &did, &t, &ns, sp.as_deref(), meta.as_ref(), &chunks)
398        })
399        .await?;
400
401        #[cfg(feature = "hnsw")]
402        self.sync_pending_hnsw_ops_best_effort("ingest_document")
403            .await;
404
405        Ok(doc_id)
406    }
407
408    /// Ingest an externally chunked document manifest and return exact chunk mappings.
409    ///
410    /// This API preserves semantic-memory as the owner of document/chunk storage and embeddings:
411    /// callers provide chunk boundaries and external IDs, while semantic-memory generates and
412    /// stores its own document/chunk IDs atomically.
413    pub async fn ingest_chunk_manifest(
414        &self,
415        options: ChunkManifestIngestOptions,
416        entries: Vec<ChunkManifestEntry>,
417    ) -> Result<ChunkManifestIngestResult, MemoryError> {
418        self.ingest_chunk_manifest_with_trace(options, entries, None)
419            .await
420    }
421
422    /// Ingest an externally chunked document manifest with optional trace metadata.
423    pub async fn ingest_chunk_manifest_with_trace(
424        &self,
425        options: ChunkManifestIngestOptions,
426        entries: Vec<ChunkManifestEntry>,
427        trace_ctx: Option<&TraceCtx>,
428    ) -> Result<ChunkManifestIngestResult, MemoryError> {
429        if entries.is_empty() {
430            return Err(MemoryError::InvalidConfig {
431                field: "chunk_manifest.entries",
432                reason: "at least one chunk is required".to_string(),
433            });
434        }
435
436        let max_chunks = self.inner.config.limits.max_chunks_per_document;
437        if entries.len() > max_chunks {
438            return Err(MemoryError::ContentTooLarge {
439                size: entries.len(),
440                limit: max_chunks,
441            });
442        }
443
444        let mut seen_external_ids = BTreeSet::new();
445        for (index, entry) in entries.iter().enumerate() {
446            let external_chunk_id = entry.external_chunk_id.trim();
447            if external_chunk_id.is_empty() {
448                return Err(MemoryError::InvalidConfig {
449                    field: "chunk_manifest.external_chunk_id",
450                    reason: format!("chunk {index} external_chunk_id must not be empty"),
451                });
452            }
453            if !seen_external_ids.insert(external_chunk_id.to_string()) {
454                return Err(MemoryError::InvalidConfig {
455                    field: "chunk_manifest.external_chunk_id",
456                    reason: format!("duplicate external_chunk_id '{external_chunk_id}'"),
457                });
458            }
459            if entry.content.trim().is_empty() {
460                return Err(MemoryError::InvalidConfig {
461                    field: "chunk_manifest.content",
462                    reason: format!(
463                        "content must not be empty (chunk index {index}, id='{external_chunk_id}')"
464                    ),
465                });
466            }
467            self.validate_content("chunk_manifest.content", &entry.content)?;
468            if entry
469                .content_digest
470                .as_deref()
471                .is_some_and(|digest| digest.trim().is_empty())
472            {
473                return Err(MemoryError::InvalidConfig {
474                    field: "chunk_manifest.content_digest",
475                    reason: format!("chunk {index} content_digest must not be empty when supplied"),
476                });
477            }
478        }
479
480        let chunk_texts: Vec<String> = entries.iter().map(|entry| entry.content.clone()).collect();
481        let embeddings = self.embed_batch_internal(chunk_texts).await?;
482        for embedding in &embeddings {
483            self.validate_embedding_dimensions(embedding)?;
484        }
485
486        let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
487        let chunks: Vec<ChunkRow> = entries
488            .iter()
489            .zip(embeddings.iter())
490            .map(|(entry, emb)| {
491                let q8 = quantizer
492                    .quantize(emb)
493                    .map(|qv| quantize::pack_quantized(&qv))
494                    .ok();
495                (
496                    entry.content.clone(),
497                    db::embedding_to_bytes(emb),
498                    q8,
499                    entry
500                        .token_count_estimate
501                        .unwrap_or_else(|| entry.content.len().div_ceil(4).max(1)),
502                )
503            })
504            .collect();
505
506        let doc_id = uuid::Uuid::new_v4().to_string();
507        let chunk_ids: Vec<String> = (0..entries.len())
508            .map(|_| uuid::Uuid::new_v4().to_string())
509            .collect();
510        let receipt_id = format!("chunk-manifest:{}", uuid::Uuid::new_v4());
511
512        let mappings: Vec<ChunkManifestChunkMapping> = entries
513            .iter()
514            .zip(chunk_ids.iter())
515            .enumerate()
516            .map(
517                |(chunk_index, (entry, sm_chunk_id))| ChunkManifestChunkMapping {
518                    external_chunk_id: entry.external_chunk_id.clone(),
519                    sm_document_id: doc_id.clone(),
520                    sm_chunk_id: sm_chunk_id.clone(),
521                    chunk_index,
522                    content_digest: entry.content_digest.clone(),
523                    metadata: entry.metadata.clone(),
524                },
525            )
526            .collect();
527
528        let did = doc_id.clone();
529        let title = options.title;
530        let namespace = options.namespace;
531        let source_path = options.source_path;
532        let metadata = merge_trace_ctx(options.metadata, trace_ctx);
533        let namespace_for_result = namespace.clone();
534
535        self.with_write_conn(move |conn| {
536            insert_document_with_chunks_and_ids(
537                conn,
538                &did,
539                &title,
540                &namespace,
541                source_path.as_deref(),
542                metadata.as_ref(),
543                &chunks,
544                &chunk_ids,
545            )
546        })
547        .await?;
548
549        #[cfg(feature = "hnsw")]
550        self.sync_pending_hnsw_ops_best_effort("ingest_chunk_manifest")
551            .await;
552
553        Ok(ChunkManifestIngestResult {
554            sm_document_id: doc_id,
555            namespace: namespace_for_result,
556            receipt_id,
557            chunks: mappings,
558        })
559    }
560
561    /// Delete a document and all its chunks.
562    pub async fn delete_document(&self, document_id: &str) -> Result<(), MemoryError> {
563        let did = document_id.to_string();
564        self.with_write_conn(move |conn| delete_document_with_chunks(conn, &did))
565            .await?;
566
567        #[cfg(feature = "hnsw")]
568        self.sync_pending_hnsw_ops_best_effort("delete_document")
569            .await;
570
571        Ok(())
572    }
573
574    /// List documents in a namespace.
575    pub async fn list_documents(
576        &self,
577        namespace: &str,
578        limit: usize,
579        offset: usize,
580    ) -> Result<Vec<Document>, MemoryError> {
581        let ns = namespace.to_string();
582        self.with_read_conn(move |conn| list_documents(conn, &ns, limit, offset))
583            .await
584    }
585
586    /// Count the number of chunks for a document.
587    pub async fn count_chunks_for_document(&self, document_id: &str) -> Result<usize, MemoryError> {
588        let did = document_id.to_string();
589        self.with_read_conn(move |conn| count_chunks_for_document(conn, &did))
590            .await
591    }
592
593    /// Filter search results to those whose source scope exactly matches the requested scope.
594    ///
595    /// Only source families that carry or can be joined to full scope metadata are retained:
596    /// chunks, episodes, and imported projection rows. Facts and messages are excluded because
597    /// they do not carry domain/workspace/repo provenance.
598    pub async fn filter_search_results_by_scope(
599        &self,
600        results: Vec<SearchResult>,
601        scope: &ScopeKey,
602    ) -> Result<Vec<SearchResult>, MemoryError> {
603        let mut document_ids = BTreeSet::new();
604        for result in &results {
605            match &result.source {
606                SearchSource::Chunk { document_id, .. }
607                | SearchSource::Episode { document_id, .. } => {
608                    document_ids.insert(document_id.clone());
609                }
610                _ => {}
611            }
612        }
613
614        let document_ids = document_ids.into_iter().collect::<Vec<_>>();
615        let scope_by_document = self
616            .with_read_conn(move |conn| document_scope_keys_for_ids(conn, &document_ids))
617            .await?;
618        let requested = scope.clone();
619
620        Ok(results
621            .into_iter()
622            .filter(|result| match &result.source {
623                SearchSource::Chunk { document_id, .. }
624                | SearchSource::Episode { document_id, .. } => scope_by_document
625                    .get(document_id)
626                    .map(|scope_key| scope_key == &requested)
627                    .unwrap_or(false),
628                SearchSource::Projection { scope_key, .. } => scope_key == &requested,
629                SearchSource::Fact { .. } | SearchSource::Message { .. } => false,
630            })
631            .collect())
632    }
633}