// semantic_memory/documents.rs
//! Document ingestion pipeline: chunk, embed, store.

use crate::db::with_transaction;
use crate::error::MemoryError;
use crate::types::Document;
use rusqlite::{params, Connection};
/// A single chunk to insert: `(content, embedding_bytes, q8_bytes, token_count_estimate)`.
///
/// - `content`: the chunk text (also indexed in FTS).
/// - `embedding_bytes`: serialized full-precision embedding.
/// - `q8_bytes`: optional serialized int8-quantized embedding.
/// - `token_count_estimate`: approximate token length of `content`.
pub type ChunkRow = (String, Vec<u8>, Option<Vec<u8>>, usize);
11/// Insert a document and all its chunks + FTS entries in a single transaction.
12///
13/// `chunks` is a vec of (content, embedding_bytes, q8_bytes, token_count_estimate).
14pub fn insert_document_with_chunks(
15    conn: &Connection,
16    doc_id: &str,
17    title: &str,
18    namespace: &str,
19    source_path: Option<&str>,
20    metadata: Option<&serde_json::Value>,
21    chunks: &[ChunkRow],
22) -> Result<(), MemoryError> {
23    let metadata_str = metadata.map(|m| m.to_string());
24    with_transaction(conn, |tx| {
25        // Insert document
26        tx.execute(
27            "INSERT INTO documents (id, title, source_path, namespace, metadata) VALUES (?1, ?2, ?3, ?4, ?5)",
28            params![doc_id, title, source_path, namespace, metadata_str],
29        )?;
30
31        // Insert each chunk
32        for (chunk_index, (content, embedding_bytes, q8_bytes, token_count)) in chunks.iter().enumerate() {
33            let chunk_id = uuid::Uuid::new_v4().to_string();
34
35            tx.execute(
36                "INSERT INTO chunks (id, document_id, chunk_index, content, token_count, embedding, embedding_q8) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
37                params![chunk_id, doc_id, chunk_index as i64, content, *token_count as i64, embedding_bytes, q8_bytes.as_deref()],
38            )?;
39
40            // Insert into rowid bridge
41            tx.execute(
42                "INSERT INTO chunks_rowid_map (chunk_id) VALUES (?1)",
43                params![chunk_id],
44            )?;
45            let fts_rowid = tx.last_insert_rowid();
46
47            // Insert into FTS
48            tx.execute(
49                "INSERT INTO chunks_fts(rowid, content) VALUES (?1, ?2)",
50                params![fts_rowid, content],
51            )?;
52        }
53
54        Ok(())
55    })
56}
58/// Insert a document and all its chunks + FTS entries in a single transaction,
59/// using pre-generated chunk IDs (for HNSW key mapping).
60///
61/// `chunks` is a vec of (content, embedding_bytes, q8_bytes, token_count_estimate).
62/// `chunk_ids` must have the same length as `chunks`.
63#[allow(clippy::too_many_arguments)]
64pub fn insert_document_with_chunks_and_ids(
65    conn: &Connection,
66    doc_id: &str,
67    title: &str,
68    namespace: &str,
69    source_path: Option<&str>,
70    metadata: Option<&serde_json::Value>,
71    chunks: &[ChunkRow],
72    chunk_ids: &[String],
73) -> Result<(), MemoryError> {
74    assert_eq!(chunks.len(), chunk_ids.len(), "chunks and chunk_ids must have same length");
75    let metadata_str = metadata.map(|m| m.to_string());
76    with_transaction(conn, |tx| {
77        // Insert document
78        tx.execute(
79            "INSERT INTO documents (id, title, source_path, namespace, metadata) VALUES (?1, ?2, ?3, ?4, ?5)",
80            params![doc_id, title, source_path, namespace, metadata_str],
81        )?;
82
83        // Insert each chunk with pre-generated ID
84        for (chunk_index, ((content, embedding_bytes, q8_bytes, token_count), chunk_id)) in
85            chunks.iter().zip(chunk_ids.iter()).enumerate()
86        {
87            tx.execute(
88                "INSERT INTO chunks (id, document_id, chunk_index, content, token_count, embedding, embedding_q8) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
89                params![chunk_id, doc_id, chunk_index as i64, content, *token_count as i64, embedding_bytes, q8_bytes.as_deref()],
90            )?;
91
92            // Insert into rowid bridge
93            tx.execute(
94                "INSERT INTO chunks_rowid_map (chunk_id) VALUES (?1)",
95                params![chunk_id],
96            )?;
97            let fts_rowid = tx.last_insert_rowid();
98
99            // Insert into FTS
100            tx.execute(
101                "INSERT INTO chunks_fts(rowid, content) VALUES (?1, ?2)",
102                params![fts_rowid, content],
103            )?;
104        }
105
106        Ok(())
107    })
108}
110/// Delete a document and all its chunks + FTS entries in a transaction.
111pub fn delete_document_with_chunks(
112    conn: &Connection,
113    document_id: &str,
114) -> Result<(), MemoryError> {
115    with_transaction(conn, |tx| {
116        // Get all chunks for this document (collect before mutating)
117        let chunk_data: Vec<(String, String, i64)> = {
118            let mut stmt = tx.prepare(
119                "SELECT c.id, c.content, cm.rowid
120                 FROM chunks c
121                 JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
122                 WHERE c.document_id = ?1",
123            )?;
124            let result = stmt
125                .query_map(params![document_id], |row| {
126                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
127                })?
128                .collect::<Result<Vec<_>, _>>()?;
129            result
130        };
131
132        // Delete FTS entries for each chunk
133        for (chunk_id, content, fts_rowid) in &chunk_data {
134            tx.execute(
135                "INSERT INTO chunks_fts(chunks_fts, rowid, content) VALUES('delete', ?1, ?2)",
136                params![fts_rowid, content],
137            )?;
138            tx.execute(
139                "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
140                params![chunk_id],
141            )?;
142        }
143
144        // Delete chunks and document
145        tx.execute(
146            "DELETE FROM chunks WHERE document_id = ?1",
147            params![document_id],
148        )?;
149        let affected = tx.execute("DELETE FROM documents WHERE id = ?1", params![document_id])?;
150
151        if affected == 0 {
152            return Err(MemoryError::DocumentNotFound(document_id.to_string()));
153        }
154
155        Ok(())
156    })
157}
159/// List documents in a namespace with chunk counts.
160pub fn list_documents(
161    conn: &Connection,
162    namespace: &str,
163    limit: usize,
164    offset: usize,
165) -> Result<Vec<Document>, MemoryError> {
166    let mut stmt = conn.prepare(
167        "SELECT d.id, d.title, d.source_path, d.namespace, d.created_at, d.metadata,
168                (SELECT COUNT(*) FROM chunks c WHERE c.document_id = d.id) AS chunk_count
169         FROM documents d
170         WHERE d.namespace = ?1
171         ORDER BY d.created_at DESC
172         LIMIT ?2 OFFSET ?3",
173    )?;
174
175    let docs = stmt
176        .query_map(params![namespace, limit as i64, offset as i64], |row| {
177            let metadata_str: Option<String> = row.get(5)?;
178            let chunk_count: i64 = row.get(6)?;
179            Ok(Document {
180                id: row.get(0)?,
181                title: row.get(1)?,
182                source_path: row.get(2)?,
183                namespace: row.get(3)?,
184                created_at: row.get(4)?,
185                metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
186                chunk_count: chunk_count as u32,
187            })
188        })?
189        .collect::<Result<Vec<_>, _>>()?;
190
191    Ok(docs)
192}