1use crate::chunker;
4use crate::db;
5#[cfg(feature = "hnsw")]
6use crate::db::IndexOpKind;
7use crate::error::MemoryError;
8use crate::quantize::{self, Quantizer};
9use crate::types::{Document, SearchResult, SearchSource};
10use crate::{merge_trace_ctx, MemoryStore};
11use rusqlite::{params, Connection};
12use stack_ids::ScopeKey;
13use stack_ids::TraceCtx;
14use std::collections::{BTreeMap, BTreeSet};
15
/// One chunk ready for insertion, destructured by the insert path as
/// `(content, embedding_bytes, q8_bytes, token_count)`.
///
/// - `content`: chunk text; stored in `chunks.content` and indexed in `chunks_fts`.
/// - `embedding_bytes`: bytes written to the `chunks.embedding` column.
/// - `q8_bytes`: optional packed quantized embedding for `chunks.embedding_q8` (`None` -> NULL).
/// - `token_count`: value written to `chunks.token_count`.
pub type ChunkRow = (String, Vec<u8>, Option<Vec<u8>>, usize);
18
19pub fn insert_document_with_chunks(
20 conn: &Connection,
21 doc_id: &str,
22 title: &str,
23 namespace: &str,
24 source_path: Option<&str>,
25 metadata: Option<&serde_json::Value>,
26 chunks: &[ChunkRow],
27) -> Result<Vec<String>, MemoryError> {
28 let chunk_ids: Vec<String> = (0..chunks.len())
29 .map(|_| uuid::Uuid::new_v4().to_string())
30 .collect();
31 insert_document_with_chunks_and_ids(
32 conn,
33 doc_id,
34 title,
35 namespace,
36 source_path,
37 metadata,
38 chunks,
39 &chunk_ids,
40 )?;
41 Ok(chunk_ids)
42}
43
44#[allow(clippy::too_many_arguments)]
45pub fn insert_document_with_chunks_and_ids(
46 conn: &Connection,
47 doc_id: &str,
48 title: &str,
49 namespace: &str,
50 source_path: Option<&str>,
51 metadata: Option<&serde_json::Value>,
52 chunks: &[ChunkRow],
53 chunk_ids: &[String],
54) -> Result<(), MemoryError> {
55 if chunks.len() != chunk_ids.len() {
56 return Err(MemoryError::Other(
57 "chunks and chunk_ids must have the same length".to_string(),
58 ));
59 }
60
61 let metadata_str = metadata.map(|value| value.to_string());
62 db::with_transaction(conn, |tx| {
63 tx.execute(
64 "INSERT INTO documents (id, title, source_path, namespace, metadata)
65 VALUES (?1, ?2, ?3, ?4, ?5)",
66 params![doc_id, title, source_path, namespace, metadata_str],
67 )?;
68
69 for (chunk_index, ((content, embedding_bytes, q8_bytes, token_count), chunk_id)) in
70 chunks.iter().zip(chunk_ids.iter()).enumerate()
71 {
72 tx.execute(
73 "INSERT INTO chunks (id, document_id, chunk_index, content, token_count, embedding, embedding_q8)
74 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
75 params![
76 chunk_id,
77 doc_id,
78 chunk_index as i64,
79 content,
80 *token_count as i64,
81 embedding_bytes,
82 q8_bytes.as_deref()
83 ],
84 )?;
85
86 tx.execute(
87 "INSERT INTO chunks_rowid_map (chunk_id) VALUES (?1)",
88 params![chunk_id],
89 )?;
90 let fts_rowid = tx.last_insert_rowid();
91 tx.execute(
92 "INSERT INTO chunks_fts (rowid, content) VALUES (?1, ?2)",
93 params![fts_rowid, content],
94 )?;
95
96 #[cfg(feature = "hnsw")]
97 db::queue_pending_index_op(
98 tx,
99 &format!("chunk:{}", chunk_id),
100 "chunk",
101 IndexOpKind::Upsert,
102 )?;
103 }
104
105 Ok(())
106 })
107}
108
109pub fn delete_document_with_chunks(
110 conn: &Connection,
111 document_id: &str,
112) -> Result<Vec<String>, MemoryError> {
113 db::with_transaction(conn, |tx| {
114 let mut stmt = tx.prepare(
115 "SELECT c.id, c.content, cm.rowid
116 FROM chunks c
117 JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
118 WHERE c.document_id = ?1",
119 )?;
120 let chunk_rows: Vec<(String, String, i64)> = stmt
121 .query_map(params![document_id], |row| {
122 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
123 })?
124 .collect::<Result<Vec<_>, _>>()?;
125
126 let chunk_ids: Vec<String> = chunk_rows.iter().map(|(id, _, _)| id.clone()).collect();
127
128 for (chunk_id, content, fts_rowid) in &chunk_rows {
129 tx.execute(
130 "INSERT INTO chunks_fts (chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
131 params![fts_rowid, content],
132 )?;
133 tx.execute(
134 "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
135 params![chunk_id],
136 )?;
137 #[cfg(feature = "hnsw")]
138 db::queue_pending_index_op(
139 tx,
140 &format!("chunk:{}", chunk_id),
141 "chunk",
142 IndexOpKind::Delete,
143 )?;
144 }
145
146 tx.execute(
147 "DELETE FROM chunks WHERE document_id = ?1",
148 params![document_id],
149 )?;
150 let affected = tx.execute("DELETE FROM documents WHERE id = ?1", params![document_id])?;
151 if affected == 0 {
152 return Err(MemoryError::DocumentNotFound(document_id.to_string()));
153 }
154
155 Ok(chunk_ids)
156 })
157}
158
159pub fn count_chunks_for_document(
160 conn: &Connection,
161 document_id: &str,
162) -> Result<usize, MemoryError> {
163 let count: i64 = conn.query_row(
164 "SELECT COUNT(*) FROM chunks WHERE document_id = ?1",
165 params![document_id],
166 |row| row.get(0),
167 )?;
168 Ok(count as usize)
169}
170
171pub fn list_documents(
172 conn: &Connection,
173 namespace: &str,
174 limit: usize,
175 offset: usize,
176) -> Result<Vec<Document>, MemoryError> {
177 let mut stmt = conn.prepare(
178 "SELECT d.id, d.title, d.source_path, d.namespace, d.created_at, d.metadata,
179 (SELECT COUNT(*) FROM chunks c WHERE c.document_id = d.id) AS chunk_count
180 FROM documents d
181 WHERE d.namespace = ?1
182 ORDER BY d.created_at DESC
183 LIMIT ?2 OFFSET ?3",
184 )?;
185
186 let rows = stmt
187 .query_map(params![namespace, limit as i64, offset as i64], |row| {
188 Ok((
189 row.get::<_, String>(0)?,
190 row.get::<_, String>(1)?,
191 row.get::<_, Option<String>>(2)?,
192 row.get::<_, String>(3)?,
193 row.get::<_, String>(4)?,
194 row.get::<_, Option<String>>(5)?,
195 row.get::<_, i64>(6)? as u32,
196 ))
197 })?
198 .collect::<Result<Vec<_>, _>>()?;
199
200 rows.into_iter()
201 .map(
202 |(id, title, source_path, namespace, created_at, metadata_raw, chunk_count)| {
203 Ok(Document {
204 metadata: db::parse_optional_json(
205 "documents",
206 &id,
207 "metadata",
208 metadata_raw.as_deref(),
209 )?,
210 id,
211 title,
212 source_path,
213 namespace,
214 created_at,
215 chunk_count,
216 })
217 },
218 )
219 .collect()
220}
221
222fn document_scope_keys_for_ids(
223 conn: &Connection,
224 document_ids: &[String],
225) -> Result<BTreeMap<String, ScopeKey>, MemoryError> {
226 if document_ids.is_empty() {
227 return Ok(BTreeMap::new());
228 }
229
230 let placeholders = (0..document_ids.len())
231 .map(|_| "?")
232 .collect::<Vec<_>>()
233 .join(", ");
234 let sql = format!("SELECT id, namespace, metadata FROM documents WHERE id IN ({placeholders})");
235 let params: Vec<&str> = document_ids.iter().map(|id| id.as_str()).collect();
236 let mut stmt = conn.prepare(&sql)?;
237 let rows = stmt
238 .query_map(rusqlite::params_from_iter(¶ms), |row| {
239 Ok((
240 row.get::<_, String>(0)?,
241 row.get::<_, String>(1)?,
242 row.get::<_, Option<String>>(2)?,
243 ))
244 })?
245 .collect::<Result<Vec<_>, _>>()?;
246
247 let mut by_id = BTreeMap::new();
248 for (id, namespace, metadata_raw) in rows {
249 let metadata =
250 db::parse_optional_json("documents", &id, "metadata", metadata_raw.as_deref())?;
251 let scope_key = ScopeKey {
252 namespace,
253 domain: metadata
254 .as_ref()
255 .and_then(|value| value.get("scope_domain"))
256 .and_then(|value| value.as_str())
257 .map(str::to_string),
258 workspace_id: metadata
259 .as_ref()
260 .and_then(|value| value.get("scope_workspace_id"))
261 .and_then(|value| value.as_str())
262 .map(str::to_string),
263 repo_id: metadata
264 .as_ref()
265 .and_then(|value| value.get("scope_repo_id"))
266 .and_then(|value| value.as_str())
267 .map(str::to_string),
268 };
269 by_id.insert(id, scope_key);
270 }
271
272 Ok(by_id)
273}
274
275impl MemoryStore {
276 pub async fn ingest_document(
278 &self,
279 title: &str,
280 content: &str,
281 namespace: &str,
282 source_path: Option<&str>,
283 metadata: Option<serde_json::Value>,
284 ) -> Result<String, MemoryError> {
285 self.ingest_document_with_trace(title, content, namespace, source_path, metadata, None)
286 .await
287 }
288
289 pub async fn ingest_document_with_trace(
291 &self,
292 title: &str,
293 content: &str,
294 namespace: &str,
295 source_path: Option<&str>,
296 metadata: Option<serde_json::Value>,
297 trace_ctx: Option<&TraceCtx>,
298 ) -> Result<String, MemoryError> {
299 self.validate_content("document.content", content)?;
300
301 let text_chunks = chunker::chunk_text(
302 content,
303 &self.inner.config.chunking,
304 self.inner.token_counter.as_ref(),
305 );
306
307 let max_chunks = self.inner.config.limits.max_chunks_per_document;
308 if text_chunks.len() > max_chunks {
309 return Err(MemoryError::ContentTooLarge {
310 size: text_chunks.len(),
311 limit: max_chunks,
312 });
313 }
314
315 let chunk_texts: Vec<String> = text_chunks.iter().map(|c| c.content.clone()).collect();
316 let embeddings = self.embed_batch_internal(chunk_texts).await?;
317 for embedding in &embeddings {
318 self.validate_embedding_dimensions(embedding)?;
319 }
320
321 let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
322 let chunks: Vec<ChunkRow> = text_chunks
323 .iter()
324 .zip(embeddings.iter())
325 .map(|(tc, emb)| {
326 let q8 = quantizer
328 .quantize(emb)
329 .map(|qv| quantize::pack_quantized(&qv))
330 .ok();
331 (
332 tc.content.clone(),
333 db::embedding_to_bytes(emb),
334 q8,
335 tc.token_count_estimate,
336 )
337 })
338 .collect();
339
340 let doc_id = uuid::Uuid::new_v4().to_string();
341
342 let did = doc_id.clone();
343 let t = title.to_string();
344 let ns = namespace.to_string();
345 let sp = source_path.map(|s| s.to_string());
346 let meta = merge_trace_ctx(metadata, trace_ctx);
347
348 self.with_write_conn(move |conn| {
349 insert_document_with_chunks(conn, &did, &t, &ns, sp.as_deref(), meta.as_ref(), &chunks)
350 })
351 .await?;
352
353 #[cfg(feature = "hnsw")]
354 self.sync_pending_hnsw_ops_best_effort("ingest_document")
355 .await;
356
357 Ok(doc_id)
358 }
359
360 pub async fn delete_document(&self, document_id: &str) -> Result<(), MemoryError> {
362 let did = document_id.to_string();
363 self.with_write_conn(move |conn| delete_document_with_chunks(conn, &did))
364 .await?;
365
366 #[cfg(feature = "hnsw")]
367 self.sync_pending_hnsw_ops_best_effort("delete_document")
368 .await;
369
370 Ok(())
371 }
372
373 pub async fn list_documents(
375 &self,
376 namespace: &str,
377 limit: usize,
378 offset: usize,
379 ) -> Result<Vec<Document>, MemoryError> {
380 let ns = namespace.to_string();
381 self.with_read_conn(move |conn| list_documents(conn, &ns, limit, offset))
382 .await
383 }
384
385 pub async fn count_chunks_for_document(&self, document_id: &str) -> Result<usize, MemoryError> {
387 let did = document_id.to_string();
388 self.with_read_conn(move |conn| count_chunks_for_document(conn, &did))
389 .await
390 }
391
392 pub async fn filter_search_results_by_scope(
398 &self,
399 results: Vec<SearchResult>,
400 scope: &ScopeKey,
401 ) -> Result<Vec<SearchResult>, MemoryError> {
402 let mut document_ids = BTreeSet::new();
403 for result in &results {
404 match &result.source {
405 SearchSource::Chunk { document_id, .. }
406 | SearchSource::Episode { document_id, .. } => {
407 document_ids.insert(document_id.clone());
408 }
409 _ => {}
410 }
411 }
412
413 let document_ids = document_ids.into_iter().collect::<Vec<_>>();
414 let scope_by_document = self
415 .with_read_conn(move |conn| document_scope_keys_for_ids(conn, &document_ids))
416 .await?;
417 let requested = scope.clone();
418
419 Ok(results
420 .into_iter()
421 .filter(|result| match &result.source {
422 SearchSource::Chunk { document_id, .. }
423 | SearchSource::Episode { document_id, .. } => scope_by_document
424 .get(document_id)
425 .map(|scope_key| scope_key == &requested)
426 .unwrap_or(false),
427 SearchSource::Projection { scope_key, .. } => scope_key == &requested,
428 SearchSource::Fact { .. } | SearchSource::Message { .. } => false,
429 })
430 .collect())
431 }
432}