1use crate::chunker;
4use crate::db;
5#[cfg(feature = "hnsw")]
6use crate::db::IndexOpKind;
7use crate::error::MemoryError;
8use crate::quantize::{self, Quantizer};
9use crate::types::{
10 ChunkManifestChunkMapping, ChunkManifestEntry, ChunkManifestIngestOptions,
11 ChunkManifestIngestResult, Document, SearchResult, SearchSource,
12};
13use crate::{merge_trace_ctx, MemoryStore};
14use rusqlite::{params, Connection};
15use stack_ids::ScopeKey;
16use stack_ids::TraceCtx;
17use std::collections::{BTreeMap, BTreeSet};
18
/// A chunk ready to be written by `insert_document_with_chunks` /
/// `insert_document_with_chunks_and_ids`: `(content, embedding_bytes, q8_bytes, token_count)`.
///
/// In this file, `embedding_bytes` is produced by `db::embedding_to_bytes`, and `q8_bytes`
/// holds the packed q8-quantized embedding, or `None` when quantization failed.
pub type ChunkRow = (String, Vec<u8>, Option<Vec<u8>>, usize);
21
22pub fn insert_document_with_chunks(
23 conn: &Connection,
24 doc_id: &str,
25 title: &str,
26 namespace: &str,
27 source_path: Option<&str>,
28 metadata: Option<&serde_json::Value>,
29 chunks: &[ChunkRow],
30) -> Result<Vec<String>, MemoryError> {
31 let chunk_ids: Vec<String> = (0..chunks.len())
32 .map(|_| uuid::Uuid::new_v4().to_string())
33 .collect();
34 insert_document_with_chunks_and_ids(
35 conn,
36 doc_id,
37 title,
38 namespace,
39 source_path,
40 metadata,
41 chunks,
42 &chunk_ids,
43 )?;
44 Ok(chunk_ids)
45}
46
47#[allow(clippy::too_many_arguments)]
48pub fn insert_document_with_chunks_and_ids(
49 conn: &Connection,
50 doc_id: &str,
51 title: &str,
52 namespace: &str,
53 source_path: Option<&str>,
54 metadata: Option<&serde_json::Value>,
55 chunks: &[ChunkRow],
56 chunk_ids: &[String],
57) -> Result<(), MemoryError> {
58 if chunks.len() != chunk_ids.len() {
59 return Err(MemoryError::Other(
60 "chunks and chunk_ids must have the same length".to_string(),
61 ));
62 }
63
64 let metadata_str = metadata.map(|value| value.to_string());
65 db::with_transaction(conn, |tx| {
66 tx.execute(
67 "INSERT INTO documents (id, title, source_path, namespace, metadata)
68 VALUES (?1, ?2, ?3, ?4, ?5)",
69 params![doc_id, title, source_path, namespace, metadata_str],
70 )?;
71
72 for (chunk_index, ((content, embedding_bytes, q8_bytes, token_count), chunk_id)) in
73 chunks.iter().zip(chunk_ids.iter()).enumerate()
74 {
75 tx.execute(
76 "INSERT INTO chunks (id, document_id, chunk_index, content, token_count, embedding, embedding_q8)
77 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
78 params![
79 chunk_id,
80 doc_id,
81 chunk_index as i64,
82 content,
83 *token_count as i64,
84 embedding_bytes,
85 q8_bytes.as_deref()
86 ],
87 )?;
88
89 tx.execute(
90 "INSERT INTO chunks_rowid_map (chunk_id) VALUES (?1)",
91 params![chunk_id],
92 )?;
93 let fts_rowid = tx.last_insert_rowid();
94 tx.execute(
95 "INSERT INTO chunks_fts (rowid, content) VALUES (?1, ?2)",
96 params![fts_rowid, content],
97 )?;
98
99 #[cfg(feature = "hnsw")]
100 db::queue_pending_index_op(
101 tx,
102 &format!("chunk:{}", chunk_id),
103 "chunk",
104 IndexOpKind::Upsert,
105 )?;
106 db::invalidate_derived_vector_artifact(tx, &format!("chunk:{chunk_id}"))?;
107 }
108
109 Ok(())
110 })
111}
112
113pub fn delete_document_with_chunks(
114 conn: &Connection,
115 document_id: &str,
116) -> Result<Vec<String>, MemoryError> {
117 db::with_transaction(conn, |tx| {
118 let episode_rows: Vec<(String, String, i64)> = {
119 let mut stmt = tx.prepare(
120 "SELECT e.episode_id, e.search_text, erm.rowid
121 FROM episodes e
122 JOIN episodes_rowid_map erm ON erm.episode_id = e.episode_id
123 WHERE e.document_id = ?1",
124 )?;
125 let rows = stmt.query_map(params![document_id], |row| {
126 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
127 })?;
128 rows.collect::<Result<Vec<_>, _>>()?
129 };
130
131 for (episode_id, search_text, fts_rowid) in &episode_rows {
132 tx.execute(
133 "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
134 params![fts_rowid, search_text],
135 )?;
136 tx.execute(
137 "DELETE FROM episodes_rowid_map WHERE episode_id = ?1",
138 params![episode_id],
139 )?;
140 tx.execute(
141 "DELETE FROM episode_causes WHERE episode_id = ?1",
142 params![episode_id],
143 )?;
144 #[cfg(feature = "hnsw")]
145 db::queue_pending_index_op(
146 tx,
147 &crate::episodes::episode_item_key(episode_id),
148 "episode",
149 IndexOpKind::Delete,
150 )?;
151 db::invalidate_derived_vector_artifact(
152 tx,
153 &crate::episodes::episode_item_key(episode_id),
154 )?;
155 }
156 tx.execute(
157 "DELETE FROM episodes WHERE document_id = ?1",
158 params![document_id],
159 )?;
160
161 let mut stmt = tx.prepare(
162 "SELECT c.id, c.content, cm.rowid
163 FROM chunks c
164 JOIN chunks_rowid_map cm ON cm.chunk_id = c.id
165 WHERE c.document_id = ?1",
166 )?;
167 let chunk_rows: Vec<(String, String, i64)> = stmt
168 .query_map(params![document_id], |row| {
169 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
170 })?
171 .collect::<Result<Vec<_>, _>>()?;
172
173 let chunk_ids: Vec<String> = chunk_rows.iter().map(|(id, _, _)| id.clone()).collect();
174
175 for (chunk_id, content, fts_rowid) in &chunk_rows {
176 tx.execute(
177 "INSERT INTO chunks_fts (chunks_fts, rowid, content) VALUES ('delete', ?1, ?2)",
178 params![fts_rowid, content],
179 )?;
180 tx.execute(
181 "DELETE FROM chunks_rowid_map WHERE chunk_id = ?1",
182 params![chunk_id],
183 )?;
184 #[cfg(feature = "hnsw")]
185 db::queue_pending_index_op(
186 tx,
187 &format!("chunk:{}", chunk_id),
188 "chunk",
189 IndexOpKind::Delete,
190 )?;
191 db::invalidate_derived_vector_artifact(tx, &format!("chunk:{chunk_id}"))?;
192 }
193
194 tx.execute(
195 "DELETE FROM chunks WHERE document_id = ?1",
196 params![document_id],
197 )?;
198 let affected = tx.execute("DELETE FROM documents WHERE id = ?1", params![document_id])?;
199 if affected == 0 {
200 return Err(MemoryError::DocumentNotFound(document_id.to_string()));
201 }
202
203 Ok(chunk_ids)
204 })
205}
206
207pub fn count_chunks_for_document(
208 conn: &Connection,
209 document_id: &str,
210) -> Result<usize, MemoryError> {
211 let count: i64 = conn.query_row(
212 "SELECT COUNT(*) FROM chunks WHERE document_id = ?1",
213 params![document_id],
214 |row| row.get(0),
215 )?;
216 Ok(count as usize)
217}
218
219pub fn list_documents(
220 conn: &Connection,
221 namespace: &str,
222 limit: usize,
223 offset: usize,
224) -> Result<Vec<Document>, MemoryError> {
225 let mut stmt = conn.prepare(
226 "SELECT d.id, d.title, d.source_path, d.namespace, d.created_at, d.metadata,
227 (SELECT COUNT(*) FROM chunks c WHERE c.document_id = d.id) AS chunk_count
228 FROM documents d
229 WHERE d.namespace = ?1
230 ORDER BY d.created_at DESC
231 LIMIT ?2 OFFSET ?3",
232 )?;
233
234 let rows = stmt
235 .query_map(params![namespace, limit as i64, offset as i64], |row| {
236 Ok((
237 row.get::<_, String>(0)?,
238 row.get::<_, String>(1)?,
239 row.get::<_, Option<String>>(2)?,
240 row.get::<_, String>(3)?,
241 row.get::<_, String>(4)?,
242 row.get::<_, Option<String>>(5)?,
243 row.get::<_, i64>(6)? as u32,
244 ))
245 })?
246 .collect::<Result<Vec<_>, _>>()?;
247
248 rows.into_iter()
249 .map(
250 |(id, title, source_path, namespace, created_at, metadata_raw, chunk_count)| {
251 Ok(Document {
252 metadata: db::parse_optional_json(
253 "documents",
254 &id,
255 "metadata",
256 metadata_raw.as_deref(),
257 )?,
258 id,
259 title,
260 source_path,
261 namespace,
262 created_at,
263 chunk_count,
264 })
265 },
266 )
267 .collect()
268}
269
270fn document_scope_keys_for_ids(
271 conn: &Connection,
272 document_ids: &[String],
273) -> Result<BTreeMap<String, ScopeKey>, MemoryError> {
274 if document_ids.is_empty() {
275 return Ok(BTreeMap::new());
276 }
277
278 let placeholders = (0..document_ids.len())
279 .map(|_| "?")
280 .collect::<Vec<_>>()
281 .join(", ");
282 let sql = format!("SELECT id, namespace, metadata FROM documents WHERE id IN ({placeholders})");
283 let params: Vec<&str> = document_ids.iter().map(|id| id.as_str()).collect();
284 let mut stmt = conn.prepare(&sql)?;
285 let rows = stmt
286 .query_map(rusqlite::params_from_iter(¶ms), |row| {
287 Ok((
288 row.get::<_, String>(0)?,
289 row.get::<_, String>(1)?,
290 row.get::<_, Option<String>>(2)?,
291 ))
292 })?
293 .collect::<Result<Vec<_>, _>>()?;
294
295 let mut by_id = BTreeMap::new();
296 for (id, namespace, metadata_raw) in rows {
297 let metadata =
298 db::parse_optional_json("documents", &id, "metadata", metadata_raw.as_deref())?;
299 let scope_key = ScopeKey {
300 namespace,
301 domain: metadata
302 .as_ref()
303 .and_then(|value| value.get("scope_domain"))
304 .and_then(|value| value.as_str())
305 .map(str::to_string),
306 workspace_id: metadata
307 .as_ref()
308 .and_then(|value| value.get("scope_workspace_id"))
309 .and_then(|value| value.as_str())
310 .map(str::to_string),
311 repo_id: metadata
312 .as_ref()
313 .and_then(|value| value.get("scope_repo_id"))
314 .and_then(|value| value.as_str())
315 .map(str::to_string),
316 };
317 by_id.insert(id, scope_key);
318 }
319
320 Ok(by_id)
321}
322
323impl MemoryStore {
324 pub async fn ingest_document(
326 &self,
327 title: &str,
328 content: &str,
329 namespace: &str,
330 source_path: Option<&str>,
331 metadata: Option<serde_json::Value>,
332 ) -> Result<String, MemoryError> {
333 self.ingest_document_with_trace(title, content, namespace, source_path, metadata, None)
334 .await
335 }
336
337 pub async fn ingest_document_with_trace(
339 &self,
340 title: &str,
341 content: &str,
342 namespace: &str,
343 source_path: Option<&str>,
344 metadata: Option<serde_json::Value>,
345 trace_ctx: Option<&TraceCtx>,
346 ) -> Result<String, MemoryError> {
347 self.validate_content("document.content", content)?;
348
349 let text_chunks = chunker::chunk_text(
350 content,
351 &self.inner.config.chunking,
352 self.inner.token_counter.as_ref(),
353 );
354
355 let max_chunks = self.inner.config.limits.max_chunks_per_document;
356 if text_chunks.len() > max_chunks {
357 return Err(MemoryError::ContentTooLarge {
358 size: text_chunks.len(),
359 limit: max_chunks,
360 });
361 }
362
363 let chunk_texts: Vec<String> = text_chunks.iter().map(|c| c.content.clone()).collect();
364 let embeddings = self.embed_batch_internal(chunk_texts).await?;
365 for embedding in &embeddings {
366 self.validate_embedding_dimensions(embedding)?;
367 }
368
369 let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
370 let chunks: Vec<ChunkRow> = text_chunks
371 .iter()
372 .zip(embeddings.iter())
373 .map(|(tc, emb)| {
374 let q8 = quantizer
376 .quantize(emb)
377 .map(|qv| quantize::pack_quantized(&qv))
378 .ok();
379 (
380 tc.content.clone(),
381 db::embedding_to_bytes(emb),
382 q8,
383 tc.token_count_estimate,
384 )
385 })
386 .collect();
387
388 let doc_id = uuid::Uuid::new_v4().to_string();
389
390 let did = doc_id.clone();
391 let t = title.to_string();
392 let ns = namespace.to_string();
393 let sp = source_path.map(|s| s.to_string());
394 let meta = merge_trace_ctx(metadata, trace_ctx);
395
396 self.with_write_conn(move |conn| {
397 insert_document_with_chunks(conn, &did, &t, &ns, sp.as_deref(), meta.as_ref(), &chunks)
398 })
399 .await?;
400
401 #[cfg(feature = "hnsw")]
402 self.sync_pending_hnsw_ops_best_effort("ingest_document")
403 .await;
404
405 Ok(doc_id)
406 }
407
408 pub async fn ingest_chunk_manifest(
414 &self,
415 options: ChunkManifestIngestOptions,
416 entries: Vec<ChunkManifestEntry>,
417 ) -> Result<ChunkManifestIngestResult, MemoryError> {
418 self.ingest_chunk_manifest_with_trace(options, entries, None)
419 .await
420 }
421
422 pub async fn ingest_chunk_manifest_with_trace(
424 &self,
425 options: ChunkManifestIngestOptions,
426 entries: Vec<ChunkManifestEntry>,
427 trace_ctx: Option<&TraceCtx>,
428 ) -> Result<ChunkManifestIngestResult, MemoryError> {
429 if entries.is_empty() {
430 return Err(MemoryError::InvalidConfig {
431 field: "chunk_manifest.entries",
432 reason: "at least one chunk is required".to_string(),
433 });
434 }
435
436 let max_chunks = self.inner.config.limits.max_chunks_per_document;
437 if entries.len() > max_chunks {
438 return Err(MemoryError::ContentTooLarge {
439 size: entries.len(),
440 limit: max_chunks,
441 });
442 }
443
444 let mut seen_external_ids = BTreeSet::new();
445 for (index, entry) in entries.iter().enumerate() {
446 let external_chunk_id = entry.external_chunk_id.trim();
447 if external_chunk_id.is_empty() {
448 return Err(MemoryError::InvalidConfig {
449 field: "chunk_manifest.external_chunk_id",
450 reason: format!("chunk {index} external_chunk_id must not be empty"),
451 });
452 }
453 if !seen_external_ids.insert(external_chunk_id.to_string()) {
454 return Err(MemoryError::InvalidConfig {
455 field: "chunk_manifest.external_chunk_id",
456 reason: format!("duplicate external_chunk_id '{external_chunk_id}'"),
457 });
458 }
459 if entry.content.trim().is_empty() {
460 return Err(MemoryError::InvalidConfig {
461 field: "chunk_manifest.content",
462 reason: format!(
463 "content must not be empty (chunk index {index}, id='{external_chunk_id}')"
464 ),
465 });
466 }
467 self.validate_content("chunk_manifest.content", &entry.content)?;
468 if entry
469 .content_digest
470 .as_deref()
471 .is_some_and(|digest| digest.trim().is_empty())
472 {
473 return Err(MemoryError::InvalidConfig {
474 field: "chunk_manifest.content_digest",
475 reason: format!("chunk {index} content_digest must not be empty when supplied"),
476 });
477 }
478 }
479
480 let chunk_texts: Vec<String> = entries.iter().map(|entry| entry.content.clone()).collect();
481 let embeddings = self.embed_batch_internal(chunk_texts).await?;
482 for embedding in &embeddings {
483 self.validate_embedding_dimensions(embedding)?;
484 }
485
486 let quantizer = Quantizer::new(self.inner.config.embedding.dimensions);
487 let chunks: Vec<ChunkRow> = entries
488 .iter()
489 .zip(embeddings.iter())
490 .map(|(entry, emb)| {
491 let q8 = quantizer
492 .quantize(emb)
493 .map(|qv| quantize::pack_quantized(&qv))
494 .ok();
495 (
496 entry.content.clone(),
497 db::embedding_to_bytes(emb),
498 q8,
499 entry
500 .token_count_estimate
501 .unwrap_or_else(|| entry.content.len().div_ceil(4).max(1)),
502 )
503 })
504 .collect();
505
506 let doc_id = uuid::Uuid::new_v4().to_string();
507 let chunk_ids: Vec<String> = (0..entries.len())
508 .map(|_| uuid::Uuid::new_v4().to_string())
509 .collect();
510 let receipt_id = format!("chunk-manifest:{}", uuid::Uuid::new_v4());
511
512 let mappings: Vec<ChunkManifestChunkMapping> = entries
513 .iter()
514 .zip(chunk_ids.iter())
515 .enumerate()
516 .map(
517 |(chunk_index, (entry, sm_chunk_id))| ChunkManifestChunkMapping {
518 external_chunk_id: entry.external_chunk_id.clone(),
519 sm_document_id: doc_id.clone(),
520 sm_chunk_id: sm_chunk_id.clone(),
521 chunk_index,
522 content_digest: entry.content_digest.clone(),
523 metadata: entry.metadata.clone(),
524 },
525 )
526 .collect();
527
528 let did = doc_id.clone();
529 let title = options.title;
530 let namespace = options.namespace;
531 let source_path = options.source_path;
532 let metadata = merge_trace_ctx(options.metadata, trace_ctx);
533 let namespace_for_result = namespace.clone();
534
535 self.with_write_conn(move |conn| {
536 insert_document_with_chunks_and_ids(
537 conn,
538 &did,
539 &title,
540 &namespace,
541 source_path.as_deref(),
542 metadata.as_ref(),
543 &chunks,
544 &chunk_ids,
545 )
546 })
547 .await?;
548
549 #[cfg(feature = "hnsw")]
550 self.sync_pending_hnsw_ops_best_effort("ingest_chunk_manifest")
551 .await;
552
553 Ok(ChunkManifestIngestResult {
554 sm_document_id: doc_id,
555 namespace: namespace_for_result,
556 receipt_id,
557 chunks: mappings,
558 })
559 }
560
561 pub async fn delete_document(&self, document_id: &str) -> Result<(), MemoryError> {
563 let did = document_id.to_string();
564 self.with_write_conn(move |conn| delete_document_with_chunks(conn, &did))
565 .await?;
566
567 #[cfg(feature = "hnsw")]
568 self.sync_pending_hnsw_ops_best_effort("delete_document")
569 .await;
570
571 Ok(())
572 }
573
574 pub async fn list_documents(
576 &self,
577 namespace: &str,
578 limit: usize,
579 offset: usize,
580 ) -> Result<Vec<Document>, MemoryError> {
581 let ns = namespace.to_string();
582 self.with_read_conn(move |conn| list_documents(conn, &ns, limit, offset))
583 .await
584 }
585
586 pub async fn count_chunks_for_document(&self, document_id: &str) -> Result<usize, MemoryError> {
588 let did = document_id.to_string();
589 self.with_read_conn(move |conn| count_chunks_for_document(conn, &did))
590 .await
591 }
592
593 pub async fn filter_search_results_by_scope(
599 &self,
600 results: Vec<SearchResult>,
601 scope: &ScopeKey,
602 ) -> Result<Vec<SearchResult>, MemoryError> {
603 let mut document_ids = BTreeSet::new();
604 for result in &results {
605 match &result.source {
606 SearchSource::Chunk { document_id, .. }
607 | SearchSource::Episode { document_id, .. } => {
608 document_ids.insert(document_id.clone());
609 }
610 _ => {}
611 }
612 }
613
614 let document_ids = document_ids.into_iter().collect::<Vec<_>>();
615 let scope_by_document = self
616 .with_read_conn(move |conn| document_scope_keys_for_ids(conn, &document_ids))
617 .await?;
618 let requested = scope.clone();
619
620 Ok(results
621 .into_iter()
622 .filter(|result| match &result.source {
623 SearchSource::Chunk { document_id, .. }
624 | SearchSource::Episode { document_id, .. } => scope_by_document
625 .get(document_id)
626 .map(|scope_key| scope_key == &requested)
627 .unwrap_or(false),
628 SearchSource::Projection { scope_key, .. } => scope_key == &requested,
629 SearchSource::Fact { .. } | SearchSource::Message { .. } => false,
630 })
631 .collect())
632 }
633}