1use anyhow::{Result, anyhow};
2pub use memex_contracts::audit::{AuditRecommendation, AuditResult, ChunkQuality, QualityTier};
3pub use memex_contracts::progress::{
4 AuditProgress, MergeProgress, ReindexProgress, RepairResult, ReprocessProgress,
5};
6use pdf_extract;
7use serde::{Deserialize, Serialize};
8use serde_json::{Value, json};
9use sha2::{Digest, Sha256};
10use std::collections::{HashMap, HashSet, hash_map::DefaultHasher};
11use std::hash::{Hash, Hasher};
12use std::path::Path;
13use std::sync::Arc;
14use tokio::sync::Mutex;
15use tracing::debug;
16
17use crate::{
18 embeddings::MLXBridge,
19 preprocessing::{PreprocessingConfig, Preprocessor},
20 search::BM25Index,
21 storage::{ChromaDocument, CrossStoreRecoveryBatch, CrossStoreRecoveryStatus, StorageManager},
22};
23
24pub mod pipeline;
26pub mod provider;
27pub mod structured;
28pub use pipeline::{
29 Chunk, EmbeddedChunk, FileContent, PipelineConfig, PipelineEvent, PipelineGovernorConfig,
30 PipelineResult, PipelineSnapshot, PipelineStats, run_pipeline,
31};
32pub use provider::{
33 AicxChunkProvider, ChunkOpts, ChunkProvider, ChunkerKind, FlatChunkProvider,
34 OnionChunkProvider, detect_default_chunker,
35};
36
/// Fallback namespace identifier — NOTE(review): consumers are outside this chunk; confirm.
const DEFAULT_NAMESPACE: &str = "rag";

/// Batch size used when persisting documents — NOTE(review): consumers are outside this chunk.
const STORAGE_BATCH_SIZE: usize = 100;
42
/// Layer of the "onion" document decomposition, from the most condensed
/// (`Outer`) to the full text (`Core`).
///
/// Discriminants are stable (1..=4) and round-trip through
/// `as_u8`/`from_u8`; character targets per layer come from `target_chars`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum SliceLayer {
    /// Most condensed summary layer (~100-char target).
    Outer = 1,
    /// ~300-char condensation.
    Middle = 2,
    /// ~600-char condensation.
    Inner = 3,
    /// Full original content (no size cap).
    Core = 4,
}
70
71impl SliceLayer {
72 pub fn target_chars(&self) -> usize {
74 match self {
75 SliceLayer::Outer => 100,
76 SliceLayer::Middle => 300,
77 SliceLayer::Inner => 600,
78 SliceLayer::Core => usize::MAX,
79 }
80 }
81
82 pub fn as_u8(&self) -> u8 {
84 *self as u8
85 }
86
87 pub fn from_u8(v: u8) -> Option<Self> {
89 match v {
90 1 => Some(SliceLayer::Outer),
91 2 => Some(SliceLayer::Middle),
92 3 => Some(SliceLayer::Inner),
93 4 => Some(SliceLayer::Core),
94 _ => None,
95 }
96 }
97
98 pub fn name(&self) -> &'static str {
100 match self {
101 SliceLayer::Outer => "outer",
102 SliceLayer::Middle => "middle",
103 SliceLayer::Inner => "inner",
104 SliceLayer::Core => "core",
105 }
106 }
107}
108
109impl std::fmt::Display for SliceLayer {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 write!(f, "{}", self.name())
112 }
113}
114
/// One layer ("slice") of an onion-sliced document, linked to its neighbors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnionSlice {
    /// Stable id derived from content + layer (see [`OnionSlice::generate_id`]).
    pub id: String,
    /// Which onion layer this slice belongs to.
    pub layer: SliceLayer,
    /// Text stored at this layer (the full text for `Core`, condensed above it).
    pub content: String,
    /// Id of the slice one layer inward; `None` for the core.
    pub parent_id: Option<String>,
    /// Ids of slices one layer outward (empty for the outermost slice).
    pub children_ids: Vec<String>,
    /// Keywords extracted from this slice's content.
    pub keywords: Vec<String>,
}
131
132impl OnionSlice {
133 pub fn generate_id(content: &str, layer: SliceLayer) -> String {
135 let mut hasher = DefaultHasher::new();
136 content.hash(&mut hasher);
137 layer.as_u8().hash(&mut hasher);
138 format!("slice_{:016x}_{}", hasher.finish(), layer.name())
139 }
140}
141
/// Strategy used when slicing content for indexing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SliceMode {
    /// Full four-layer onion slicing (the default).
    #[default]
    Onion,
    /// Two-layer onion slicing (outer summary + core only).
    OnionFast,
    /// Flat slicing mode.
    Flat,
}

impl std::str::FromStr for SliceMode {
    type Err = String;

    /// Parses "onion", "onion-fast"/"fast", or "flat", case-insensitively.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.to_lowercase();
        match lowered.as_str() {
            "onion" => Ok(Self::Onion),
            "onion-fast" | "fast" => Ok(Self::OnionFast),
            "flat" => Ok(Self::Flat),
            other => Err(format!(
                "Invalid slice mode: '{}'. Use 'onion', 'onion-fast', or 'flat'",
                other
            )),
        }
    }
}
169
/// Outcome of an attempt to index a piece of content.
#[derive(Debug, Clone)]
pub enum IndexResult {
    /// Content was embedded and written to the index.
    Indexed {
        /// Number of chunks written.
        chunks_indexed: usize,
        /// Hash of the indexed content.
        content_hash: String,
        /// Wall-clock milliseconds spent embedding, when measured.
        embedder_ms: Option<u64>,
        /// Rough token-count estimate, when available.
        tokens_estimated: Option<usize>,
    },
    /// Content was not (re)indexed.
    Skipped {
        /// Why indexing was skipped.
        reason: String,
        /// Hash of the content that was examined.
        content_hash: String,
    },
}

impl IndexResult {
    /// True when the content was actually written to the index.
    pub fn was_indexed(&self) -> bool {
        matches!(self, Self::Indexed { .. })
    }

    /// Deprecated alias for [`IndexResult::was_indexed`].
    #[deprecated(note = "use was_indexed")]
    pub fn is_indexed(&self) -> bool {
        self.was_indexed()
    }

    /// True when indexing was skipped.
    pub fn is_skipped(&self) -> bool {
        matches!(self, Self::Skipped { .. })
    }

    /// Content hash, regardless of outcome.
    pub fn content_hash(&self) -> &str {
        match self {
            Self::Indexed { content_hash, .. } | Self::Skipped { content_hash, .. } => content_hash,
        }
    }
}
217
/// Classification of a pending cross-store recovery batch, computed by
/// comparing the batch's documents against the Lance store and BM25 index.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CrossStoreRecoveryState {
    /// All referenced documents are present in both stores.
    Clean,
    /// Documents exist in Lance but are missing from BM25 (repairable).
    Divergent,
    /// The batch was persisted with a rolled-back status.
    RolledBack,
    /// The batch references documents no longer present in Lance.
    Stale,
}
226
/// Per-batch detail emitted by cross-store recovery inspection/repair.
#[derive(Debug, Clone, Serialize)]
pub struct CrossStoreRecoveryBatchReport {
    pub batch_id: String,
    pub namespace: String,
    pub created_at: String,
    /// Classification computed during this run.
    pub state: CrossStoreRecoveryState,
    /// Status persisted by the storage layer.
    pub status: CrossStoreRecoveryStatus,
    /// Number of documents referenced by the batch.
    pub document_count: usize,
    /// How many of those were found in the Lance store.
    pub lance_documents: usize,
    /// How many of those were found in the BM25 index.
    pub bm25_documents: usize,
    /// Ids present in Lance but absent from BM25.
    pub missing_bm25_ids: Vec<String>,
    /// Ids referenced by the batch but absent from Lance.
    pub missing_lance_ids: Vec<String>,
    pub last_error: Option<String>,
}
241
/// Aggregate result of a cross-store recovery pass over all pending batches.
#[derive(Debug, Clone, Serialize, Default)]
pub struct CrossStoreRecoveryReport {
    /// Directory holding the pending recovery batch files.
    pub recovery_dir: String,
    /// Batches examined (after any namespace filtering).
    pub pending_batches: usize,
    pub clean_batches: usize,
    pub divergent_batches: usize,
    pub rolled_back_batches: usize,
    pub stale_batches: usize,
    /// Total documents cross-checked against both stores.
    pub documents_examined: usize,
    pub documents_missing_bm25: usize,
    pub documents_missing_lance: usize,
    /// Documents re-added to BM25 (execute mode only).
    pub repaired_documents: usize,
    /// Documents skipped because their Lance copy was gone.
    pub skipped_documents: usize,
    /// Batch files removed from the recovery directory (execute mode only).
    pub cleared_batches: usize,
    pub batches_repaired: usize,
    /// Per-batch detail, in processing order.
    pub batches: Vec<CrossStoreRecoveryBatchReport>,
}
259
260impl CrossStoreRecoveryReport {
261 pub fn is_clean(&self) -> bool {
262 self.pending_batches == 0
263 || (self.divergent_batches == 0
264 && self.rolled_back_batches == 0
265 && self.stale_batches == 0)
266 }
267}
268
/// Scans pending cross-store recovery batches and reconciles the Lance
/// document store with the BM25 index.
///
/// With `execute == false` this only classifies each batch. With
/// `execute == true`, divergent batches are repaired (Lance copies of the
/// missing documents are re-added to BM25) and every examined batch is
/// cleared from the recovery directory. `namespace`, when given, restricts
/// the scan to batches referencing at least one document in that namespace.
async fn run_cross_store_recovery(
    storage: &StorageManager,
    bm25: &BM25Index,
    namespace: Option<&str>,
    execute: bool,
) -> Result<CrossStoreRecoveryReport> {
    let recovery_dir = storage.cross_store_recovery_dir();
    let mut report = CrossStoreRecoveryReport {
        recovery_dir: recovery_dir.display().to_string(),
        ..Default::default()
    };

    // Keep only batches touching the requested namespace (all batches when no
    // namespace filter was supplied).
    let batches = storage
        .list_cross_store_recovery_batches()?
        .into_iter()
        .filter(|batch| {
            namespace.is_none_or(|expected| {
                batch
                    .documents
                    .iter()
                    .any(|document| document.namespace == expected)
            })
        })
        .collect::<Vec<_>>();
    report.pending_batches = batches.len();

    if batches.is_empty() {
        return Ok(report);
    }

    // Per-namespace snapshots of both stores, loaded lazily and reused across
    // batches that share a namespace.
    let mut lance_cache: HashMap<String, HashMap<String, ChromaDocument>> = HashMap::new();
    let mut bm25_cache: HashMap<String, HashSet<String>> = HashMap::new();

    for batch in batches {
        // A batch's namespace is taken from its first document; batches are
        // assumed single-namespace — NOTE(review): confirm writers uphold this.
        let namespace_name = batch
            .documents
            .first()
            .map(|document| document.namespace.clone())
            .unwrap_or_else(|| "unknown".to_string())(

        let lance_documents = if let Some(documents) = lance_cache.get(&namespace_name) {
            documents
        } else {
            let documents = storage
                .get_all_in_namespace(&namespace_name)
                .await?
                .into_iter()
                .map(|document| (document.id.clone(), document))
                .collect::<HashMap<_, _>>();
            lance_cache.insert(namespace_name.clone(), documents);
            lance_cache
                .get(&namespace_name)
                .expect("just inserted lance cache")
        };

        let bm25_documents = if let Some(ids) = bm25_cache.get(&namespace_name) {
            ids
        } else {
            let ids = bm25
                .document_keys(Some(&namespace_name))?
                .into_iter()
                .filter_map(|(doc_namespace, id)| (doc_namespace == namespace_name).then_some(id))
                .collect::<HashSet<_>>();
            bm25_cache.insert(namespace_name.clone(), ids);
            bm25_cache
                .get(&namespace_name)
                .expect("just inserted bm25 cache")
        };

        let mut missing_bm25_ids = Vec::new();
        let mut missing_lance_ids = Vec::new();
        let mut repair_documents = Vec::new();
        let mut lance_present = 0usize;
        let mut bm25_present = 0usize;

        // Cross-check every document referenced by the batch against both
        // stores; documents in Lance but not in BM25 are queued for repair.
        for document_ref in &batch.documents {
            report.documents_examined += 1;

            if let Some(document) = lance_documents.get(&document_ref.id) {
                lance_present += 1;
                if bm25_documents.contains(&document_ref.id) {
                    bm25_present += 1;
                } else {
                    missing_bm25_ids.push(document_ref.id.clone());
                    repair_documents.push((
                        document.id.clone(),
                        document.namespace.clone(),
                        document.document.clone(),
                    ));
                }
            } else {
                missing_lance_ids.push(document_ref.id.clone());
            }
        }

        report.documents_missing_bm25 += missing_bm25_ids.len();
        report.documents_missing_lance += missing_lance_ids.len();

        // Classification precedence: divergence (repairable) beats rolled-back
        // status, which beats staleness; otherwise the batch is clean.
        let state = if !missing_bm25_ids.is_empty() {
            report.divergent_batches += 1;
            CrossStoreRecoveryState::Divergent
        } else if batch.status == CrossStoreRecoveryStatus::RolledBack {
            report.rolled_back_batches += 1;
            CrossStoreRecoveryState::RolledBack
        } else if !missing_lance_ids.is_empty() || lance_present == 0 {
            report.stale_batches += 1;
            CrossStoreRecoveryState::Stale
        } else {
            report.clean_batches += 1;
            CrossStoreRecoveryState::Clean
        };

        if execute {
            match state {
                CrossStoreRecoveryState::Divergent => {
                    // Re-add the Lance copies to BM25, then keep the cache in
                    // sync so later batches in the same namespace see them.
                    bm25.add_documents(&repair_documents).await?;
                    if let Some(ids) = bm25_cache.get_mut(&namespace_name) {
                        for (id, _, _) in &repair_documents {
                            ids.insert(id.clone());
                        }
                    }
                    report.repaired_documents += repair_documents.len();
                    report.skipped_documents += missing_lance_ids.len();
                    report.batches_repaired += 1;
                    storage.clear_cross_store_recovery_batch(&batch.batch_id)?;
                    report.cleared_batches += 1;
                }
                CrossStoreRecoveryState::RolledBack
                | CrossStoreRecoveryState::Stale
                | CrossStoreRecoveryState::Clean => {
                    // Nothing to repair; just clear the batch file.
                    report.skipped_documents += missing_lance_ids.len();
                    storage.clear_cross_store_recovery_batch(&batch.batch_id)?;
                    report.cleared_batches += 1;
                }
            }
        }

        report.batches.push(CrossStoreRecoveryBatchReport {
            batch_id: batch.batch_id,
            namespace: namespace_name,
            created_at: batch.created_at,
            state,
            status: batch.status,
            document_count: batch.documents.len(),
            lance_documents: lance_present,
            bm25_documents: bm25_present,
            missing_bm25_ids,
            missing_lance_ids,
            last_error: batch.last_error,
        });
    }

    Ok(report)
}
423
/// Read-only scan of pending cross-store recovery batches: classifies each
/// batch without repairing documents or clearing batch files.
pub async fn inspect_cross_store_recovery(
    storage: &StorageManager,
    bm25: &BM25Index,
    namespace: Option<&str>,
) -> Result<CrossStoreRecoveryReport> {
    run_cross_store_recovery(storage, bm25, namespace, false).await
}
431
/// Executing variant of [`inspect_cross_store_recovery`]: re-adds documents
/// missing from BM25 for divergent batches and clears every examined batch
/// from the recovery directory.
pub async fn repair_cross_store_recovery(
    storage: &StorageManager,
    bm25: &BM25Index,
    namespace: Option<&str>,
) -> Result<CrossStoreRecoveryReport> {
    run_cross_store_recovery(storage, bm25, namespace, true).await
}
439
440pub fn compute_content_hash(content: &str) -> String {
442 let mut hasher = Sha256::new();
443 hasher.update(content.as_bytes());
444 let result = hasher.finalize();
445 result.iter().map(|b| format!("{:02x}", b)).collect()
447}
448
/// How the Outer layer's summary text is produced.
#[derive(Debug, Clone, Default)]
pub enum OuterSynthesis {
    /// Keyword-based extraction (default; no external calls).
    #[default]
    Keyword,
    /// Delegate outer-summary synthesis to an LLM served at `endpoint`
    /// using `model`.
    Llm { model: String, endpoint: String },
}

/// Tuning knobs for onion slicing.
#[derive(Debug, Clone)]
pub struct OnionSliceConfig {
    /// Character target for the Outer layer.
    pub outer_target: usize,
    /// Character target for the Middle layer.
    pub middle_target: usize,
    /// Character target for the Inner layer.
    pub inner_target: usize,
    /// Content shorter than this is not sliced into multiple layers.
    pub min_content_for_slicing: usize,
    /// Strategy for synthesizing the Outer summary.
    pub outer_synthesis: OuterSynthesis,
}

impl Default for OnionSliceConfig {
    fn default() -> Self {
        OnionSliceConfig {
            outer_target: 100,
            middle_target: 300,
            inner_target: 600,
            min_content_for_slicing: 200,
            outer_synthesis: OuterSynthesis::Keyword,
        }
    }
}
493
494fn create_core_only_slice(content: &str) -> Vec<OnionSlice> {
495 let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
496 let core_keywords = extract_keywords(content, 5);
497
498 let outer_id = OnionSlice::generate_id(content, SliceLayer::Outer);
501 let outer_keywords = extract_keywords(content, 3);
502
503 vec![
504 OnionSlice {
505 id: outer_id.clone(),
506 layer: SliceLayer::Outer,
507 content: content.to_string(),
508 parent_id: Some(core_id.clone()),
509 children_ids: vec![],
510 keywords: outer_keywords,
511 },
512 OnionSlice {
513 id: core_id,
514 layer: SliceLayer::Core,
515 content: content.to_string(),
516 parent_id: None,
517 children_ids: vec![outer_id],
518 keywords: core_keywords,
519 },
520 ]
521}
522
523pub fn create_onion_slices(
531 content: &str,
532 metadata: &serde_json::Value,
533 config: &OnionSliceConfig,
534) -> Vec<OnionSlice> {
535 if structured::is_structured_conversation(metadata) {
536 return structured::create_structured_onion_slices(content, metadata, config);
537 }
538
539 let content = content.trim();
540
541 if content.len() < config.min_content_for_slicing {
544 return create_core_only_slice(content);
545 }
546
547 let mut slices = Vec::with_capacity(4);
548
549 let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
551 let core_keywords = extract_keywords(content, 10);
552
553 let inner_content = extract_key_content(content, config.inner_target);
555 let inner_id = OnionSlice::generate_id(&inner_content, SliceLayer::Inner);
556 let inner_keywords = extract_keywords(&inner_content, 7);
557
558 let middle_content = extract_key_content(&inner_content, config.middle_target);
560 let middle_id = OnionSlice::generate_id(&middle_content, SliceLayer::Middle);
561 let middle_keywords = extract_keywords(&middle_content, 5);
562
563 let outer_content = create_outer_summary(&middle_content, &core_keywords, config.outer_target);
565 let outer_id = OnionSlice::generate_id(&outer_content, SliceLayer::Outer);
566 let outer_keywords = extract_keywords(&outer_content, 3);
567
568 slices.push(OnionSlice {
570 id: outer_id.clone(),
571 layer: SliceLayer::Outer,
572 content: outer_content,
573 parent_id: Some(middle_id.clone()),
574 children_ids: vec![],
575 keywords: outer_keywords,
576 });
577
578 slices.push(OnionSlice {
579 id: middle_id.clone(),
580 layer: SliceLayer::Middle,
581 content: middle_content,
582 parent_id: Some(inner_id.clone()),
583 children_ids: vec![outer_id],
584 keywords: middle_keywords,
585 });
586
587 slices.push(OnionSlice {
588 id: inner_id.clone(),
589 layer: SliceLayer::Inner,
590 content: inner_content,
591 parent_id: Some(core_id.clone()),
592 children_ids: vec![middle_id],
593 keywords: inner_keywords,
594 });
595
596 slices.push(OnionSlice {
597 id: core_id.clone(),
598 layer: SliceLayer::Core,
599 content: content.to_string(),
600 parent_id: None,
601 children_ids: vec![inner_id],
602 keywords: core_keywords,
603 });
604
605 slices
606}
607
/// Maximum number of transcript characters sent to Ollama for outer-summary
/// synthesis; longer transcripts are truncated with an explicit marker.
const OLLAMA_OUTER_INPUT_CHAR_BUDGET: usize = 8_000;

/// Overall request timeout for the Ollama generate call, in seconds.
const OLLAMA_OUTER_TIMEOUT_SECS: u64 = 60;

/// TCP connect timeout for the Ollama HTTP client, in seconds.
const OLLAMA_OUTER_CONNECT_TIMEOUT_SECS: u64 = 5;
622
/// Asks an Ollama instance at `endpoint` to produce a short outer summary of
/// `transcript_text` using `model`.
///
/// Best-effort: every failure mode (client build, request, non-2xx status,
/// body decode, empty completion) is logged at `warn` level and reported as
/// `None`, letting callers fall back to keyword-derived outer text. Input is
/// truncated to [`OLLAMA_OUTER_INPUT_CHAR_BUDGET`] characters before prompting.
pub async fn synthesize_outer_via_ollama(
    transcript_text: &str,
    model: &str,
    endpoint: &str,
) -> Option<String> {
    let trimmed = transcript_text.trim();
    if trimmed.is_empty() {
        return None;
    }

    // Cap the prompt size; mark visibly when content was cut.
    let mut prompt_input: String = trimmed
        .chars()
        .take(OLLAMA_OUTER_INPUT_CHAR_BUDGET)
        .collect();
    if prompt_input.chars().count() < trimmed.chars().count() {
        prompt_input.push_str("\n\n[…transcript truncated for outer summary…]");
    }

    let prompt = format!(
        "You are a precise transcript summarizer. Output 1-3 sentences in Polish.\n\
        \n\
        Summarize this conversation transcript. Focus on:\n\
        1. What was the user's goal/question.\n\
        2. What was decided/built/fixed.\n\
        3. What was the outcome (success, blocker, follow-up).\n\
        \n\
        Skip UI/CLI noise (Brewing…, Frosting…, Grooving…, tokens·, shifttab, ⎿, ⎯).\n\
        Be specific: name projects, technologies, files mentioned.\n\
        \n\
        Transcript:\n{prompt_input}"
    );

    let url = format!("{}/api/generate", endpoint.trim_end_matches('/'));
    // stream=false so the full completion arrives in a single JSON body.
    let body = serde_json::json!({
        "model": model,
        "prompt": prompt,
        "stream": false,
    });

    let client = match reqwest::Client::builder()
        .connect_timeout(std::time::Duration::from_secs(
            OLLAMA_OUTER_CONNECT_TIMEOUT_SECS,
        ))
        .timeout(std::time::Duration::from_secs(OLLAMA_OUTER_TIMEOUT_SECS))
        .build()
    {
        Ok(client) => client,
        Err(err) => {
            tracing::warn!("Ollama outer synthesis: client build failed: {err}");
            return None;
        }
    };

    let response = match client.post(&url).json(&body).send().await {
        Ok(response) => response,
        Err(err) => {
            tracing::warn!("Ollama outer synthesis: POST {url} failed: {err}");
            return None;
        }
    };

    let status = response.status();
    if !status.is_success() {
        tracing::warn!("Ollama outer synthesis: POST {url} returned status {status}");
        return None;
    }

    let parsed: serde_json::Value = match response.json().await {
        Ok(value) => value,
        Err(err) => {
            tracing::warn!("Ollama outer synthesis: response decode failed: {err}");
            return None;
        }
    };

    // The non-streaming generate API puts the completion under "response".
    let summary = parsed
        .get("response")
        .and_then(|value| value.as_str())
        .map(|raw| raw.trim().to_string())
        .filter(|text| !text.is_empty())?;

    Some(summary)
}
720
721pub fn replace_outer_slice(slices: Vec<OnionSlice>, new_outer_content: String) -> Vec<OnionSlice> {
729 let new_outer_content = new_outer_content.trim().to_string();
730 if new_outer_content.is_empty() {
731 return slices;
732 }
733
734 let mut old_outer_id: Option<String> = None;
735 let new_outer_id = OnionSlice::generate_id(&new_outer_content, SliceLayer::Outer);
736
737 let mut rebuilt: Vec<OnionSlice> = slices
738 .into_iter()
739 .map(|slice| {
740 if slice.layer == SliceLayer::Outer {
741 old_outer_id = Some(slice.id.clone());
742 let new_keywords = extract_keywords(&new_outer_content, 3);
743 OnionSlice {
744 id: new_outer_id.clone(),
745 layer: SliceLayer::Outer,
746 content: new_outer_content.clone(),
747 parent_id: slice.parent_id,
748 children_ids: slice.children_ids,
749 keywords: new_keywords,
750 }
751 } else {
752 slice
753 }
754 })
755 .collect();
756
757 if let Some(old_id) = old_outer_id {
758 for slice in &mut rebuilt {
759 for child in &mut slice.children_ids {
760 if *child == old_id {
761 *child = new_outer_id.clone();
762 }
763 }
764 }
765 }
766
767 rebuilt
768}
769
/// Async variant of [`create_onion_slices`]: first attempts LLM outer
/// synthesis (when configured), then overrides the keyword-derived Outer
/// slice with the LLM summary if one was produced.
pub async fn create_onion_slices_async(
    content: &str,
    metadata: &serde_json::Value,
    config: &OnionSliceConfig,
) -> Vec<OnionSlice> {
    let llm_summary = resolve_llm_outer(content, &config.outer_synthesis).await;
    let slices = create_onion_slices(content, metadata, config);
    apply_optional_outer_override(slices, llm_summary)
}
787
/// Async variant of [`create_onion_slices_fast`]: same LLM outer-override
/// flow as [`create_onion_slices_async`] but over the two-layer fast slicing.
pub async fn create_onion_slices_fast_async(
    content: &str,
    metadata: &serde_json::Value,
    config: &OnionSliceConfig,
) -> Vec<OnionSlice> {
    let llm_summary = resolve_llm_outer(content, &config.outer_synthesis).await;
    let slices = create_onion_slices_fast(content, metadata, config);
    apply_optional_outer_override(slices, llm_summary)
}
799
800async fn resolve_llm_outer(content: &str, strategy: &OuterSynthesis) -> Option<String> {
801 match strategy {
802 OuterSynthesis::Keyword => None,
803 OuterSynthesis::Llm { model, endpoint } => {
804 synthesize_outer_via_ollama(content, model, endpoint).await
805 }
806 }
807}
808
809fn apply_optional_outer_override(
810 slices: Vec<OnionSlice>,
811 summary: Option<String>,
812) -> Vec<OnionSlice> {
813 match summary {
814 Some(text) => replace_outer_slice(slices, text),
815 None => slices,
816 }
817}
818
819pub fn create_onion_slices_fast(
824 content: &str,
825 metadata: &serde_json::Value,
826 config: &OnionSliceConfig,
827) -> Vec<OnionSlice> {
828 if structured::is_structured_conversation(metadata) {
829 return structured::create_structured_onion_slices_fast(content, metadata, config);
830 }
831
832 let content = content.trim();
833
834 if content.len() < config.min_content_for_slicing {
836 return create_core_only_slice(content);
837 }
838
839 let mut slices = Vec::with_capacity(2);
840
841 let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
843 let core_keywords = extract_keywords(content, 10);
844
845 let outer_content = create_outer_summary(content, &core_keywords, config.outer_target);
848 let outer_id = OnionSlice::generate_id(&outer_content, SliceLayer::Outer);
849 let outer_keywords = extract_keywords(&outer_content, 3);
850
851 slices.push(OnionSlice {
853 id: outer_id.clone(),
854 layer: SliceLayer::Outer,
855 content: outer_content,
856 parent_id: Some(core_id.clone()),
857 children_ids: vec![],
858 keywords: outer_keywords,
859 });
860
861 slices.push(OnionSlice {
862 id: core_id,
863 layer: SliceLayer::Core,
864 content: content.to_string(),
865 parent_id: None,
866 children_ids: vec![outer_id],
867 keywords: core_keywords,
868 });
869
870 slices
871}
872
/// English stop words excluded from keyword extraction.
const STOP_WORDS_EN: &[&str] = &[
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of",
    "with", "by", "from", "as", "is", "was", "are", "were", "been", "be",
    "have", "has", "had", "do", "does", "did", "will", "would", "could",
    "should", "may", "might", "must", "shall", "can", "this", "that", "these",
    "those", "i", "you", "he", "she", "it", "we", "they", "what", "which",
    "who", "whom", "when", "where", "why", "how", "all", "each", "every",
    "both", "few", "more", "most", "other", "some", "such", "no", "not",
    "only", "own", "same", "so", "than", "too", "very", "just", "also", "now",
    "here", "there", "then", "once", "if", "into", "through", "during",
    "before", "after", "above", "below", "between", "under", "again",
    "further", "about", "out", "over", "up", "down", "off", "any", "because",
    "until", "while", "i'm", "i've", "i'll", "you're", "he's", "she's",
    "we're", "they're", "let's", "that's", "isn't", "wasn't", "aren't",
    "weren't", "doesn't", "didn't", "won't", "wouldn't", "shouldn't",
    "couldn't", "haven't", "hasn't", "hadn't",
];
1003
/// Polish stop words excluded from keyword extraction. Contains a few
/// repeated entries carried over from the original list; harmless, since the
/// list is only ever folded into a set.
const STOP_WORDS_PL: &[&str] = &[
    "i", "w", "z", "na", "do", "od", "po", "za", "o", "u", "to", "ten", "ta",
    "te", "ci", "tej", "tym", "się", "być", "był", "była", "było", "byli",
    "być", "mam", "masz", "ma", "mamy", "macie", "mają", "jest", "są",
    "jestem", "jesteś", "był", "byli", "nie", "tak", "tu", "tam", "już",
    "jeszcze", "też", "także", "ale", "lub", "albo", "czy", "że", "iż",
    "który", "która", "które", "którzy", "co", "kto", "kogo", "kim", "czym",
    "gdzie", "kiedy", "skąd", "dokąd", "jak", "jaki", "jaka", "jakie", "moje",
    "moja", "mój", "moi", "twój", "twoja", "twoje", "nasz", "nasza", "nasze",
    "wasz", "wasza", "wasze", "ich", "jego", "jej", "im", "mu", "mi", "ci",
    "go", "ją", "je", "nas", "was", "wam", "nam", "tylko", "bardzo",
    "bardziej", "może", "można", "trzeba", "musi", "powinien", "raz", "razy",
    "potem", "wtedy", "więc", "wówczas", "natomiast", "jednak", "jeśli",
    "jeżeli", "kiedy", "podczas", "przed", "przez", "podczas", "ponieważ",
    "dlatego", "więc", "zatem", "tylko", "także", "również", "ponadto",
    "oraz", "lecz", "kiedyś", "nigdy", "zawsze", "często", "rzadko",
    "czasem", "może", "powinno", "może",
];
1145
/// Spinner/"thinking"-animation gerunds and related CLI noise words emitted
/// by coding-agent UIs; excluded from keyword extraction.
const CLI_ANIMATION_GERUNDY: &[&str] = &[
    "brewing", "cogitating", "frosting", "grooving", "beaming", "booping",
    "schlepping", "computing", "mulling", "pondering", "meditating",
    "reflecting", "crunching", "synthesizing", "distilling", "forging",
    "crafting", "conjuring", "whipping", "channeling", "decoding",
    "encoding", "reasoning", "iterating", "marinating", "percolating",
    "simmering", "crystallizing", "massaging", "tinkering", "polishing",
    "thinking", "proofing", "bootstrapping", "shifttab", "tokens",
    "permissions", "bypass", "running", "thought",
];
1192
/// CLI control/plumbing tokens (tool-call scaffolding, stream names) that
/// make poor keywords; excluded from keyword extraction.
const CLI_CONTROL_TOKENS: &[&str] = &[
    "shifttab", "bypass", "thought", "thoughts", "tokens", "permissions",
    "running", "ran", "stdout", "stderr", "tool", "input", "output", "args",
    "result",
];
1211
/// Structural transcript/frontmatter labels (roles, metadata keys, agent
/// names) that carry no topical signal; excluded from keyword extraction.
const MARKDOWN_STRUCTURAL: &[&str] = &[
    "transcript", "user", "assistant", "system", "human", "model", "date",
    "started", "source", "cwd", "session", "session_id", "agent",
    "slice_mode", "layer", "metadata", "frontmatter", "claude", "codex",
    "gemini",
];
1236
1237fn build_default_stop_set() -> std::collections::HashSet<&'static str> {
1239 let mut set = std::collections::HashSet::new();
1240 set.extend(STOP_WORDS_EN.iter().copied());
1241 set.extend(STOP_WORDS_PL.iter().copied());
1242 set.extend(CLI_ANIMATION_GERUNDY.iter().copied());
1243 set.extend(CLI_CONTROL_TOKENS.iter().copied());
1244 set.extend(MARKDOWN_STRUCTURAL.iter().copied());
1245 set
1246}
1247
1248fn extract_keywords(text: &str, max_keywords: usize) -> Vec<String> {
1255 use std::collections::HashMap;
1256
1257 let stop_set = build_default_stop_set();
1258
1259 let mut word_counts: HashMap<String, usize> = HashMap::new();
1260 for raw in text.split_whitespace() {
1261 for token in tokenize_keyword_candidates(raw) {
1262 if token.len() >= 3
1263 && token.len() <= 30
1264 && !stop_set.contains(token.as_str())
1265 && !looks_like_session_token(&token)
1266 && !looks_like_path_fragment(&token)
1267 {
1268 *word_counts.entry(token).or_insert(0) += 1;
1269 }
1270 }
1271 }
1272
1273 let mut words: Vec<_> = word_counts.into_iter().collect();
1274 words.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
1282
1283 words
1284 .into_iter()
1285 .take(max_keywords)
1286 .map(|(word, _)| word)
1287 .collect()
1288}
1289
/// Splits a whitespace-delimited word into lowercase keyword candidates.
///
/// Each run of alphanumeric characters becomes a segment; segments are split
/// at lower→upper (camelCase) boundaries into individual tokens. When a
/// segment splits into several tokens, the compact (unsplit, lowercased)
/// form is also emitted once — e.g. "fooBar" yields "foo", "bar", "foobar".
fn tokenize_keyword_candidates(raw: &str) -> Vec<String> {
    let mut tokens = Vec::new();

    for segment in raw
        .split(|ch: char| !ch.is_alphanumeric())
        .filter(|segment| !segment.is_empty())
    {
        // Unicode-aware lowercase of the whole segment, kept as the compact form.
        let compact: String = segment.chars().flat_map(|ch| ch.to_lowercase()).collect();

        // Insert a space at each lower→upper boundary so camelCase splits.
        let mut normalized = String::with_capacity(segment.len() * 2);
        let mut previous_is_lowercase = false;
        for ch in segment.chars() {
            if ch.is_ascii_uppercase() && previous_is_lowercase {
                normalized.push(' ');
            }
            normalized.push(ch.to_ascii_lowercase());
            previous_is_lowercase = ch.is_ascii_lowercase();
        }

        // split_whitespace already yields trimmed, non-empty pieces, so the
        // original's extra trim/filter pass (and the intermediate Vec it was
        // cloned out of) is unnecessary.
        let before = tokens.len();
        tokens.extend(normalized.split_whitespace().map(str::to_owned));
        let split_count = tokens.len() - before;

        if split_count > 1
            && (3..=30).contains(&compact.len())
            && !tokens.iter().any(|token| *token == compact)
        {
            tokens.push(compact);
        }
    }

    tokens
}
1330
/// Heuristically detects machine-generated identifiers — long all-hex ids,
/// digit-heavy strings, and long low-alpha tokens — that make poor keywords.
fn looks_like_session_token(token: &str) -> bool {
    let hex_chars = token.chars().filter(|ch| ch.is_ascii_hexdigit()).count();
    let digit_chars = token.chars().filter(|ch| ch.is_ascii_digit()).count();
    let alpha_chars = token.chars().filter(|ch| ch.is_ascii_alphabetic()).count();

    // Parenthesized explicitly: `&&` binds tighter than `||`, so the length
    // check only pairs with the all-hex check (guards against the
    // clippy::precedence-style misreading the original invited).
    (token.len() > 12 && hex_chars == token.len())
        || digit_chars >= 6
        || (token.len() > 20 && alpha_chars < token.len() / 3)
}
1340
/// Heuristically detects long path-like tokens — separator-dense strings,
/// tokens built from well-known path segments, or very long consonant-heavy
/// strings — to keep filesystem noise out of the keyword list.
/// Tokens shorter than 30 bytes are never treated as path fragments.
fn looks_like_path_fragment(token: &str) -> bool {
    if token.len() < 30 {
        return false;
    }

    // Separator-dense tokens are almost certainly paths or identifiers.
    let separator_count = token
        .chars()
        .filter(|ch| matches!(ch, '/' | '_' | '-' | '.'))
        .count();
    if separator_count >= 3 {
        return true;
    }

    // Two or more well-known path segments also mark a path fragment.
    let common_path_segments = [
        "src",
        "components",
        "users",
        "library",
        "claude",
        "polyversai",
        "vibecrafted",
        "rust",
        "memex",
        "session",
        "sessionid",
        "branch",
        "tsx",
        "json",
        "rs",
        "py",
        "node_modules",
        "git",
    ];
    let lower = token.to_ascii_lowercase();
    let segment_hits = common_path_segments
        .iter()
        .filter(|seg| lower.contains(*seg))
        .count();
    if segment_hits >= 2 {
        return true;
    }

    // Very long tokens with no vowel run longer than 2 look like mangled
    // identifiers rather than natural-language words. Vowel membership uses
    // `matches!` on the char instead of building a HashSet per call, which
    // the original did on every invocation.
    let mut max_vowel_run = 0;
    let mut current_run = 0;
    for ch in token.chars() {
        if matches!(ch.to_ascii_lowercase(), 'a' | 'e' | 'i' | 'o' | 'u' | 'y') {
            current_run += 1;
            max_vowel_run = max_vowel_run.max(current_run);
        } else {
            current_run = 0;
        }
    }
    token.len() > 40 && max_vowel_run <= 2
}
1416
1417fn hash_content(text: &str) -> String {
1419 let mut hash = compute_content_hash(text);
1420 hash.truncate(16);
1421 hash
1422}
1423
/// Speaker category for a transcript line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TranscriptRole {
    User,
    Assistant,
    Reasoning,
}

/// One parsed dialog entry: an optional timestamp, the speaker role, and the
/// raw text of the utterance.
#[derive(Debug, Clone)]
struct StructuredDialogEntry {
    time: Option<String>,
    role: TranscriptRole,
    text: String,
}

/// Accumulates the segments of a single conversational turn, grouped by role,
/// along with the first and last timestamps observed.
#[derive(Debug, Default, Clone)]
struct MarkdownTranscriptTurn {
    start_time: Option<String>,
    end_time: Option<String>,
    user_segments: Vec<String>,
    assistant_segments: Vec<String>,
    reasoning_segments: Vec<String>,
}

impl MarkdownTranscriptTurn {
    /// True when no segment of any role has been recorded yet.
    fn is_empty(&self) -> bool {
        self.user_segments.is_empty()
            && self.assistant_segments.is_empty()
            && self.reasoning_segments.is_empty()
    }

    /// Appends `text` under the given role. Non-empty timestamps update the
    /// turn's start (first seen) and end (last seen) times — even when the
    /// text itself is dropped as a consecutive duplicate within its role.
    fn push(&mut self, role: TranscriptRole, time: &str, text: String) {
        if !time.is_empty() {
            self.start_time.get_or_insert_with(|| time.to_string());
            self.end_time = Some(time.to_string());
        }

        let segments = match role {
            TranscriptRole::User => &mut self.user_segments,
            TranscriptRole::Assistant => &mut self.assistant_segments,
            TranscriptRole::Reasoning => &mut self.reasoning_segments,
        };

        let is_consecutive_duplicate = segments.last().is_some_and(|previous| previous == &text);
        if !is_consecutive_duplicate {
            segments.push(text);
        }
    }
}
1474
/// Parses a transcript header line of the form
/// `[project: foo | key: value | ...]` into a key→value map.
///
/// Returns `None` unless the bracketed body starts with `project:` and every
/// ` | `-separated segment contains a `:` separator.
fn parse_transcript_header(line: &str) -> Option<HashMap<String, String>> {
    let body = line.trim().strip_prefix('[')?.strip_suffix(']')?.trim();
    if !body.starts_with("project:") {
        return None;
    }

    body.split(" | ")
        .map(|segment| {
            segment
                .split_once(':')
                .map(|(key, value)| (key.trim().to_string(), value.trim().to_string()))
        })
        .collect::<Option<HashMap<_, _>>>()
}
1490
1491fn parse_transcript_role(role: &str) -> Option<TranscriptRole> {
1492 match role.trim().to_ascii_lowercase().as_str() {
1493 "user" | "human" => Some(TranscriptRole::User),
1494 "assistant" | "bot" | "model" => Some(TranscriptRole::Assistant),
1495 "reasoning" | "analysis" | "thinking" | "tool" => Some(TranscriptRole::Reasoning),
1496 _ => None,
1497 }
1498}
1499
1500fn parse_transcript_entry_line(line: &str) -> Option<(String, TranscriptRole, String)> {
1501 let trimmed = line.trim_start();
1502 let rest = trimmed.strip_prefix('[')?;
1503 let (time, rest) = rest.split_once(']')?;
1504 let is_timestamp = time.len() == 8
1505 && time.chars().enumerate().all(|(idx, ch)| match idx {
1506 2 | 5 => ch == ':',
1507 _ => ch.is_ascii_digit(),
1508 });
1509 if !is_timestamp {
1510 return None;
1511 }
1512
1513 let (role, body) = rest.trim_start().split_once(':')?;
1514 let role = parse_transcript_role(role)?;
1515
1516 Some((time.to_string(), role, body.trim_start().to_string()))
1517}
1518
1519fn normalize_role_aware_turn(turn: &MarkdownTranscriptTurn) -> Option<String> {
1520 let mut sections = Vec::new();
1521
1522 if !turn.user_segments.is_empty() {
1523 sections.push(format!("User request:\n{}", turn.user_segments.join("\n")));
1524 }
1525 if !turn.assistant_segments.is_empty() {
1526 sections.push(format!(
1527 "Assistant response:\n{}",
1528 turn.assistant_segments.join("\n")
1529 ));
1530 }
1531 if !turn.reasoning_segments.is_empty() {
1532 sections.push(format!(
1533 "Reasoning focus:\n{}",
1534 turn.reasoning_segments.join("\n")
1535 ));
1536 }
1537
1538 if sections.is_empty() {
1539 return None;
1540 }
1541
1542 Some(sections.join("\n\n"))
1543}
1544
/// Group dialog entries into turns (a new turn starts at each user message)
/// and render each turn as one `(doc_id, content, metadata)` document.
///
/// Turns whose rendered content is shorter than 20 bytes are dropped. The
/// doc id is `{doc_prefix}-{turn_index:04}-{content_hash}`; metadata is
/// `base_metadata` plus `turn_index`/`start_time`/`end_time`.
fn build_role_aware_turn_documents(
    entries: Vec<StructuredDialogEntry>,
    doc_prefix: &str,
    base_metadata: serde_json::Map<String, serde_json::Value>,
) -> Vec<(String, String, serde_json::Value)> {
    let mut turns = Vec::new();
    let mut current_turn = MarkdownTranscriptTurn::default();

    for entry in entries {
        let text = entry.text.trim();
        if text.is_empty() {
            continue;
        }

        // A user message closes the previous turn (if it had any content)
        // and opens a fresh one; assistant/reasoning entries accumulate.
        if matches!(entry.role, TranscriptRole::User) && !current_turn.is_empty() {
            turns.push(current_turn);
            current_turn = MarkdownTranscriptTurn::default();
        }

        current_turn.push(
            entry.role,
            entry.time.as_deref().unwrap_or_default(),
            text.to_string(),
        );
    }

    // Flush the trailing turn.
    if !current_turn.is_empty() {
        turns.push(current_turn);
    }

    let mut docs = Vec::new();
    for (idx, turn) in turns.into_iter().enumerate() {
        let Some(content) = normalize_role_aware_turn(&turn) else {
            continue;
        };

        // Skip near-empty turns (byte-length threshold).
        if content.len() < 20 {
            continue;
        }

        let doc_id = format!("{doc_prefix}-{idx:04}-{}", hash_content(&content));
        let mut metadata = serde_json::Value::Object(base_metadata.clone());

        // Augment the shared metadata with per-turn position and time window.
        if let serde_json::Value::Object(ref mut map) = metadata {
            map.insert("turn_index".to_string(), json!(idx));
            map.insert("start_time".to_string(), json!(turn.start_time));
            map.insert("end_time".to_string(), json!(turn.end_time));
        }

        docs.push((doc_id, content, metadata));
    }

    docs
}
1599
/// Parse a markdown transcript (bracketed `[project: ... | ...]` header,
/// `[HH:MM:SS] role: body` entries, optional `[signals]...[/signals]` block)
/// into turn documents. Returns `None` when the text lacks a header, has no
/// entries, or yields no documents.
fn extract_markdown_transcript_documents(
    raw: &str,
    source_path: &std::path::Path,
) -> Option<Vec<(String, String, serde_json::Value)>> {
    let mut header = HashMap::new();
    // The entry currently being accumulated (continuation lines append to it).
    let mut current_entry: Option<(String, TranscriptRole, String)> = None;
    let mut entries = Vec::new();
    let mut in_signals = false;
    let mut signal_lines = Vec::new();

    for line in raw.lines() {
        // First matching bracketed header wins; later ones are treated as text.
        if header.is_empty()
            && let Some(parsed) = parse_transcript_header(line)
        {
            header = parsed;
            continue;
        }

        let trimmed = line.trim();
        if trimmed == "[signals]" {
            in_signals = true;
            continue;
        }
        if trimmed == "[/signals]" {
            in_signals = false;
            continue;
        }

        // A new timestamped entry closes the previous one.
        if let Some((time, role, body)) = parse_transcript_entry_line(line) {
            if let Some(entry) = current_entry.take() {
                entries.push(entry);
            }
            current_entry = Some((time, role, body));
            continue;
        }

        // Inside [signals], collect raw non-empty lines instead of dialog text.
        if in_signals {
            if !trimmed.is_empty() {
                signal_lines.push(trimmed.to_string());
            }
            continue;
        }

        // Non-empty continuation lines extend the current entry's body.
        if let Some((_, _, ref mut text)) = current_entry
            && !trimmed.is_empty()
        {
            if !text.is_empty() {
                text.push('\n');
            }
            text.push_str(trimmed);
        }
    }

    // Flush the trailing entry.
    if let Some(entry) = current_entry.take() {
        entries.push(entry);
    }

    if header.is_empty() || entries.is_empty() {
        return None;
    }

    // Header fields default to "unknown" when absent.
    let project = header
        .get("project")
        .cloned()
        .unwrap_or_else(|| "unknown".to_string());
    let agent = header
        .get("agent")
        .cloned()
        .unwrap_or_else(|| "unknown".to_string());
    let date = header
        .get("date")
        .cloned()
        .unwrap_or_else(|| "unknown".to_string());
    let source_name = source_path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("unknown");
    // Stable per-file id derived from the path, used in doc-id prefixes.
    let transcript_id = hash_content(&source_path.to_string_lossy());
    let signals_summary = if signal_lines.is_empty() {
        None
    } else {
        Some(signal_lines.join("\n"))
    };

    let entries = entries
        .into_iter()
        .map(|(time, role, text)| StructuredDialogEntry {
            time: Some(time),
            role,
            text,
        })
        .collect();

    // Shared metadata copied onto every turn document.
    let mut metadata = serde_json::Map::new();
    metadata.insert("project".to_string(), json!(project));
    metadata.insert("agent".to_string(), json!(agent));
    metadata.insert("date".to_string(), json!(date));
    metadata.insert("source".to_string(), json!(source_name));
    metadata.insert("path".to_string(), json!(source_path.to_str()));
    metadata.insert("type".to_string(), json!("transcript_turn"));
    metadata.insert("format".to_string(), json!("markdown_transcript"));

    if let Some(summary) = signals_summary.as_ref() {
        metadata.insert("signals".to_string(), json!(summary));
    }

    let docs =
        build_role_aware_turn_documents(entries, &format!("mdturn-{transcript_id}"), metadata);

    if docs.is_empty() { None } else { Some(docs) }
}
1711
/// Detect known JSON conversation-export shapes in `value` and convert them
/// to turn documents. Tries, in order: a `sessions` array, a top-level
/// `messages` array, a `chat_messages` array, and a ChatGPT-style `mapping`
/// object. Returns `None` when no shape matches or none yields documents.
///
/// Common filters: messages with unknown roles are skipped, and message text
/// shorter than 20 bytes is dropped.
fn extract_conversation_documents(
    value: &serde_json::Value,
    source_path: &std::path::Path,
) -> Option<Vec<(String, String, serde_json::Value)>> {
    let obj = value.as_object()?;
    let source_name = source_path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("unknown");

    // Pull text from a message object: prefer `text`, then `content` as a
    // string, then the `text` of each block in a `content` array.
    fn extract_text_blocks(msg_obj: &serde_json::Map<String, serde_json::Value>) -> String {
        if let Some(text) = msg_obj.get("text").and_then(serde_json::Value::as_str) {
            return text.to_string();
        }
        if let Some(content) = msg_obj.get("content") {
            if let Some(text) = content.as_str() {
                return text.to_string();
            }
            if let Some(blocks) = content.as_array() {
                return blocks
                    .iter()
                    .filter_map(|block| block.get("text").and_then(serde_json::Value::as_str))
                    .collect::<Vec<_>>()
                    .join(" ");
            }
        }
        String::new()
    }

    // --- Shape 1: {project, sessions: [{info: {sessionId}, messages: [...]}]} ---
    if let Some(serde_json::Value::Array(sessions)) = obj.get("sessions") {
        let project = obj
            .get("project")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown");

        let mut docs = Vec::new();
        for session in sessions {
            // NOTE: a non-object session aborts the whole extraction via `?`.
            let session_obj = session.as_object()?;
            let session_id = session_obj
                .get("info")
                .and_then(|i| i.get("sessionId"))
                .and_then(|v| v.as_str())
                .unwrap_or("unknown");
            // First 8 chars of the session id, used in ids and metadata.
            let session_short = &session_id[..session_id.len().min(8)];

            let mut entries = Vec::new();
            if let Some(serde_json::Value::Array(messages)) = session_obj.get("messages") {
                for msg in messages {
                    let msg_obj = match msg.as_object() {
                        Some(o) => o,
                        None => continue,
                    };
                    let Some(role) = msg_obj
                        .get("role")
                        .and_then(|v| v.as_str())
                        .and_then(parse_transcript_role)
                    else {
                        continue;
                    };

                    let text = msg_obj
                        .get("text")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .trim()
                        .to_string();
                    if text.len() < 20 {
                        continue;
                    }

                    let timestamp = msg_obj
                        .get("timestamp")
                        .and_then(|v| v.as_str())
                        .map(ToOwned::to_owned);

                    entries.push(StructuredDialogEntry {
                        time: timestamp,
                        role,
                        text,
                    });
                }
            }

            let mut metadata = serde_json::Map::new();
            metadata.insert("session".to_string(), json!(session_short));
            metadata.insert("project".to_string(), json!(project));
            metadata.insert("source".to_string(), json!(source_name));
            metadata.insert("type".to_string(), json!("conversation"));
            metadata.insert("format".to_string(), json!("sessions"));

            let mut session_docs = build_role_aware_turn_documents(
                entries,
                &format!("sess-{session_short}"),
                metadata,
            );
            docs.append(&mut session_docs);
        }

        if !docs.is_empty() {
            tracing::info!(
                "Sessions format detected: {} -> {} turn documents",
                source_path.display(),
                docs.len()
            );
            return Some(docs);
        }
    }

    // --- Shape 2: top-level `messages` with sender/role/author fields ---
    if let Some(serde_json::Value::Array(messages)) = obj.get("messages") {
        let conv_id = obj
            .get("uuid")
            .or_else(|| obj.get("id"))
            .and_then(|v| v.as_str())
            .unwrap_or("unknown");
        let conv_short = &conv_id[..conv_id.len().min(8)];
        let title = obj
            .get("name")
            .or_else(|| obj.get("title"))
            .and_then(|v| v.as_str())
            .unwrap_or("");

        // Only treat it as a conversation when at least one message carries
        // a speaker field; otherwise fall through to the next shapes.
        let looks_like_conversation = messages.iter().any(|m| {
            m.get("sender").is_some() || m.get("role").is_some() || m.get("author").is_some()
        });

        if looks_like_conversation {
            let mut entries = Vec::new();
            for msg in messages {
                let msg_obj = match msg.as_object() {
                    Some(o) => o,
                    None => continue,
                };

                // Speaker may live under `sender`, `role`, or `author.role`.
                let Some(role) = msg_obj
                    .get("sender")
                    .or_else(|| msg_obj.get("role"))
                    .or_else(|| msg_obj.get("author").and_then(|a| a.get("role")))
                    .and_then(|v| v.as_str())
                    .and_then(parse_transcript_role)
                else {
                    continue;
                };

                let text = extract_text_blocks(msg_obj).trim().to_string();
                if text.len() < 20 {
                    continue;
                }

                let timestamp = msg_obj
                    .get("created_at")
                    .or_else(|| msg_obj.get("timestamp"))
                    .and_then(|v| v.as_str())
                    .map(ToOwned::to_owned);

                entries.push(StructuredDialogEntry {
                    time: timestamp,
                    role,
                    text,
                });
            }

            let mut metadata = serde_json::Map::new();
            metadata.insert("conversation".to_string(), json!(conv_short));
            metadata.insert("title".to_string(), json!(title));
            metadata.insert("source".to_string(), json!(source_name));
            metadata.insert("type".to_string(), json!("conversation"));
            metadata.insert("format".to_string(), json!("claude_web"));

            let docs =
                build_role_aware_turn_documents(entries, &format!("conv-{conv_short}"), metadata);

            if !docs.is_empty() {
                tracing::info!(
                    "Conversation format detected: {} -> {} turn documents",
                    source_path.display(),
                    docs.len()
                );
                return Some(docs);
            }
        }
    }

    // --- Shape 3: Claude.ai export with `chat_messages` and `sender` ---
    if let Some(serde_json::Value::Array(messages)) = obj.get("chat_messages") {
        let conv_id = obj
            .get("uuid")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown");
        let conv_short = &conv_id[..conv_id.len().min(8)];
        let title = obj.get("name").and_then(|v| v.as_str()).unwrap_or("");

        let mut entries = Vec::new();
        for msg in messages {
            let msg_obj = match msg.as_object() {
                Some(o) => o,
                None => continue,
            };

            let Some(role) = msg_obj
                .get("sender")
                .and_then(|v| v.as_str())
                .and_then(parse_transcript_role)
            else {
                continue;
            };

            let text = msg_obj
                .get("text")
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .trim()
                .to_string();
            if text.len() < 20 {
                continue;
            }

            let timestamp = msg_obj
                .get("created_at")
                .and_then(|v| v.as_str())
                .map(ToOwned::to_owned);

            entries.push(StructuredDialogEntry {
                time: timestamp,
                role,
                text,
            });
        }

        let mut metadata = serde_json::Map::new();
        metadata.insert("conversation".to_string(), json!(conv_short));
        metadata.insert("title".to_string(), json!(title));
        metadata.insert("source".to_string(), json!(source_name));
        metadata.insert("type".to_string(), json!("conversation"));
        metadata.insert("format".to_string(), json!("claude_web"));

        let docs =
            build_role_aware_turn_documents(entries, &format!("chat-{conv_short}"), metadata);

        if !docs.is_empty() {
            tracing::info!(
                "Claude.ai chat_messages format detected: {} -> {} turn documents",
                source_path.display(),
                docs.len()
            );
            return Some(docs);
        }
    }

    // --- Shape 4: ChatGPT export, `mapping` of node-id -> {message} ---
    if let Some(serde_json::Value::Object(mapping)) = obj.get("mapping") {
        let conv_id = obj
            .get("id")
            .or_else(|| obj.get("conversation_id"))
            .and_then(|v| v.as_str())
            .unwrap_or("unknown");
        let conv_short = &conv_id[..conv_id.len().min(8)];
        let title = obj.get("title").and_then(|v| v.as_str()).unwrap_or("");

        // Mapping order is arbitrary; sort nodes by message.create_time
        // (missing times sort as 0.0, i.e. first).
        let mut entries: Vec<_> = mapping.iter().collect();
        entries.sort_by(|a, b| {
            let time_a =
                a.1.get("message")
                    .and_then(|m| m.get("create_time"))
                    .and_then(|t| t.as_f64())
                    .unwrap_or(0.0);
            let time_b =
                b.1.get("message")
                    .and_then(|m| m.get("create_time"))
                    .and_then(|t| t.as_f64())
                    .unwrap_or(0.0);
            time_a
                .partial_cmp(&time_b)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let mut dialog_entries = Vec::new();
        for (_node_id, node) in entries {
            let message = match node.get("message") {
                Some(m) => m,
                None => continue,
            };

            let Some(role) = message
                .get("author")
                .and_then(|a| a.get("role"))
                .and_then(|v| v.as_str())
                .and_then(parse_transcript_role)
            else {
                continue;
            };

            // Message text is the join of string entries in content.parts.
            let text = message
                .get("content")
                .and_then(|c| c.get("parts"))
                .and_then(|p| p.as_array())
                .map(|parts| {
                    parts
                        .iter()
                        .filter_map(|p| p.as_str())
                        .collect::<Vec<_>>()
                        .join(" ")
                })
                .unwrap_or_default()
                .trim()
                .to_string();
            if text.len() < 20 {
                continue;
            }

            // Epoch seconds -> RFC 3339 (sub-second precision discarded).
            let timestamp = message
                .get("create_time")
                .and_then(|t| t.as_f64())
                .and_then(|ts| chrono::DateTime::from_timestamp(ts as i64, 0))
                .map(|dt| dt.to_rfc3339());

            dialog_entries.push(StructuredDialogEntry {
                time: timestamp,
                role,
                text,
            });
        }

        let mut metadata = serde_json::Map::new();
        metadata.insert("conversation".to_string(), json!(conv_short));
        metadata.insert("title".to_string(), json!(title));
        metadata.insert("source".to_string(), json!(source_name));
        metadata.insert("type".to_string(), json!("conversation"));
        metadata.insert("format".to_string(), json!("chatgpt"));

        let docs =
            build_role_aware_turn_documents(dialog_entries, &format!("gpt-{conv_short}"), metadata);

        if !docs.is_empty() {
            tracing::info!(
                "ChatGPT format detected: {} -> {} turn documents",
                source_path.display(),
                docs.len()
            );
            return Some(docs);
        }
    }

    None
}
2066
/// Flatten an arbitrary JSON value into human-readable text for indexing.
/// Strings pass through; objects are mined for well-known content fields,
/// role-tagged content, nested message arrays, and entity observations;
/// arrays flatten their first 20 elements; other scalars use their JSON
/// rendering. Recursion depth is bounded by the JSON nesting itself.
fn extract_json_element_content(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::String(s) => s.clone(),
        serde_json::Value::Object(map) => {
            let mut parts = Vec::new();

            // Pull any non-empty string from common content-bearing keys.
            for key in [
                "content",
                "text",
                "message",
                "summary",
                "description",
                "body",
            ] {
                if let Some(serde_json::Value::String(s)) = map.get(key)
                    && !s.is_empty()
                {
                    parts.push(s.clone());
                }
            }

            // Role-tagged messages render as "role: text" (string content or
            // text blocks inside a content array).
            if let Some(serde_json::Value::String(role)) = map.get("role")
                && let Some(content) = map.get("content")
            {
                match content {
                    serde_json::Value::String(s) => {
                        parts.push(format!("{}: {}", role, s));
                    }
                    serde_json::Value::Array(arr) => {
                        for item in arr {
                            if let serde_json::Value::Object(block) = item
                                && let Some(serde_json::Value::String(t)) = block.get("text")
                            {
                                parts.push(format!("{}: {}", role, t));
                            }
                        }
                    }
                    _ => {}
                }
            }

            // Recurse into at most 50 nested messages; keep substantive ones
            // (over 10 bytes) only.
            if let Some(serde_json::Value::Array(messages)) = map.get("messages") {
                for msg in messages.iter().take(50) {
                    let msg_content = extract_json_element_content(msg);
                    if !msg_content.is_empty() && msg_content.len() > 10 {
                        parts.push(msg_content);
                    }
                }
            }

            // Same treatment for Claude-style `chat_messages`.
            if let Some(serde_json::Value::Array(messages)) = map.get("chat_messages") {
                for msg in messages.iter().take(50) {
                    let msg_content = extract_json_element_content(msg);
                    if !msg_content.is_empty() && msg_content.len() > 10 {
                        parts.push(msg_content);
                    }
                }
            }

            // Knowledge-graph entity shape: "name: obs1; obs2; ..." (max 10).
            if let Some(serde_json::Value::String(name)) = map.get("name")
                && let Some(serde_json::Value::Array(obs)) = map.get("observations")
            {
                let observations: Vec<String> = obs
                    .iter()
                    .filter_map(|v| v.as_str().map(String::from))
                    .take(10)
                    .collect();
                if !observations.is_empty() {
                    parts.push(format!("{}: {}", name, observations.join("; ")));
                }
            }

            // Prepend "[identifier]" from the first present id-like key,
            // unless that identifier already appears in the extracted text.
            // The `break` fires on the first key found, even if unusable.
            for key in ["title", "name", "uuid", "id"] {
                if let Some(serde_json::Value::String(s)) = map.get(key) {
                    if !s.is_empty() && parts.iter().all(|p| !p.contains(s)) {
                        parts.insert(0, format!("[{}]", s));
                    }
                    break;
                }
            }

            // Nothing recognized: fall back to raw JSON, capped at 5000 chars.
            if parts.is_empty() {
                serde_json::to_string(value)
                    .unwrap_or_default()
                    .chars()
                    .take(5000)
                    .collect()
            } else {
                parts.join("\n")
            }
        }
        serde_json::Value::Array(arr) => {
            arr.iter()
                .take(20)
                .map(extract_json_element_content)
                .filter(|s| !s.is_empty())
                .collect::<Vec<_>>()
                .join("\n")
        }
        // Numbers, booleans, null: their JSON text form.
        _ => value.to_string(),
    }
}
2181
2182fn detect_json_element_type(value: &serde_json::Value) -> &'static str {
2184 if let serde_json::Value::Object(map) = value {
2185 if map.contains_key("chat_messages") || map.contains_key("mapping") {
2187 return "conversation";
2188 }
2189 if map.contains_key("messages") && map.contains_key("sessions") {
2190 return "session";
2191 }
2192 if map.contains_key("role") && map.contains_key("content") {
2193 return "message";
2194 }
2195 if map.contains_key("observations") && map.contains_key("name") {
2196 return "entity";
2197 }
2198 if map.contains_key("messages") {
2199 return "thread";
2200 }
2201 "object"
2202 } else if value.is_array() {
2203 "array"
2204 } else if value.is_string() {
2205 "text"
2206 } else {
2207 "value"
2208 }
2209}
2210
2211fn extract_key_content(text: &str, target_chars: usize) -> String {
2214 if text.len() <= target_chars {
2215 return text.to_string();
2216 }
2217
2218 let sentences: Vec<&str> = text
2220 .split(['.', '!', '?'])
2221 .map(|s| s.trim())
2222 .filter(|s| !s.is_empty())
2223 .collect();
2224
2225 if sentences.is_empty() {
2226 return truncate_at_word_boundary(text, target_chars);
2228 }
2229
2230 let keywords = extract_keywords(text, 10);
2232 let keyword_set: std::collections::HashSet<&str> =
2233 keywords.iter().map(|s| s.as_str()).collect();
2234
2235 let mut scored_sentences: Vec<(usize, f32, &str)> = sentences
2236 .iter()
2237 .enumerate()
2238 .map(|(idx, sentence)| {
2239 let mut score = 0.0_f32;
2240
2241 if idx == 0 {
2243 score += 2.0;
2244 } else if idx == sentences.len() - 1 {
2245 score += 1.5;
2246 }
2247
2248 let words: Vec<&str> = sentence.split_whitespace().collect();
2250 let keyword_count = words
2251 .iter()
2252 .filter(|w| {
2253 let cleaned: String = w
2254 .chars()
2255 .filter(|c| c.is_alphanumeric())
2256 .collect::<String>()
2257 .to_lowercase();
2258 keyword_set.contains(cleaned.as_str())
2259 })
2260 .count();
2261
2262 if !words.is_empty() {
2263 score += (keyword_count as f32 / words.len() as f32) * 3.0;
2264 }
2265
2266 if sentence.len() < 20 {
2268 score -= 0.5;
2269 }
2270
2271 (idx, score, *sentence)
2272 })
2273 .collect();
2274
2275 scored_sentences.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2277
2278 let mut selected_indices: Vec<usize> = Vec::new();
2280 let mut total_len = 0;
2281
2282 for (idx, _, sentence) in &scored_sentences {
2283 let sentence_len = sentence.len() + 2; if total_len + sentence_len > target_chars && !selected_indices.is_empty() {
2285 break;
2286 }
2287 selected_indices.push(*idx);
2288 total_len += sentence_len;
2289 }
2290
2291 selected_indices.sort();
2293
2294 let result: Vec<&str> = selected_indices
2296 .iter()
2297 .filter_map(|&idx| sentences.get(idx).copied())
2298 .collect();
2299
2300 if result.is_empty() {
2301 truncate_at_word_boundary(text, target_chars)
2302 } else {
2303 result.join(". ") + "."
2304 }
2305}
2306
2307fn create_outer_summary(middle_content: &str, keywords: &[String], target_chars: usize) -> String {
2309 let keyword_prefix = if !keywords.is_empty() {
2311 format!(
2312 "[{}] ",
2313 keywords
2314 .iter()
2315 .take(5)
2316 .cloned()
2317 .collect::<Vec<_>>()
2318 .join(", ")
2319 )
2320 } else {
2321 String::new()
2322 };
2323
2324 let remaining_chars = target_chars.saturating_sub(keyword_prefix.len());
2325
2326 let first_sentence = middle_content
2328 .split(['.', '!', '?'])
2329 .next()
2330 .unwrap_or(middle_content)
2331 .trim();
2332
2333 let summary = if first_sentence.len() <= remaining_chars {
2334 first_sentence.to_string()
2335 } else {
2336 truncate_at_word_boundary(first_sentence, remaining_chars)
2337 };
2338
2339 format!("{}{}", keyword_prefix, summary)
2340}
2341
/// Truncate `text` to at most `max_chars` characters, backing up to the last
/// space so no word is cut mid-way, and append "..." to mark the trim. Text
/// already within the limit is returned unchanged. Note the ellipsis may push
/// the result slightly past `max_chars`.
fn truncate_at_word_boundary(text: &str, max_chars: usize) -> String {
    // Budget is measured in chars, not bytes (UTF-8 safe).
    if text.chars().count() <= max_chars {
        return text.to_string();
    }

    // Byte offset of the first character past the budget.
    let cut = text
        .char_indices()
        .nth(max_chars)
        .map_or(text.len(), |(idx, _)| idx);

    let head = &text[..cut];

    // Prefer ending on a word boundary; a single unbroken word keeps the
    // hard cut.
    match head.rfind(' ') {
        Some(space_idx) => format!("{}...", &text[..space_idx]),
        None => format!("{head}..."),
    }
}
2365
/// Indexing pipeline wiring: embeds content through the MLX bridge and
/// persists documents to the primary (Lance) store, optionally mirroring
/// them into a BM25 lexical index with cross-store recovery bookkeeping.
pub struct RAGPipeline {
    // Embedding backend, shared behind an async mutex.
    mlx_bridge: Arc<Mutex<MLXBridge>>,
    // Primary document store plus recovery-ledger persistence.
    storage: Arc<StorageManager>,
    // Optional BM25 writer; when `None`, only the primary store is updated.
    bm25_writer: Option<Arc<BM25Index>>,
}
2371
2372impl RAGPipeline {
    /// Build a pipeline without a BM25 writer (primary store only).
    pub async fn new(
        mlx_bridge: Arc<Mutex<MLXBridge>>,
        storage: Arc<StorageManager>,
    ) -> Result<Self> {
        Self::new_with_bm25(mlx_bridge, storage, None).await
    }
2380
    /// Build a pipeline with an optional BM25 writer. Currently infallible;
    /// the `Result` return keeps room for future setup work.
    pub async fn new_with_bm25(
        mlx_bridge: Arc<Mutex<MLXBridge>>,
        storage: Arc<StorageManager>,
        bm25_writer: Option<Arc<BM25Index>>,
    ) -> Result<Self> {
        Ok(Self {
            mlx_bridge,
            storage,
            bm25_writer,
        })
    }
2392
2393 pub fn storage_manager(&self) -> Arc<StorageManager> {
2394 self.storage.clone()
2395 }
2396
2397 pub async fn embedding_healthcheck(&self) -> Result<()> {
2398 self.mlx_bridge.lock().await.embed("healthcheck").await?;
2399 Ok(())
2400 }
2401
    /// Refresh the underlying storage manager's state.
    pub async fn refresh(&self) -> Result<()> {
        self.storage.refresh().await
    }
2406
    /// Persist a batch of documents to the primary (Lance) store and, when
    /// configured, the BM25 index.
    ///
    /// Flow: stamp `indexed_at`, drop in-batch duplicates (by namespace+id
    /// and namespace+content_hash), drop documents already in the store,
    /// then — if BM25 is configured — record a cross-store recovery ledger
    /// before writing. A Lance failure, or a BM25 failure after Lance
    /// succeeded, leaves the ledger behind and returns an error pointing at
    /// `rust-memex repair-writes --execute`; on full success the ledger is
    /// cleared. This is best-effort rollback, not crash-safe atomicity.
    async fn persist_documents(&self, documents: Vec<ChromaDocument>) -> Result<()> {
        if documents.is_empty() {
            return Ok(());
        }

        let mut unique_documents = Vec::with_capacity(documents.len());
        let mut seen_ids: HashSet<(String, String)> = HashSet::new();
        let mut seen_hashes: HashSet<(String, String)> = HashSet::new();

        for mut document in documents {
            // Stamp an indexing timestamp unless the caller already set one.
            if let Value::Object(ref mut map) = document.metadata {
                map.entry("indexed_at".to_string())
                    .or_insert_with(|| json!(chrono::Utc::now().to_rfc3339()));
            }

            // In-batch dedup by (namespace, id)...
            let id_key = (document.namespace.clone(), document.id.clone());
            if !seen_ids.insert(id_key) {
                continue;
            }

            // ...and by (namespace, content_hash) when a hash is present.
            if let Some(hash) = document.content_hash.as_ref() {
                let hash_key = (document.namespace.clone(), hash.clone());
                if !seen_hashes.insert(hash_key) {
                    continue;
                }
            }

            unique_documents.push(document);
        }

        // Drop documents whose content already exists in the store.
        let documents = self
            .filter_documents_against_store(unique_documents)
            .await?;
        if documents.is_empty() {
            return Ok(());
        }

        // Snapshots taken before `documents` is moved into the store write:
        // BM25 payloads and the ids needed for a potential rollback.
        let bm25_documents: Vec<(String, String, String)> = documents
            .iter()
            .map(|doc| (doc.id.clone(), doc.namespace.clone(), doc.document.clone()))
            .collect();
        let inserted_ids: Vec<(String, String)> = documents
            .iter()
            .map(|doc| (doc.namespace.clone(), doc.id.clone()))
            .collect();
        // Recovery ledger is only needed when two stores must stay in sync.
        let mut recovery_batch = self
            .bm25_writer
            .as_ref()
            .map(|_| CrossStoreRecoveryBatch::from_documents(&documents));
        let recovery_path = if let Some(batch) = recovery_batch.as_ref() {
            Some(self.storage.persist_cross_store_recovery_batch(batch)?)
        } else {
            None
        };

        if let Err(error) = self.storage.add_to_store(documents).await {
            // Lance failed while a ledger was active: record the failure on
            // the ledger (best-effort) and surface a repair-oriented error.
            if let (Some(batch), Some(path)) = (recovery_batch.as_mut(), recovery_path.as_ref()) {
                batch.last_error = Some(format!("Lance write failed: {error}"));
                let _ = self.storage.update_cross_store_recovery_batch(batch);
                return Err(anyhow!(
                    "Lance write failed while cross-store recovery ledger was active at {}: {}. \
                    This path is not crash-safe; run `rust-memex repair-writes --execute` to reconcile Lance/BM25 truth after investigating the primary failure.",
                    path.display(),
                    error
                ));
            }
            return Err(error);
        }

        if let Some(bm25_writer) = &self.bm25_writer
            && let Err(error) = bm25_writer.add_documents(&bm25_documents).await
        {
            // BM25 failed after Lance succeeded: try to roll the Lance
            // inserts back in-process, counting any rollback failures.
            let mut rollback_failures = 0usize;
            for (namespace, id) in &inserted_ids {
                if self.storage.delete_document(namespace, id).await.is_err() {
                    rollback_failures += 1;
                }
            }

            // Update the ledger so `repair-writes` knows what happened.
            if let Some(batch) = recovery_batch.as_mut() {
                batch.status = if rollback_failures == 0 {
                    CrossStoreRecoveryStatus::RolledBack
                } else {
                    CrossStoreRecoveryStatus::Pending
                };
                batch.last_error = Some(format!(
                    "BM25 write failed after Lance persist: {error}; lance_rollback_failures={rollback_failures}"
                ));
                let _ = self.storage.update_cross_store_recovery_batch(batch);
            }

            return Err(anyhow!(
                "BM25 write failed after Lance persist: {}. Recovery ledger preserved at {}. \
                Same-process rollback attempted for {} documents and {} rollback operations failed. \
                This remains recoverable through `rust-memex repair-writes --execute`, but it is not the same as crash-safe cross-store atomicity.",
                error,
                recovery_path
                    .as_ref()
                    .map(|path| path.display().to_string())
                    .unwrap_or_else(|| "<not-recorded>".to_string()),
                inserted_ids.len(),
                rollback_failures
            ));
        }

        // Both stores accepted the batch: the ledger is no longer needed.
        if let Some(batch) = recovery_batch {
            self.storage
                .clear_cross_store_recovery_batch(&batch.batch_id)?;
        }
        Ok(())
    }
2528
    /// Filter out documents whose content hash the store rejects, keeping
    /// documents without a hash and — conservatively — documents whose
    /// namespace produced no answer.
    ///
    /// NOTE(review): assumes `StorageManager::filter_existing_hashes`
    /// returns the subset of hashes that are safe to insert (i.e. NOT
    /// already present) — confirm against the storage implementation.
    async fn filter_documents_against_store(
        &self,
        documents: Vec<ChromaDocument>,
    ) -> Result<Vec<ChromaDocument>> {
        if documents.is_empty() {
            return Ok(vec![]);
        }

        // Group hashes by namespace so the store is queried once per namespace.
        let mut hashes_by_namespace: HashMap<String, Vec<String>> = HashMap::new();
        for document in &documents {
            if let Some(hash) = document.content_hash.as_ref() {
                hashes_by_namespace
                    .entry(document.namespace.clone())
                    .or_default()
                    .push(hash.clone());
            }
        }

        let mut allowed_hashes: HashMap<String, HashSet<String>> = HashMap::new();
        for (namespace, hashes) in hashes_by_namespace {
            let hashes = self
                .storage
                .filter_existing_hashes(&namespace, &hashes)
                .await?;
            allowed_hashes.insert(
                namespace,
                hashes.into_iter().cloned().collect::<HashSet<_>>(),
            );
        }

        // Keep: hashless documents, documents whose hash was allowed, and
        // (fail-open) documents from namespaces with no recorded answer.
        Ok(documents
            .into_iter()
            .filter(|document| match document.content_hash.as_ref() {
                None => true,
                Some(hash) => allowed_hashes
                    .get(&document.namespace)
                    .map(|hashes| hashes.contains(hash))
                    .unwrap_or(true),
            })
            .collect())
    }
2570
2571 async fn clear_namespace_from_indices(&self, namespace: &str) -> Result<usize> {
2572 let deleted = self.storage.delete_namespace_documents(namespace).await?;
2573
2574 if let Some(bm25_writer) = &self.bm25_writer {
2575 bm25_writer.delete_namespace_term(namespace).await?;
2576 }
2577
2578 Ok(deleted)
2579 }
2580
2581 async fn load_memory_family(&self, namespace: &str, id: &str) -> Result<Vec<ChromaDocument>> {
2582 let docs = self.storage.get_all_in_namespace(namespace).await?;
2583 Ok(docs
2584 .into_iter()
2585 .filter(|doc| {
2586 doc.id == id
2587 || doc
2588 .metadata
2589 .get("original_id")
2590 .and_then(|value| value.as_str())
2591 .is_some_and(|original_id| original_id == id)
2592 })
2593 .collect())
2594 }
2595
2596 async fn delete_memory_family(&self, namespace: &str, id: &str) -> Result<usize> {
2597 let family = self.load_memory_family(namespace, id).await?;
2598 if family.is_empty() {
2599 return Ok(0);
2600 }
2601
2602 let mut deleted = 0usize;
2603 let mut ids = Vec::with_capacity(family.len());
2604
2605 for document in family {
2606 deleted += self
2607 .storage
2608 .delete_document(namespace, &document.id)
2609 .await?
2610 .min(1);
2611 ids.push(document.id);
2612 }
2613
2614 if let Some(bm25_writer) = &self.bm25_writer
2615 && !ids.is_empty()
2616 {
2617 bm25_writer.delete_documents(&ids).await?;
2618 }
2619
2620 Ok(deleted)
2621 }
2622
2623 fn preferred_memory_family_document(
2624 mut family: Vec<ChromaDocument>,
2625 requested_id: &str,
2626 ) -> Option<ChromaDocument> {
2627 fn rank(layer: Option<SliceLayer>) -> u8 {
2628 match layer {
2629 None => 0,
2630 Some(SliceLayer::Outer) => 1,
2631 Some(SliceLayer::Middle) => 2,
2632 Some(SliceLayer::Inner) => 3,
2633 Some(SliceLayer::Core) => 4,
2634 }
2635 }
2636
2637 fn chunk_index(document: &ChromaDocument) -> usize {
2638 document
2639 .metadata
2640 .get("chunk_index")
2641 .and_then(|value| value.as_u64())
2642 .and_then(|value| usize::try_from(value).ok())
2643 .unwrap_or(usize::MAX)
2644 }
2645
2646 family.sort_by_key(|document| {
2647 if document.id == requested_id {
2648 (0_u8, 0_u8, 0_usize)
2649 } else {
2650 (1_u8, rank(document.slice_layer()), chunk_index(document))
2651 }
2652 });
2653
2654 family.into_iter().next()
2655 }
2656
2657 pub fn mlx_connected_to(&self) -> String {
2659 if let Ok(bridge) = self.mlx_bridge.try_lock() {
2661 bridge.connected_to().to_string()
2662 } else {
2663 "mlx (lock held)".to_string()
2664 }
2665 }
2666
    /// Index a document with the default slice mode; `namespace` falls back
    /// to the pipeline default when `None`.
    pub async fn index_document(&self, path: &Path, namespace: Option<&str>) -> Result<()> {
        self.index_document_with_mode(path, namespace, SliceMode::default())
            .await
    }
2671
    /// Index a document with an explicit slice mode and no preprocessing.
    pub async fn index_document_with_mode(
        &self,
        path: &Path,
        namespace: Option<&str>,
        slice_mode: SliceMode,
    ) -> Result<()> {
        self.index_document_internal(path, namespace, None, slice_mode)
            .await
    }
2682
    /// Index a document with a preprocessing configuration; always uses flat
    /// slicing (preprocessing is not combined with onion modes here).
    pub async fn index_document_with_preprocessing(
        &self,
        path: &Path,
        namespace: Option<&str>,
        preprocess_config: PreprocessingConfig,
    ) -> Result<()> {
        self.index_document_internal(path, namespace, Some(preprocess_config), SliceMode::Flat)
            .await
    }
2693
    /// Index a document, skipping it entirely when identical content (by
    /// content hash) already exists in the namespace.
    ///
    /// JSON files and onion slice modes are delegated to the JSON-aware
    /// path; everything else goes through flat chunking. Returns
    /// `IndexResult::Skipped` for exact duplicates, otherwise
    /// `IndexResult::Indexed` with chunk/timing stats.
    pub async fn index_document_with_dedup(
        &self,
        path: &Path,
        namespace: Option<&str>,
        slice_mode: SliceMode,
    ) -> Result<IndexResult> {
        // Refuse to write against an outdated storage schema.
        self.storage.require_current_schema_for_writes().await?;

        let validated_path = crate::path_utils::validate_read_path(path)?;
        let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);

        // `.json` (case-insensitive) selects the JSON-aware indexing path.
        let is_json = validated_path
            .extension()
            .and_then(|e| e.to_str())
            .map(|e| e.eq_ignore_ascii_case("json"))
            .unwrap_or(false);

        if is_json || matches!(slice_mode, SliceMode::Onion | SliceMode::OnionFast) {
            return self
                .index_document_with_json_awareness(&validated_path, ns, slice_mode)
                .await;
        }

        let text = self.extract_text(&validated_path).await?;

        let content_hash = compute_content_hash(&text);

        // Whole-file dedup: identical content in this namespace is skipped.
        if self.storage.has_content_hash(ns, &content_hash).await? {
            debug!(
                "Skipping duplicate content: {} (hash: {})",
                path.display(),
                &content_hash[..16]
            );
            return Ok(IndexResult::Skipped {
                reason: "exact duplicate".to_string(),
                content_hash,
            });
        }

        let base_metadata = json!({
            "path": path.to_str(),
            "slice_mode": "flat",
            "content_hash": &content_hash,
        });

        let (chunks_indexed, embedder_ms, tokens_estimated) = self
            .index_with_flat_chunking_and_hash(&text, ns, path, base_metadata, &content_hash)
            .await?;

        Ok(IndexResult::Indexed {
            chunks_indexed,
            content_hash,
            embedder_ms: Some(embedder_ms),
            tokens_estimated: Some(tokens_estimated),
        })
    }
2760
    /// Indexes a document with an explicitly selected chunk provider.
    ///
    /// Non-AICX chunkers are delegated to the standard slice-mode entry
    /// points; the AICX chunker runs the provider pipeline directly.
    ///
    /// NOTE(review): on the non-dedup delegation path the returned
    /// `IndexResult::Indexed` carries placeholder values (`chunks_indexed: 1`,
    /// empty `content_hash`) because the delegated call reports no metrics.
    pub async fn index_document_with_chunker(
        &self,
        path: &Path,
        namespace: Option<&str>,
        chunker: ChunkerKind,
        slice_mode: SliceMode,
        dedup: bool,
    ) -> Result<IndexResult> {
        if chunker != ChunkerKind::Aicx {
            // Let the chunker translate the requested slice mode into the one
            // it actually supports.
            let effective_mode = chunker.slice_mode(slice_mode);
            if dedup {
                return self
                    .index_document_with_dedup(path, namespace, effective_mode)
                    .await;
            }
            self.index_document_with_mode(path, namespace, effective_mode)
                .await?;
            return Ok(IndexResult::Indexed {
                chunks_indexed: 1,
                content_hash: String::new(),
                embedder_ms: None,
                tokens_estimated: None,
            });
        }

        let validated_path = crate::path_utils::validate_read_path(path)?;
        let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
        let text = self.extract_text(&validated_path).await?;
        let content_hash = compute_content_hash(&text);

        // Dedup is optional on this path, unlike `index_document_with_dedup`.
        if dedup && self.storage.has_content_hash(ns, &content_hash).await? {
            debug!(
                "Skipping duplicate content: {} (hash: {})",
                path.display(),
                &content_hash[..16]
            );
            return Ok(IndexResult::Skipped {
                reason: "exact duplicate".to_string(),
                content_hash,
            });
        }

        let file_content = FileContent {
            path: validated_path,
            text,
            namespace: ns.to_string(),
            content_hash: content_hash.clone(),
        };
        // AICX always chunks flat regardless of the requested slice mode.
        let opts = ChunkOpts::new(chunker, SliceMode::Flat, OuterSynthesis::default());
        let provider = chunker.into_provider();
        let chunks = provider.chunk(&file_content, &opts).await?;
        let (chunks_indexed, embedder_ms, tokens_estimated) =
            self.embed_and_store_provider_chunks(chunks).await?;

        Ok(IndexResult::Indexed {
            chunks_indexed,
            content_hash,
            embedder_ms: Some(embedder_ms),
            tokens_estimated: Some(tokens_estimated),
        })
    }
2823
    /// Indexes a JSON file by extracting individual documents and indexing
    /// each one separately, deduping per extracted document rather than per
    /// file.
    ///
    /// Returns `IndexResult::Skipped` only when every document was already
    /// indexed; otherwise aggregates chunk counts and (when available)
    /// embedding metrics across all documents.
    async fn index_document_with_json_awareness(
        &self,
        path: &Path,
        namespace: &str,
        slice_mode: SliceMode,
    ) -> Result<IndexResult> {
        let documents = self.extract_json_documents(path).await?;

        let mut total_chunks = 0;
        let mut total_embedder_ms = 0u64;
        let mut total_tokens_estimated = 0usize;
        let mut saw_metrics = false;
        let mut skipped_docs = 0;
        // Whole-file hash used as the `IndexResult` hash; falls back to the
        // hash of the empty string if the file cannot be re-read.
        let file_content_hash = match crate::path_utils::safe_read_to_string_async(path).await {
            Ok((_p, content)) => compute_content_hash(&content),
            Err(_) => compute_content_hash(""),
        };

        for (doc_id, content, mut doc_metadata) in documents {
            // Skip trivially small documents (< 50 bytes).
            if content.len() < 50 {
                continue; }

            let doc_hash = compute_content_hash(&content);

            // Per-document dedup: count skips so we can distinguish
            // "nothing new" from "nothing extracted".
            if self.storage.has_content_hash(namespace, &doc_hash).await? {
                skipped_docs += 1;
                continue;
            }

            // Enrich document metadata with identity and hash fields.
            if let serde_json::Value::Object(ref mut map) = doc_metadata {
                map.insert("doc_id".to_string(), json!(doc_id));
                map.insert("content_hash".to_string(), json!(doc_hash));
                map.insert("file_hash".to_string(), json!(&file_content_hash));
                map.insert(
                    "slice_mode".to_string(),
                    json!(match slice_mode {
                        SliceMode::Onion => "onion",
                        SliceMode::OnionFast => "onion-fast",
                        SliceMode::Flat => "flat",
                    }),
                );
            }

            // All three arms currently return Some(..) metrics; the Option
            // wrapping keeps the aggregation below uniform.
            let (chunks, embedder_ms, tokens_estimated) = match slice_mode {
                SliceMode::Onion => {
                    let (chunks, embedder_ms, tokens_estimated) = self
                        .index_with_onion_slicing_and_hash(
                            &content,
                            namespace,
                            doc_metadata,
                            &doc_hash,
                        )
                        .await?;
                    (chunks, Some(embedder_ms), Some(tokens_estimated))
                }
                SliceMode::OnionFast => {
                    let (chunks, embedder_ms, tokens_estimated) = self
                        .index_with_onion_slicing_fast_and_hash(
                            &content,
                            namespace,
                            doc_metadata,
                            &doc_hash,
                        )
                        .await?;
                    (chunks, Some(embedder_ms), Some(tokens_estimated))
                }
                SliceMode::Flat => {
                    let (chunks, embedder_ms, tokens_estimated) = self
                        .index_with_flat_chunking_and_hash(
                            &content,
                            namespace,
                            path,
                            doc_metadata,
                            &doc_hash,
                        )
                        .await?;
                    (chunks, Some(embedder_ms), Some(tokens_estimated))
                }
            };

            total_chunks += chunks;
            if let Some(embedder_ms) = embedder_ms {
                total_embedder_ms += embedder_ms;
                saw_metrics = true;
            }
            if let Some(tokens_estimated) = tokens_estimated {
                total_tokens_estimated += tokens_estimated;
                saw_metrics = true;
            }
        }

        // Only report Skipped when dedup (not size filtering) removed everything.
        if total_chunks == 0 && skipped_docs > 0 {
            return Ok(IndexResult::Skipped {
                reason: format!("all {} documents already indexed", skipped_docs),
                content_hash: file_content_hash,
            });
        }

        tracing::info!(
            "JSON-aware indexing: {} -> {} chunks ({} docs skipped)",
            path.display(),
            total_chunks,
            skipped_docs
        );

        Ok(IndexResult::Indexed {
            chunks_indexed: total_chunks,
            content_hash: file_content_hash,
            // Metrics are reported only if at least one document produced them.
            embedder_ms: saw_metrics.then_some(total_embedder_ms),
            tokens_estimated: saw_metrics.then_some(total_tokens_estimated),
        })
    }
2946
2947 pub async fn index_document_with_preprocessing_and_dedup(
2949 &self,
2950 path: &Path,
2951 namespace: Option<&str>,
2952 preprocess_config: PreprocessingConfig,
2953 ) -> Result<IndexResult> {
2954 let text = self.extract_text(path).await?;
2955 let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
2956
2957 let content_hash = compute_content_hash(&text);
2959
2960 if self.storage.has_content_hash(ns, &content_hash).await? {
2962 debug!(
2963 "Skipping duplicate content: {} (hash: {})",
2964 path.display(),
2965 &content_hash[..16]
2966 );
2967 return Ok(IndexResult::Skipped {
2968 reason: "exact duplicate".to_string(),
2969 content_hash,
2970 });
2971 }
2972
2973 let preprocessor = Preprocessor::new(preprocess_config);
2975 let cleaned = preprocessor.extract_semantic_content(&text);
2976 tracing::info!(
2977 "Preprocessing: {} chars -> {} chars ({:.1}% reduction)",
2978 text.len(),
2979 cleaned.len(),
2980 (1.0 - (cleaned.len() as f32 / text.len() as f32)) * 100.0
2981 );
2982
2983 let base_metadata = json!({
2984 "path": path.to_str(),
2985 "slice_mode": "flat",
2986 "content_hash": &content_hash,
2987 });
2988
2989 let (chunks_indexed, embedder_ms, tokens_estimated) = self
2990 .index_with_flat_chunking_and_hash(&cleaned, ns, path, base_metadata, &content_hash)
2991 .await?;
2992
2993 Ok(IndexResult::Indexed {
2994 chunks_indexed,
2995 content_hash,
2996 embedder_ms: Some(embedder_ms),
2997 tokens_estimated: Some(tokens_estimated),
2998 })
2999 }
3000
    /// Shared implementation behind the simple (non-dedup) indexing entry
    /// points: validates the path, optionally preprocesses the extracted
    /// text, then dispatches on slice mode.
    async fn index_document_internal(
        &self,
        path: &Path,
        namespace: Option<&str>,
        preprocess_config: Option<PreprocessingConfig>,
        slice_mode: SliceMode,
    ) -> Result<()> {
        // Refuse writes when the storage schema is outdated.
        self.storage.require_current_schema_for_writes().await?;

        let validated_path = crate::path_utils::validate_read_path(path)?;
        let text = self.extract_text(&validated_path).await?;

        // Shadow `text` with the cleaned version when preprocessing is requested.
        let text = if let Some(config) = preprocess_config {
            let preprocessor = Preprocessor::new(config);
            let cleaned = preprocessor.extract_semantic_content(&text);
            tracing::info!(
                "Preprocessing: {} chars -> {} chars ({:.1}% reduction)",
                text.len(),
                cleaned.len(),
                (1.0 - (cleaned.len() as f32 / text.len() as f32)) * 100.0
            );
            cleaned
        } else {
            text
        };

        let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
        let base_metadata = json!({
            "path": validated_path.to_str(),
            "slice_mode": match slice_mode {
                SliceMode::Onion => "onion",
                SliceMode::OnionFast => "onion-fast",
                SliceMode::Flat => "flat",
            }
        });

        match slice_mode {
            SliceMode::Onion => {
                self.index_with_onion_slicing(&text, ns, base_metadata)
                    .await
            }
            SliceMode::OnionFast => {
                self.index_with_onion_slicing_fast(&text, ns, base_metadata)
                    .await
            }
            SliceMode::Flat => {
                // NOTE(review): flat chunking receives the caller's `path`,
                // while metadata above uses `validated_path` — the path string
                // feeds document IDs, so confirm this asymmetry is intended.
                self.index_with_flat_chunking(&text, ns, path, base_metadata)
                    .await
            }
        }
    }
3054
3055 async fn index_with_onion_slicing(
3057 &self,
3058 text: &str,
3059 namespace: &str,
3060 base_metadata: serde_json::Value,
3061 ) -> Result<()> {
3062 let config = OnionSliceConfig::default();
3063 let slices = create_onion_slices(text, &base_metadata, &config);
3064 let total_slices = slices.len();
3065
3066 tracing::info!(
3067 "Onion slicing: {} chars -> {} slices (outer/middle/inner/core)",
3068 text.len(),
3069 total_slices
3070 );
3071
3072 let mut total_stored = 0;
3074 for batch in slices.chunks(STORAGE_BATCH_SIZE) {
3075 let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
3077 let embeddings = self.embed_chunks(&batch_contents).await?;
3078
3079 let mut batch_docs = Vec::with_capacity(batch.len());
3081 for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
3082 let mut metadata = base_metadata.clone();
3083 if let serde_json::Value::Object(ref mut map) = metadata {
3084 map.insert("layer".to_string(), json!(slice.layer.name()));
3085 map.insert("keywords".to_string(), json!(slice.keywords));
3086 }
3087
3088 let doc = ChromaDocument::from_onion_slice(
3089 slice,
3090 namespace.to_string(),
3091 embedding.clone(),
3092 metadata,
3093 );
3094 batch_docs.push(doc);
3095 }
3096
3097 self.persist_documents(batch_docs).await?;
3099 total_stored += batch.len();
3100 tracing::info!("Stored {}/{} slices", total_stored, total_slices);
3101 }
3102
3103 Ok(())
3104 }
3105
    /// Fast variant of onion slicing: produces only outer/core layers via
    /// `create_onion_slices_fast`, then embeds and persists them in batches.
    async fn index_with_onion_slicing_fast(
        &self,
        text: &str,
        namespace: &str,
        base_metadata: serde_json::Value,
    ) -> Result<()> {
        let config = OnionSliceConfig::default();
        let slices = create_onion_slices_fast(text, &base_metadata, &config);
        let total_slices = slices.len();

        tracing::info!(
            "Fast onion slicing: {} chars -> {} slices (outer/core only)",
            text.len(),
            total_slices
        );

        let mut total_stored = 0;
        for batch in slices.chunks(STORAGE_BATCH_SIZE) {
            // One embedding call per batch.
            let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
            let embeddings = self.embed_chunks(&batch_contents).await?;

            let mut batch_docs = Vec::with_capacity(batch.len());
            for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
                // Each slice carries its layer name and extracted keywords.
                let mut metadata = base_metadata.clone();
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("layer".to_string(), json!(slice.layer.name()));
                    map.insert("keywords".to_string(), json!(slice.keywords));
                }

                let doc = ChromaDocument::from_onion_slice(
                    slice,
                    namespace.to_string(),
                    embedding.clone(),
                    metadata,
                );
                batch_docs.push(doc);
            }

            self.persist_documents(batch_docs).await?;
            total_stored += batch.len();
            tracing::info!("Stored {}/{} slices", total_stored, total_slices);
        }

        Ok(())
    }
3152
    /// Onion slicing with content-hash tracking: every slice document records
    /// the parent file hash and its own slice hash, enabling dedup and
    /// provenance queries.
    ///
    /// Returns `(slices_indexed, embedder_ms, tokens_estimated)`.
    async fn index_with_onion_slicing_and_hash(
        &self,
        text: &str,
        namespace: &str,
        base_metadata: serde_json::Value,
        content_hash: &str,
    ) -> Result<(usize, u64, usize)> {
        let config = OnionSliceConfig::default();
        let slices = create_onion_slices(text, &base_metadata, &config);
        let total_slices = slices.len();
        let mut tokens_estimated = 0;
        let token_config = crate::embeddings::TokenConfig::default();

        tracing::info!(
            "Onion slicing: {} chars -> {} slices (outer/middle/inner/core)",
            text.len(),
            total_slices
        );

        let mut total_stored = 0;
        let mut total_embedder_ms = 0;
        for batch in slices.chunks(STORAGE_BATCH_SIZE) {
            // Accumulate token estimates before embedding the batch.
            for slice in batch {
                tokens_estimated +=
                    crate::embeddings::estimate_tokens(&slice.content, &token_config);
            }

            let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
            // Only the embedding call is timed, not document construction/storage.
            let embed_started_at = std::time::Instant::now();
            let embeddings = self.embed_chunks(&batch_contents).await?;
            total_embedder_ms += embed_started_at.elapsed().as_millis() as u64;

            let mut batch_docs = Vec::with_capacity(batch.len());
            for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
                let mut metadata = base_metadata.clone();
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("layer".to_string(), json!(slice.layer.name()));
                    map.insert("keywords".to_string(), json!(slice.keywords));
                }

                // Per-slice hash plus parent-file hash for provenance.
                let slice_hash = compute_content_hash(&slice.content);
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("file_hash".to_string(), json!(content_hash));
                    map.insert("source_hash".to_string(), json!(content_hash));
                    map.insert("chunk_hash".to_string(), json!(&slice_hash));
                }
                let doc = ChromaDocument::from_onion_slice_with_hashes(
                    slice,
                    namespace.to_string(),
                    embedding.clone(),
                    metadata,
                    slice_hash,
                    Some(content_hash.to_string()),
                );
                batch_docs.push(doc);
            }

            self.persist_documents(batch_docs).await?;
            total_stored += batch.len();
            tracing::info!("Stored {}/{} slices", total_stored, total_slices);
        }

        Ok((total_slices, total_embedder_ms, tokens_estimated))
    }
3224
    /// Fast (outer/core only) onion slicing with content-hash tracking.
    /// Mirrors `index_with_onion_slicing_and_hash` except that it uses
    /// `create_onion_slices_fast`.
    ///
    /// Returns `(slices_indexed, embedder_ms, tokens_estimated)`.
    async fn index_with_onion_slicing_fast_and_hash(
        &self,
        text: &str,
        namespace: &str,
        base_metadata: serde_json::Value,
        content_hash: &str,
    ) -> Result<(usize, u64, usize)> {
        let config = OnionSliceConfig::default();
        let slices = create_onion_slices_fast(text, &base_metadata, &config);
        let total_slices = slices.len();
        let mut tokens_estimated = 0;
        let token_config = crate::embeddings::TokenConfig::default();

        tracing::info!(
            "Fast onion slicing: {} chars -> {} slices (outer/core only)",
            text.len(),
            total_slices
        );

        let mut total_stored = 0;
        let mut total_embedder_ms = 0;
        for batch in slices.chunks(STORAGE_BATCH_SIZE) {
            // Token estimates accumulated per batch before embedding.
            for slice in batch {
                tokens_estimated +=
                    crate::embeddings::estimate_tokens(&slice.content, &token_config);
            }

            let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
            // Only the embedding call contributes to `total_embedder_ms`.
            let embed_started_at = std::time::Instant::now();
            let embeddings = self.embed_chunks(&batch_contents).await?;
            total_embedder_ms += embed_started_at.elapsed().as_millis() as u64;

            let mut batch_docs = Vec::with_capacity(batch.len());
            for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
                let mut metadata = base_metadata.clone();
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("layer".to_string(), json!(slice.layer.name()));
                    map.insert("keywords".to_string(), json!(slice.keywords));
                }

                // Per-slice hash plus parent-file hash for provenance.
                let slice_hash = compute_content_hash(&slice.content);
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("file_hash".to_string(), json!(content_hash));
                    map.insert("source_hash".to_string(), json!(content_hash));
                    map.insert("chunk_hash".to_string(), json!(&slice_hash));
                }
                let doc = ChromaDocument::from_onion_slice_with_hashes(
                    slice,
                    namespace.to_string(),
                    embedding.clone(),
                    metadata,
                    slice_hash,
                    Some(content_hash.to_string()),
                );
                batch_docs.push(doc);
            }

            self.persist_documents(batch_docs).await?;
            total_stored += batch.len();
            tracing::info!("Stored {}/{} slices", total_stored, total_slices);
        }

        Ok((total_slices, total_embedder_ms, tokens_estimated))
    }
3296
3297 async fn index_with_flat_chunking(
3299 &self,
3300 text: &str,
3301 namespace: &str,
3302 path: &Path,
3303 base_metadata: serde_json::Value,
3304 ) -> Result<()> {
3305 let chunks = self.chunk_text(text, 512, 128)?;
3307 let total_chunks = chunks.len();
3308
3309 tracing::info!(
3310 "Flat chunking: {} chars -> {} chunks",
3311 text.len(),
3312 total_chunks
3313 );
3314
3315 let mut total_stored = 0;
3317 let mut global_idx = 0;
3318 for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
3319 let embeddings = self.embed_chunks(batch).await?;
3321
3322 let mut batch_docs = Vec::with_capacity(batch.len());
3324 for (chunk, embedding) in batch.iter().zip(embeddings.iter()) {
3325 let mut metadata = base_metadata.clone();
3326 if let serde_json::Value::Object(ref mut map) = metadata {
3327 map.insert("chunk_index".to_string(), json!(global_idx));
3328 map.insert("total_chunks".to_string(), json!(total_chunks));
3329 }
3330
3331 let doc = ChromaDocument::new_flat(
3332 format!("{}_{}", path.to_str().unwrap_or("unknown"), global_idx),
3333 namespace.to_string(),
3334 embedding.clone(),
3335 metadata,
3336 chunk.clone(),
3337 );
3338 batch_docs.push(doc);
3339 global_idx += 1;
3340 }
3341
3342 self.persist_documents(batch_docs).await?;
3344 total_stored += batch.len();
3345 tracing::info!("Stored {}/{} chunks", total_stored, total_chunks);
3346 }
3347
3348 Ok(())
3349 }
3350
    /// Flat chunking with content-hash tracking: each chunk document records
    /// the parent file hash and its own chunk hash.
    ///
    /// Returns `(chunks_indexed, embedder_ms, tokens_estimated)`.
    async fn index_with_flat_chunking_and_hash(
        &self,
        text: &str,
        namespace: &str,
        path: &Path,
        base_metadata: serde_json::Value,
        content_hash: &str,
    ) -> Result<(usize, u64, usize)> {
        let chunks = self.chunk_text(text, 512, 128)?;
        let total_chunks = chunks.len();
        let mut tokens_estimated = 0;
        let token_config = crate::embeddings::TokenConfig::default();

        tracing::info!(
            "Flat chunking: {} chars -> {} chunks",
            text.len(),
            total_chunks
        );

        let mut total_stored = 0;
        // `global_idx` numbers chunks continuously across storage batches.
        let mut global_idx = 0;
        let mut total_embedder_ms = 0;
        for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
            tokens_estimated += batch
                .iter()
                .map(|chunk| crate::embeddings::estimate_tokens(chunk, &token_config))
                .sum::<usize>();
            // Only the embedding call is timed.
            let embed_started_at = std::time::Instant::now();
            let embeddings = self.embed_chunks(batch).await?;
            total_embedder_ms += embed_started_at.elapsed().as_millis() as u64;

            let mut batch_docs = Vec::with_capacity(batch.len());
            for (chunk, embedding) in batch.iter().zip(embeddings.iter()) {
                let mut metadata = base_metadata.clone();
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("chunk_index".to_string(), json!(global_idx));
                    map.insert("total_chunks".to_string(), json!(total_chunks));
                }

                // Per-chunk hash plus parent-file hash for provenance/dedup.
                let chunk_hash = compute_content_hash(chunk);
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("file_hash".to_string(), json!(content_hash));
                    map.insert("source_hash".to_string(), json!(content_hash));
                    map.insert("chunk_hash".to_string(), json!(&chunk_hash));
                }
                let doc = ChromaDocument::new_flat_with_hashes(
                    format!("{}_{}", path.to_str().unwrap_or("unknown"), global_idx),
                    namespace.to_string(),
                    embedding.clone(),
                    metadata,
                    chunk.clone(),
                    chunk_hash,
                    Some(content_hash.to_string()),
                );
                batch_docs.push(doc);
                global_idx += 1;
            }

            self.persist_documents(batch_docs).await?;
            total_stored += batch.len();
            tracing::info!("Stored {}/{} chunks", total_stored, total_chunks);
        }

        Ok((total_chunks, total_embedder_ms, tokens_estimated))
    }
3423
    /// Embeds provider-produced chunks and persists them, preserving any
    /// onion-layer structure the provider attached.
    ///
    /// Chunks with `layer > 0` are stored with their full layer/parent/child
    /// linkage; layer-0 chunks are stored as flat documents.
    ///
    /// Returns `(chunks_indexed, embedder_ms, tokens_estimated)`.
    async fn embed_and_store_provider_chunks(
        &self,
        chunks: Vec<Chunk>,
    ) -> Result<(usize, u64, usize)> {
        let total_chunks = chunks.len();
        let mut tokens_estimated = 0;
        let mut total_embedder_ms = 0;
        let token_config = crate::embeddings::TokenConfig::default();

        for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
            tokens_estimated += batch
                .iter()
                .map(|chunk| crate::embeddings::estimate_tokens(&chunk.content, &token_config))
                .sum::<usize>();

            let batch_contents: Vec<String> =
                batch.iter().map(|chunk| chunk.content.clone()).collect();
            // Only the embedding call is timed.
            let embed_started_at = std::time::Instant::now();
            let embeddings = self.embed_chunks(&batch_contents).await?;
            total_embedder_ms += embed_started_at.elapsed().as_millis() as u64;

            let documents: Vec<ChromaDocument> = batch
                .iter()
                .cloned()
                .zip(embeddings.into_iter())
                .map(|(chunk, embedding)| {
                    let source_hash = Some(chunk.source_hash);
                    if chunk.layer > 0 {
                        // Layered chunk: keep the provider's hierarchy fields.
                        ChromaDocument {
                            id: chunk.id,
                            namespace: chunk.namespace,
                            embedding,
                            metadata: chunk.metadata,
                            document: chunk.content,
                            layer: chunk.layer,
                            parent_id: chunk.parent_id,
                            children_ids: chunk.children_ids,
                            keywords: chunk.keywords,
                            content_hash: Some(chunk.chunk_hash),
                            source_hash,
                        }
                    } else {
                        // Layer 0: plain flat document.
                        ChromaDocument::new_flat_with_hashes(
                            chunk.id,
                            chunk.namespace,
                            embedding,
                            chunk.metadata,
                            chunk.content,
                            chunk.chunk_hash,
                            source_hash,
                        )
                    }
                })
                .collect();

            self.persist_documents(documents).await?;
        }

        Ok((total_chunks, total_embedder_ms, tokens_estimated))
    }
3484
    /// Flat-chunks a memory entry's text into a document "family" keyed by
    /// `original_id`.
    ///
    /// Single-chunk entries keep `original_id` as their document ID; multi-
    /// chunk entries use `"{original_id}::chunk::{idx}"` so the family can be
    /// looked up and deleted as a unit.
    ///
    /// Returns the number of chunks indexed.
    async fn index_flat_memory_family_with_hash(
        &self,
        text: &str,
        namespace: &str,
        original_id: &str,
        base_metadata: serde_json::Value,
        content_hash: &str,
    ) -> Result<usize> {
        let chunks = self.chunk_text(text, 512, 128)?;
        let total_chunks = chunks.len();

        tracing::info!(
            "Flat memory chunking: {} chars -> {} chunks",
            text.len(),
            total_chunks
        );

        let mut total_stored = 0;
        // Global chunk index across storage batches.
        let mut global_idx = 0;
        for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
            let embeddings = self.embed_chunks(batch).await?;

            let mut batch_docs = Vec::with_capacity(batch.len());
            for (chunk, embedding) in batch.iter().zip(embeddings.iter()) {
                let mut metadata = base_metadata.clone();
                let chunk_hash = compute_content_hash(chunk);
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("chunk_index".to_string(), json!(global_idx));
                    map.insert("total_chunks".to_string(), json!(total_chunks));
                    map.insert("file_hash".to_string(), json!(content_hash));
                    map.insert("source_hash".to_string(), json!(content_hash));
                    map.insert("chunk_hash".to_string(), json!(&chunk_hash));
                    // Ties every family member back to the logical memory id.
                    map.insert("original_id".to_string(), json!(original_id));
                }

                // Keep the bare id for single-chunk entries so direct lookups work.
                let doc_id = if total_chunks == 1 {
                    original_id.to_string()
                } else {
                    format!("{original_id}::chunk::{global_idx}")
                };

                let doc = ChromaDocument::new_flat_with_hashes(
                    doc_id,
                    namespace.to_string(),
                    embedding.clone(),
                    metadata,
                    chunk.clone(),
                    chunk_hash,
                    Some(content_hash.to_string()),
                );
                batch_docs.push(doc);
                global_idx += 1;
            }

            self.persist_documents(batch_docs).await?;
            total_stored += batch.len();
            tracing::info!(
                "Stored {}/{} flat memory chunks for {}",
                total_stored,
                total_chunks,
                original_id
            );
        }

        Ok(total_chunks)
    }
3551
3552 pub async fn index_text(
3553 &self,
3554 namespace: Option<&str>,
3555 id: String,
3556 text: String,
3557 metadata: serde_json::Value,
3558 ) -> Result<String> {
3559 self.index_text_with_mode(namespace, id, text, metadata, SliceMode::default())
3560 .await
3561 }
3562
    /// Indexes raw text under `id` with an explicit slice mode.
    ///
    /// Returns the ID the entry is retrievable under: for onion modes this is
    /// the outer slice's generated ID (falling back to `id` if no outer slice
    /// was produced); for flat mode it is `id` itself.
    pub async fn index_text_with_mode(
        &self,
        namespace: Option<&str>,
        id: String,
        text: String,
        metadata: serde_json::Value,
        slice_mode: SliceMode,
    ) -> Result<String> {
        // Refuse writes when the storage schema is outdated.
        self.storage.require_current_schema_for_writes().await?;

        let ns = namespace.unwrap_or(DEFAULT_NAMESPACE).to_string();
        let slice_mode_name = match slice_mode {
            SliceMode::Onion => "onion",
            SliceMode::OnionFast => "onion-fast",
            SliceMode::Flat => "flat",
        };

        match slice_mode {
            SliceMode::Onion | SliceMode::OnionFast => {
                let config = OnionSliceConfig::default();
                let slices = if slice_mode == SliceMode::OnionFast {
                    create_onion_slices_fast(&text, &metadata, &config)
                } else {
                    create_onion_slices(&text, &metadata, &config)
                };

                // Embed all slices in a single call (memory entries are small).
                let slice_contents: Vec<String> =
                    slices.iter().map(|s| s.content.clone()).collect();
                let embeddings = self.embed_chunks(&slice_contents).await?;

                let mut documents = Vec::with_capacity(slices.len());
                for (slice, embedding) in slices.iter().zip(embeddings.iter()) {
                    let mut meta = metadata.clone();
                    if let serde_json::Value::Object(ref mut map) = meta {
                        map.insert("layer".to_string(), json!(slice.layer.name()));
                        // Keep the caller's id on every slice for family lookup.
                        map.insert("original_id".to_string(), json!(id));
                        map.insert("slice_mode".to_string(), json!(slice_mode_name));
                    }

                    let doc = ChromaDocument::from_onion_slice(
                        slice,
                        ns.clone(),
                        embedding.clone(),
                        meta,
                    );
                    documents.push(doc);
                }

                self.persist_documents(documents).await?;

                // Report the outer slice's ID as the canonical handle.
                Ok(slices
                    .iter()
                    .find(|s| s.layer == SliceLayer::Outer)
                    .map(|s| s.id.clone())
                    .unwrap_or(id))
            }
            SliceMode::Flat => {
                // Flat mode: one document, embedded as a single query-style text.
                let embedding = self.embed_query(&text).await?;
                let mut metadata = metadata;
                if let serde_json::Value::Object(ref mut map) = metadata {
                    map.insert("slice_mode".to_string(), json!(slice_mode_name));
                }
                let doc = ChromaDocument::new_flat(id.clone(), ns, embedding, metadata, text);
                self.persist_documents(vec![doc]).await?;
                Ok(id)
            }
        }
    }
3634
    /// Indexes a memory entry as a hash-tracked document family, dispatching
    /// on slice mode and (optionally) an explicit chunker.
    ///
    /// When `chunker` is `Some(ChunkerKind::Aicx)` the AICX provider pipeline
    /// is used regardless of `slice_mode`; otherwise the entry is routed to
    /// the matching onion/flat hash-tracking path.
    async fn index_text_memory_family_with_hash(
        &self,
        namespace: &str,
        id: &str,
        text: &str,
        metadata: serde_json::Value,
        slice_mode: SliceMode,
        chunker: Option<ChunkerKind>,
    ) -> Result<()> {
        let slice_mode_name = match slice_mode {
            SliceMode::Onion => "onion",
            SliceMode::OnionFast => "onion-fast",
            SliceMode::Flat => "flat",
        };
        let content_hash = compute_content_hash(text);
        let mut metadata = metadata;

        // Record the mode/chunker used, and tie onion slices back to `id`.
        if let serde_json::Value::Object(ref mut map) = metadata {
            map.insert("slice_mode".to_string(), json!(slice_mode_name));
            if let Some(chunker) = chunker {
                map.insert("chunker".to_string(), json!(chunker.name()));
            }
            if matches!(slice_mode, SliceMode::Onion | SliceMode::OnionFast) {
                map.insert("original_id".to_string(), json!(id));
            }
        }

        if chunker == Some(ChunkerKind::Aicx) {
            // Prefer an explicit path from metadata; fall back to the id as a label.
            let path = metadata
                .get("path")
                .and_then(serde_json::Value::as_str)
                .or_else(|| {
                    metadata
                        .get("reprocess_source")
                        .and_then(serde_json::Value::as_str)
                })
                .unwrap_or(id);
            let file_content = FileContent {
                path: metadata_path_label(path),
                text: text.to_string(),
                namespace: namespace.to_string(),
                content_hash: content_hash.clone(),
            };
            // AICX always chunks flat regardless of the requested slice mode.
            let opts = ChunkOpts::new(
                ChunkerKind::Aicx,
                SliceMode::Flat,
                OuterSynthesis::default(),
            );
            let provider = ChunkerKind::Aicx.into_provider();
            let chunks = provider.chunk(&file_content, &opts).await?;
            self.embed_and_store_provider_chunks(chunks).await?;
            return Ok(());
        }

        match slice_mode {
            SliceMode::Onion => {
                self.index_with_onion_slicing_and_hash(text, namespace, metadata, &content_hash)
                    .await?;
            }
            SliceMode::OnionFast => {
                self.index_with_onion_slicing_fast_and_hash(
                    text,
                    namespace,
                    metadata,
                    &content_hash,
                )
                .await?;
            }
            SliceMode::Flat => {
                self.index_flat_memory_family_with_hash(
                    text,
                    namespace,
                    id,
                    metadata,
                    &content_hash,
                )
                .await?;
            }
        }

        Ok(())
    }
3717
3718 pub async fn memory_upsert(
3719 &self,
3720 namespace: &str,
3721 id: String,
3722 text: String,
3723 metadata: serde_json::Value,
3724 ) -> Result<()> {
3725 self.storage.require_current_schema_for_writes().await?;
3726
3727 let slice_mode = match metadata
3728 .get("slice_mode")
3729 .and_then(|value| value.as_str())
3730 .map(|value| value.to_ascii_lowercase())
3731 .as_deref()
3732 {
3733 Some("onion") => SliceMode::Onion,
3734 Some("onion-fast") | Some("onion_fast") | Some("fast") => SliceMode::OnionFast,
3735 Some("flat") | None => SliceMode::Flat,
3736 Some(other) => {
3737 return Err(anyhow!(
3738 "Unsupported metadata.slice_mode '{}'. Use 'flat', 'onion', or 'onion-fast'.",
3739 other
3740 ));
3741 }
3742 };
3743 let chunker = metadata
3744 .get("chunker")
3745 .and_then(|value| value.as_str())
3746 .map(str::parse::<ChunkerKind>)
3747 .transpose()
3748 .map_err(anyhow::Error::msg)?;
3749
3750 self.delete_memory_family(namespace, &id).await?;
3751 self.index_text_memory_family_with_hash(
3752 namespace, &id, &text, metadata, slice_mode, chunker,
3753 )
3754 .await?;
3755 Ok(())
3756 }
3757
3758 pub async fn lookup_memory(&self, namespace: &str, id: &str) -> Result<Option<SearchResult>> {
3759 if let Some(doc) = self.storage.get_document(namespace, id).await? {
3760 let layer = doc.slice_layer();
3761 return Ok(Some(SearchResult {
3762 id: doc.id,
3763 namespace: doc.namespace,
3764 text: doc.document,
3765 score: 1.0,
3766 metadata: doc.metadata,
3767 layer,
3768 parent_id: doc.parent_id,
3769 children_ids: doc.children_ids,
3770 keywords: doc.keywords,
3771 }));
3772 }
3773
3774 if let Some(doc) = Self::preferred_memory_family_document(
3775 self.load_memory_family(namespace, id).await?,
3776 id,
3777 ) {
3778 let layer = doc.slice_layer();
3779 return Ok(Some(SearchResult {
3780 id: doc.id,
3781 namespace: doc.namespace,
3782 text: doc.document,
3783 score: 1.0,
3784 metadata: doc.metadata,
3785 layer,
3786 parent_id: doc.parent_id,
3787 children_ids: doc.children_ids,
3788 keywords: doc.keywords,
3789 }));
3790 }
3791
3792 Ok(None)
3793 }
3794
    /// Removes a memory entry and all of its family documents (chunks/slices).
    /// Returns the number of documents deleted.
    pub async fn remove_memory(&self, namespace: &str, id: &str) -> Result<usize> {
        self.delete_memory_family(namespace, id).await
    }
3798
    /// Deletes every document in `namespace` from all indices.
    /// Returns the number of documents removed.
    pub async fn clear_namespace(&self, namespace: &str) -> Result<usize> {
        self.clear_namespace_from_indices(namespace).await
    }
3802
3803 pub async fn search_memory(
3804 &self,
3805 namespace: &str,
3806 query: &str,
3807 k: usize,
3808 ) -> Result<Vec<SearchResult>> {
3809 self.search_with_options(Some(namespace), query, k, SearchOptions::default())
3810 .await
3811 }
3812
3813 pub async fn memory_search_with_layer(
3815 &self,
3816 namespace: &str,
3817 query: &str,
3818 k: usize,
3819 layer: Option<SliceLayer>,
3820 ) -> Result<Vec<SearchResult>> {
3821 self.search_with_options(
3822 Some(namespace),
3823 query,
3824 k,
3825 SearchOptions {
3826 layer_filter: layer,
3827 project_filter: None,
3828 },
3829 )
3830 .await
3831 }
3832
    /// Semantic search across all namespaces with default options.
    pub async fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
        self.search_inner(None, query, k).await
    }
3836
    /// Search with an optional namespace and default options.
    ///
    /// A `None` namespace is passed through to the store layer (presumably a
    /// cross-namespace search — see `search_store_with_layer`).
    pub async fn search_inner(
        &self,
        namespace: Option<&str>,
        query: &str,
        k: usize,
    ) -> Result<Vec<SearchResult>> {
        self.search_with_options(namespace, query, k, SearchOptions::default())
            .await
    }
3847
3848 pub async fn search_with_options(
3850 &self,
3851 namespace: Option<&str>,
3852 query: &str,
3853 k: usize,
3854 options: SearchOptions,
3855 ) -> Result<Vec<SearchResult>> {
3856 let query_embedding = self.embed_query(query).await?;
3857 let candidate_multiplier = if options.project_filter.is_some() {
3858 8
3859 } else {
3860 3
3861 };
3862
3863 let mut candidates = self
3864 .storage
3865 .search_store_with_layer(
3866 namespace,
3867 query_embedding.clone(),
3868 k * candidate_multiplier,
3869 options.layer_filter,
3870 )
3871 .await?;
3872
3873 if let Some(project) = options.project_filter.as_deref() {
3874 candidates.retain(|candidate| metadata_matches_project(&candidate.metadata, project));
3875 }
3876
3877 if !candidates.is_empty() {
3879 let documents: Vec<String> = candidates.iter().map(|c| c.document.clone()).collect();
3880 let metadatas: Vec<serde_json::Value> =
3881 candidates.iter().map(|c| c.metadata.clone()).collect();
3882
3883 let reranked = match self.mlx_bridge.lock().await.rerank(query, &documents).await {
3885 Ok(r) => Some(r),
3886 Err(e) => {
3887 tracing::warn!("MLX rerank failed, using cosine fallback: {}", e);
3888 None
3889 }
3890 };
3891
3892 let reranked = if let Some(r) = reranked {
3893 r
3894 } else {
3895 let doc_embeddings = self.ensure_doc_embeddings(&documents, &candidates).await?;
3897 let scores = doc_embeddings
3898 .iter()
3899 .enumerate()
3900 .map(|(idx, emb)| (idx, cosine(&query_embedding, emb)))
3901 .collect::<Vec<_>>();
3902 let mut scores = scores;
3903 scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3904 scores
3905 };
3906
3907 let results: Vec<SearchResult> = reranked
3909 .into_iter()
3910 .take(k)
3911 .filter_map(|(idx, score)| {
3912 candidates.get(idx).map(|candidate| {
3913 SearchResult {
3914 id: candidate.id.clone(),
3915 namespace: candidate.namespace.clone(),
3916 text: candidate.document.clone(),
3917 score,
3918 metadata: metadatas.get(idx).cloned().unwrap_or_else(|| json!({})),
3919 layer: candidate.slice_layer(),
3921 parent_id: candidate.parent_id.clone(),
3922 children_ids: candidate.children_ids.clone(),
3923 keywords: candidate.keywords.clone(),
3924 }
3925 })
3926 })
3927 .collect();
3928
3929 return Ok(results);
3930 }
3931
3932 Ok(vec![])
3933 }
3934
3935 pub async fn expand_result(&self, namespace: &str, id: &str) -> Result<Vec<SearchResult>> {
3937 let children = self.storage.get_children(namespace, id).await?;
3938 Ok(children
3939 .into_iter()
3940 .map(|doc| {
3941 let layer = doc.slice_layer();
3942 SearchResult {
3943 id: doc.id,
3944 namespace: doc.namespace,
3945 text: doc.document,
3946 score: 1.0,
3947 metadata: doc.metadata,
3948 layer,
3949 parent_id: doc.parent_id,
3950 children_ids: doc.children_ids,
3951 keywords: doc.keywords,
3952 }
3953 })
3954 .collect())
3955 }
3956
3957 pub async fn get_parent_result(
3959 &self,
3960 namespace: &str,
3961 id: &str,
3962 ) -> Result<Option<SearchResult>> {
3963 if let Some(parent) = self.storage.get_parent(namespace, id).await? {
3964 let layer = parent.slice_layer();
3965 return Ok(Some(SearchResult {
3966 id: parent.id,
3967 namespace: parent.namespace,
3968 text: parent.document,
3969 score: 1.0,
3970 metadata: parent.metadata,
3971 layer,
3972 parent_id: parent.parent_id,
3973 children_ids: parent.children_ids,
3974 keywords: parent.keywords,
3975 }));
3976 }
3977 Ok(None)
3978 }
3979
3980 async fn extract_text(&self, path: &Path) -> Result<String> {
3981 let ext = path
3982 .extension()
3983 .and_then(|e| e.to_str())
3984 .unwrap_or("")
3985 .to_lowercase();
3986
3987 if ext == "pdf" {
3988 let path = path.to_path_buf();
3990 let pdf_text =
3991 tokio::task::spawn_blocking(move || pdf_extract::extract_text(&path)).await??;
3992 return Ok(pdf_text);
3993 }
3994
3995 let (_p, content) = crate::path_utils::safe_read_to_string_async(path).await?;
3997 Ok(content)
3998 }
3999
    /// Split a file into one or more `(doc_id, text, metadata)` documents.
    ///
    /// Extraction cascade:
    /// 1. Markdown: try role-aware transcript splitting, else one document.
    /// 2. Non-JSON: plain text extraction as one document.
    /// 3. JSON that fails to parse: the raw text as one document.
    /// 4. JSON array: per-element conversation extraction, else per-element
    ///    content (>50 bytes each), else the raw text as one document.
    /// 5. JSON object/value: conversation extraction, else its content.
    ///
    /// Doc ids are `"{path}:{index}"`.
    async fn extract_json_documents(
        &self,
        path: &Path,
    ) -> Result<Vec<(String, String, serde_json::Value)>> {
        let ext = path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("")
            .to_lowercase();

        if matches!(ext.as_str(), "md" | "markdown") {
            let (_p, raw) = crate::path_utils::safe_read_to_string_async(path).await?;
            // Role-aware transcript splitting when the markdown matches the
            // transcript layout.
            if let Some(docs) = extract_markdown_transcript_documents(&raw, path) {
                tracing::info!(
                    "Markdown transcript detected: {} -> {} turn documents",
                    path.display(),
                    docs.len()
                );
                return Ok(docs);
            }

            // Plain markdown: the whole file becomes one document.
            let doc_id = format!("{}:0", path.display());
            let metadata = json!({ "path": path.to_str(), "index": 0 });
            return Ok(vec![(doc_id, raw, metadata)]);
        }

        if ext != "json" {
            // Non-JSON, non-markdown: defer to generic text extraction.
            let text = self.extract_text(path).await?;
            let doc_id = format!("{}:0", path.display());
            let metadata = json!({ "path": path.to_str(), "index": 0 });
            return Ok(vec![(doc_id, text, metadata)]);
        }

        let (_p, raw) = crate::path_utils::safe_read_to_string_async(path).await?;
        let parsed: serde_json::Value = match serde_json::from_str(&raw) {
            Ok(v) => v,
            Err(_) => {
                // Invalid JSON is still indexed as raw text rather than failing.
                let doc_id = format!("{}:0", path.display());
                let metadata = json!({ "path": path.to_str(), "index": 0 });
                return Ok(vec![(doc_id, raw, metadata)]);
            }
        };

        if let serde_json::Value::Array(arr) = parsed {
            let mut docs = Vec::new();
            let mut used_smart_extraction = false;

            // First pass: try conversation-aware extraction per element.
            for item in arr.iter() {
                if let Some(mut conv_docs) = extract_conversation_documents(item, path) {
                    docs.append(&mut conv_docs);
                    used_smart_extraction = true;
                }
            }

            if used_smart_extraction && !docs.is_empty() {
                tracing::info!(
                    "Conversation array detected: {} -> {} turn documents",
                    path.display(),
                    docs.len()
                );
                return Ok(docs);
            }

            // Second pass: generic per-element documents; elements with
            // little content (<= 50 bytes) are skipped.
            docs.clear();
            for (idx, item) in arr.iter().enumerate() {
                let doc_id = format!("{}:{}", path.display(), idx);
                let content = extract_json_element_content(item);
                if content.len() > 50 {
                    let metadata = json!({
                        "path": path.to_str(),
                        "index": idx,
                        "total_elements": arr.len(),
                        "element_type": detect_json_element_type(item),
                    });
                    docs.push((doc_id, content, metadata));
                }
            }
            if docs.is_empty() {
                // Nothing substantial per element: fall back to the raw text.
                let doc_id = format!("{}:0", path.display());
                let metadata = json!({ "path": path.to_str(), "index": 0 });
                return Ok(vec![(doc_id, raw, metadata)]);
            }
            tracing::info!(
                "JSON array detected: {} -> {} documents",
                path.display(),
                docs.len()
            );
            return Ok(docs);
        }

        // Non-array JSON: conversation-aware extraction first.
        if let Some(docs) = extract_conversation_documents(&parsed, path) {
            return Ok(docs);
        }

        // Last resort: the element's textual content as one document.
        let content = extract_json_element_content(&parsed);
        let doc_id = format!("{}:0", path.display());
        let metadata = json!({ "path": path.to_str(), "index": 0 });
        Ok(vec![(doc_id, content, metadata)])
    }
4117
    /// Embed a batch of chunk texts via the MLX bridge (held under its mutex).
    async fn embed_chunks(&self, chunks: &[String]) -> Result<Vec<Vec<f32>>> {
        self.mlx_bridge.lock().await.embed_batch(chunks).await
    }
4122
    /// Embed a single query string via the MLX bridge (held under its mutex).
    async fn embed_query(&self, query: &str) -> Result<Vec<f32>> {
        self.mlx_bridge.lock().await.embed(query).await
    }
4126
4127 async fn ensure_doc_embeddings(
4128 &self,
4129 documents: &[String],
4130 candidates: &[ChromaDocument],
4131 ) -> Result<Vec<Vec<f32>>> {
4132 let has_all = candidates.iter().all(|c| !c.embedding.is_empty());
4134 if has_all {
4135 return Ok(candidates.iter().map(|c| c.embedding.clone()).collect());
4136 }
4137
4138 self.mlx_bridge.lock().await.embed_batch(documents).await
4139 }
4140
4141 fn chunk_text(&self, text: &str, target_size: usize, overlap: usize) -> Result<Vec<String>> {
4148 let sentences = split_into_sentences(text);
4149
4150 if sentences.is_empty() {
4151 return Ok(vec![text.to_string()]);
4152 }
4153
4154 if text.chars().count() <= target_size {
4156 return Ok(vec![text.to_string()]);
4157 }
4158
4159 let mut chunks = Vec::new();
4160 let mut current_chunk = String::new();
4161 let mut overlap_sentences: Vec<String> = Vec::new();
4162
4163 let overlap_sentence_count = (overlap / 50).clamp(1, 3);
4165
4166 for sentence in &sentences {
4167 let sentence_len = sentence.chars().count();
4168 let current_len = current_chunk.chars().count();
4169
4170 let max_size = target_size + target_size / 2;
4172 if current_len + sentence_len > max_size && !current_chunk.is_empty() {
4173 chunks.push(current_chunk.trim().to_string());
4174
4175 current_chunk = overlap_sentences.join(" ");
4177 if !current_chunk.is_empty() {
4178 current_chunk.push(' ');
4179 }
4180 overlap_sentences.clear();
4181 }
4182
4183 current_chunk.push_str(sentence);
4184 current_chunk.push(' ');
4185
4186 overlap_sentences.push(sentence.clone());
4188 if overlap_sentences.len() > overlap_sentence_count {
4189 overlap_sentences.remove(0);
4190 }
4191
4192 if current_chunk.chars().count() >= target_size {
4194 chunks.push(current_chunk.trim().to_string());
4195
4196 current_chunk = overlap_sentences.join(" ");
4198 if !current_chunk.is_empty() {
4199 current_chunk.push(' ');
4200 }
4201 overlap_sentences.clear();
4202 }
4203 }
4204
4205 let remaining = current_chunk.trim();
4207 if !remaining.is_empty() {
4208 if remaining.chars().count() < target_size / 4 && !chunks.is_empty() {
4210 let last_idx = chunks.len() - 1;
4211 chunks[last_idx].push(' ');
4212 chunks[last_idx].push_str(remaining);
4213 } else {
4214 chunks.push(remaining.to_string());
4215 }
4216 }
4217
4218 if chunks.is_empty() {
4220 chunks.push(text.to_string());
4221 }
4222
4223 Ok(chunks)
4224 }
4225}
4226
/// Controls which context tags `build_context_prefix` prepends to chunks.
#[derive(Debug, Clone)]
pub struct ContextPrefixConfig {
    /// Emit a `[Source: <filename>]` tag.
    pub include_source: bool,
    /// Emit a `[Section: <header>]` tag when the chunk came from a headed section.
    pub include_section: bool,
    /// Emit a `[Type: <doc type>]` tag when the extension maps to a known type.
    pub include_doc_type: bool,
    /// Hard cap on the assembled prefix, in characters.
    pub max_prefix_length: usize,
}
4250
impl Default for ContextPrefixConfig {
    /// All tags enabled, with a 100-character prefix cap.
    fn default() -> Self {
        Self {
            include_source: true,
            include_section: true,
            include_doc_type: true,
            max_prefix_length: 100,
        }
    }
}
4261
/// A text chunk enriched with a context prefix for embedding.
#[derive(Debug, Clone)]
pub struct EnrichedChunk {
    /// Chunk text with the context prefix prepended (what gets embedded).
    pub content: String,
    /// The raw chunk text without any prefix.
    pub original_content: String,
    /// Path of the source document.
    pub doc_path: String,
    /// 0-based position of this chunk across the whole document.
    pub chunk_index: usize,
    /// Markdown section header the chunk fell under, if any.
    pub section: Option<String>,
    /// Human-readable document type (e.g. "Rust source code"), if detected.
    pub doc_type: Option<String>,
}
4278
4279pub fn create_enriched_chunks(
4291 content: &str,
4292 doc_path: &str,
4293 chunk_size: usize,
4294 overlap: usize,
4295 config: &ContextPrefixConfig,
4296) -> Vec<EnrichedChunk> {
4297 let doc_type = detect_doc_type(doc_path);
4299
4300 let filename = std::path::Path::new(doc_path)
4302 .file_name()
4303 .and_then(|n| n.to_str())
4304 .unwrap_or("unknown");
4305
4306 let sections = extract_sections(content);
4308
4309 let mut enriched_chunks = Vec::new();
4310 let mut global_chunk_index = 0;
4311
4312 for (section_header, section_content) in sections {
4313 let chunks = smart_chunk_text(section_content, chunk_size, overlap);
4315
4316 for chunk in chunks {
4317 let prefix = build_context_prefix(
4319 filename,
4320 section_header.as_deref(),
4321 doc_type.as_deref(),
4322 config,
4323 );
4324
4325 let full_content = if prefix.is_empty() {
4327 chunk.clone()
4328 } else {
4329 format!("{}\n\n{}", prefix, chunk)
4330 };
4331
4332 enriched_chunks.push(EnrichedChunk {
4333 content: full_content,
4334 original_content: chunk,
4335 doc_path: doc_path.to_string(),
4336 chunk_index: global_chunk_index,
4337 section: section_header.clone(),
4338 doc_type: doc_type.clone(),
4339 });
4340
4341 global_chunk_index += 1;
4342 }
4343 }
4344
4345 if enriched_chunks.is_empty() && !content.trim().is_empty() {
4347 let prefix = build_context_prefix(filename, None, doc_type.as_deref(), config);
4348 let full_content = if prefix.is_empty() {
4349 content.to_string()
4350 } else {
4351 format!("{}\n\n{}", prefix, content)
4352 };
4353
4354 enriched_chunks.push(EnrichedChunk {
4355 content: full_content,
4356 original_content: content.to_string(),
4357 doc_path: doc_path.to_string(),
4358 chunk_index: 0,
4359 section: None,
4360 doc_type,
4361 });
4362 }
4363
4364 enriched_chunks
4365}
4366
4367fn build_context_prefix(
4369 filename: &str,
4370 section: Option<&str>,
4371 doc_type: Option<&str>,
4372 config: &ContextPrefixConfig,
4373) -> String {
4374 let mut parts = Vec::new();
4375
4376 if config.include_source && !filename.is_empty() {
4377 parts.push(format!("[Source: {}]", filename));
4378 }
4379
4380 if config.include_section
4381 && let Some(sec) = section
4382 {
4383 parts.push(format!("[Section: {}]", sec));
4384 }
4385
4386 if config.include_doc_type
4387 && let Some(dt) = doc_type
4388 {
4389 parts.push(format!("[Type: {}]", dt));
4390 }
4391
4392 let prefix = parts.join(" ");
4393
4394 if prefix.len() > config.max_prefix_length {
4396 prefix.chars().take(config.max_prefix_length).collect()
4397 } else {
4398 prefix
4399 }
4400}
4401
/// Map a file path's extension to a human-readable document type label.
///
/// Returns `None` when the path has no extension or the extension is unknown.
fn detect_doc_type(path: &str) -> Option<String> {
    let ext = std::path::Path::new(path)
        .extension()?
        .to_str()?
        .to_lowercase();

    let label = match ext.as_str() {
        "rs" => "Rust source code",
        "py" => "Python source code",
        "js" | "jsx" => "JavaScript source code",
        "ts" | "tsx" => "TypeScript source code",
        "md" => "Markdown documentation",
        "txt" => "Plain text",
        "json" => "JSON data",
        "yaml" | "yml" => "YAML configuration",
        "toml" => "TOML configuration",
        "html" => "HTML document",
        "css" => "CSS stylesheet",
        "sql" => "SQL query",
        "sh" | "bash" => "Shell script",
        "pdf" => "PDF document",
        _ => return None,
    };

    Some(label.to_owned())
}
4429
4430fn extract_sections(content: &str) -> Vec<(Option<String>, &str)> {
4432 let header_pattern = regex::Regex::new(r"(?m)^(#{1,6})\s+(.+)$").ok();
4434
4435 if let Some(re) = header_pattern {
4436 let mut sections = Vec::new();
4437 let mut last_end = 0;
4438 let mut current_header: Option<String> = None;
4439
4440 for caps in re.captures_iter(content) {
4441 let Some(full_match) = caps.get(0) else {
4442 continue;
4443 };
4444 let Some(header_match) = caps.get(2) else {
4445 continue;
4446 };
4447 let match_start = full_match.start();
4448
4449 if match_start > last_end {
4451 let section_content = &content[last_end..match_start];
4452 if !section_content.trim().is_empty() {
4453 sections.push((current_header.clone(), section_content.trim()));
4454 }
4455 }
4456
4457 current_header = Some(header_match.as_str().to_string());
4458 last_end = full_match.end();
4459 }
4460
4461 if last_end < content.len() {
4463 let section_content = &content[last_end..];
4464 if !section_content.trim().is_empty() {
4465 sections.push((current_header, section_content.trim()));
4466 }
4467 }
4468
4469 if sections.is_empty() {
4470 vec![(None, content)]
4471 } else {
4472 sections
4473 }
4474 } else {
4475 vec![(None, content)]
4476 }
4477}
4478
/// Sentence-aware chunking: packs whole sentences into chunks of roughly
/// `target_size` characters, carrying up to 1-3 trailing sentences (derived
/// from `overlap / 50`, clamped) into the next chunk as overlap.
///
/// Short or unsplittable text is returned as a single chunk. Chunk boundaries
/// never cut a sentence; a chunk may exceed `target_size` by up to 50%
/// before being force-flushed. A small tail (< `target_size / 4` chars) is
/// appended to the previous chunk instead of forming its own.
fn smart_chunk_text(text: &str, target_size: usize, overlap: usize) -> Vec<String> {
    let sentences = split_into_sentences(text);

    // Nothing to split, or the whole text already fits: one chunk.
    if sentences.is_empty() || text.chars().count() <= target_size {
        return vec![text.to_string()];
    }

    let mut chunks = Vec::new();
    let mut current_chunk = String::new();
    let mut overlap_sentences: Vec<String> = Vec::new();
    // How many trailing sentences to replay at the start of the next chunk.
    let overlap_sentence_count = (overlap / 50).clamp(1, 3);

    for sentence in &sentences {
        let sentence_len = sentence.chars().count();
        let current_len = current_chunk.chars().count();
        // Soft overflow budget: 1.5x the target before a pre-emptive flush.
        let max_size = target_size + target_size / 2;

        // Flush before appending if this sentence would blow the budget.
        if current_len + sentence_len > max_size && !current_chunk.is_empty() {
            chunks.push(current_chunk.trim().to_string());
            current_chunk = overlap_sentences.join(" ");
            if !current_chunk.is_empty() {
                current_chunk.push(' ');
            }
            overlap_sentences.clear();
        }

        current_chunk.push_str(sentence);
        current_chunk.push(' ');

        // Sliding window of the most recent sentences for overlap.
        overlap_sentences.push(sentence.clone());
        if overlap_sentences.len() > overlap_sentence_count {
            overlap_sentences.remove(0);
        }

        // Flush as soon as the target size is reached.
        if current_chunk.chars().count() >= target_size {
            chunks.push(current_chunk.trim().to_string());
            current_chunk = overlap_sentences.join(" ");
            if !current_chunk.is_empty() {
                current_chunk.push(' ');
            }
            overlap_sentences.clear();
        }
    }

    // Handle the trailing partial chunk: merge small tails into the previous
    // chunk, otherwise emit as its own chunk.
    let remaining = current_chunk.trim();
    if !remaining.is_empty() {
        if remaining.chars().count() < target_size / 4 && !chunks.is_empty() {
            let last_idx = chunks.len() - 1;
            chunks[last_idx].push(' ');
            chunks[last_idx].push_str(remaining);
        } else {
            chunks.push(remaining.to_string());
        }
    }

    if chunks.is_empty() {
        chunks.push(text.to_string());
    }

    chunks
}
4541
/// Split text into sentences on `.`/`!`/`?` followed by whitespace, and on
/// blank lines (paragraph breaks).
///
/// A terminator is NOT treated as a sentence end when the fragment looks like
/// an abbreviation: a known suffix (`Mr.`, `Dr.`, `e.g.`, …) or an uppercase
/// letter immediately before the period (single-letter initials, `U.S.`).
/// Each emitted sentence is trimmed; the separating whitespace character is
/// consumed. A non-empty trailing fragment becomes the final sentence.
fn split_into_sentences(text: &str) -> Vec<String> {
    // Heuristic: does this fragment end in an abbreviation rather than a
    // genuine sentence terminator?
    fn looks_like_abbreviation(fragment: &str) -> bool {
        const SUFFIXES: [&str; 8] = [
            "Mr.", "Mrs.", "Dr.", "Prof.", "vs.", "etc.", "e.g.", "i.e.",
        ];
        SUFFIXES.iter().any(|s| fragment.ends_with(s))
            || (fragment.len() >= 2
                && fragment
                    .chars()
                    .rev()
                    .nth(1)
                    .map(|c| c.is_uppercase())
                    .unwrap_or(false))
    }

    let mut sentences = Vec::new();
    let mut buf = String::new();
    let mut stream = text.chars().peekable();

    while let Some(ch) = stream.next() {
        buf.push(ch);

        match ch {
            '.' | '!' | '?' => match stream.peek().copied() {
                Some(next) if next.is_whitespace() => {
                    if !looks_like_abbreviation(buf.trim()) {
                        sentences.push(buf.trim().to_string());
                        buf.clear();
                        // Swallow the whitespace that separated the sentences.
                        stream.next();
                    }
                }
                // Terminator mid-token (e.g. "3.14", "v1.2"): keep going.
                Some(_) => {}
                // End of input right after a terminator: flush.
                None => {
                    sentences.push(buf.trim().to_string());
                    buf.clear();
                }
            },
            '\n' => {
                // A blank line (two consecutive newlines) is a paragraph
                // break and always ends the current sentence.
                if stream.peek() == Some(&'\n') {
                    if !buf.trim().is_empty() {
                        sentences.push(buf.trim().to_string());
                        buf.clear();
                    }
                    // Swallow the second newline of the pair.
                    stream.next();
                }
            }
            _ => {}
        }
    }

    let tail = buf.trim();
    if !tail.is_empty() {
        sentences.push(tail.to_string());
    }

    sentences
}
4604
/// Knobs for `search_with_options`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchOptions {
    /// Restrict results to one onion slice layer; `None` searches every layer.
    pub layer_filter: Option<SliceLayer>,
    /// Keep only candidates whose metadata matches this project identity.
    pub project_filter: Option<String>,
}
4613
4614impl SearchOptions {
4615 pub fn outer_only() -> Self {
4617 Self {
4618 layer_filter: Some(SliceLayer::Outer),
4619 project_filter: None,
4620 }
4621 }
4622
4623 pub fn deep() -> Self {
4625 Self {
4626 layer_filter: None,
4627 project_filter: None,
4628 }
4629 }
4630
4631 pub fn with_project(mut self, project: Option<String>) -> Self {
4632 self.project_filter = project.filter(|value| !value.trim().is_empty());
4633 self
4634 }
4635}
4636
impl Default for SearchOptions {
    /// Defaults to Outer-layer-only search (the fast browse path).
    fn default() -> Self {
        Self::outer_only()
    }
}
4642
4643fn metadata_matches_project(metadata: &Value, project: &str) -> bool {
4644 let needle = project.trim();
4645 if needle.is_empty() {
4646 return true;
4647 }
4648
4649 let needle = canonical_project_identity(needle);
4650
4651 metadata.as_object().is_some_and(|object| {
4652 ["project", "project_id", "source_project"]
4653 .iter()
4654 .filter_map(|key| object.get(*key))
4655 .filter_map(|value| value.as_str())
4656 .any(|value| canonical_project_identity(value) == needle)
4657 })
4658}
4659
/// Normalize a project name for comparison: trim, lowercase, and map the
/// legacy "loctree" alias onto "vetcoders".
fn canonical_project_identity(value: &str) -> String {
    let normalized = value.trim().to_ascii_lowercase();
    if normalized == "loctree" {
        // Historical alias: the loctree project was renamed to vetcoders.
        "vetcoders".to_string()
    } else {
        normalized
    }
}
4666
/// Convert a metadata path string into a `PathBuf` label.
/// Currently a straight conversion with no normalization.
fn metadata_path_label(path: &str) -> std::path::PathBuf {
    path.into()
}
4673
/// A single search hit, including onion-slice navigation links.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SearchResult {
    /// Document id within its namespace.
    pub id: String,
    /// Namespace the document lives in.
    pub namespace: String,
    /// The slice's text content.
    pub text: String,
    /// Relevance score (direct lookups use 1.0).
    pub score: f32,
    /// JSON metadata attached at ingest time.
    pub metadata: serde_json::Value,
    /// Onion slice layer, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub layer: Option<SliceLayer>,
    /// Parent slice id — present when the result can drill up.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Child slice ids — non-empty when the result can expand.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub children_ids: Vec<String>,
    /// Keywords extracted for this slice.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub keywords: Vec<String>,
}
4694
4695impl SearchResult {
4696 pub fn new_legacy(
4698 id: String,
4699 namespace: String,
4700 text: String,
4701 score: f32,
4702 metadata: serde_json::Value,
4703 ) -> Self {
4704 Self {
4705 id,
4706 namespace,
4707 text,
4708 score,
4709 metadata,
4710 layer: None,
4711 parent_id: None,
4712 children_ids: vec![],
4713 keywords: vec![],
4714 }
4715 }
4716
4717 pub fn can_expand(&self) -> bool {
4719 !self.children_ids.is_empty()
4720 }
4721
4722 pub fn can_drill_up(&self) -> bool {
4724 self.parent_id.is_some()
4725 }
4726}
4727
/// Cosine similarity of two vectors, iterating over the shorter length.
/// Returns 0.0 when either vector has zero norm.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    // Single pass accumulating dot product and both squared norms, in the
    // same element order as a plain loop (float results are identical).
    let (dot, norm_a, norm_b) = a
        .iter()
        .zip(b.iter())
        .fold((0.0_f32, 0.0_f32, 0.0_f32), |(d, na, nb), (x, y)| {
            (d + x * y, na + x * x, nb + y * y)
        });

    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a.sqrt() * norm_b.sqrt())
    }
}
4742
/// Unit tests for hashing, keyword extraction, transcript/conversation
/// extraction, onion slicing, and search-option plumbing in this module.
#[cfg(test)]
mod tests {
    use super::{
        OnionSliceConfig, SearchOptions, SliceLayer, create_onion_slices, create_onion_slices_fast,
        extract_conversation_documents, extract_keywords, extract_markdown_transcript_documents,
        hash_content, metadata_matches_project,
    };
    use serde_json::json;
    use std::path::Path;

    #[test]
    fn short_hash_uses_sha256_prefix_with_minimum_length() {
        let hash = hash_content("same content");
        // 16 hex chars = first 8 bytes of the digest.
        assert_eq!(hash.len(), 16);
        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
        // Deterministic for identical input.
        assert_eq!(hash, hash_content("same content"));
    }

    #[test]
    fn keyword_extraction_splits_paths_and_filters_session_tokens() {
        let keywords = extract_keywords(
            "/Users/silver/Git/tools/TwinSweep session 2ff4de8b9a4e1234567890abcdef notes",
            10,
        );

        // Path components become individual lowercase keywords.
        assert!(keywords.contains(&"users".to_string()));
        assert!(keywords.contains(&"twinsweep".to_string()));
        // Hex session tokens must be filtered out.
        assert!(!keywords.iter().any(|keyword| keyword.contains("2ff4de8b")));
    }

    #[test]
    fn search_options_can_carry_project_filter() {
        let options = SearchOptions::deep().with_project(Some("Vista".to_string()));
        assert_eq!(options.layer_filter, None);
        assert_eq!(options.project_filter.as_deref(), Some("Vista"));
    }

    #[test]
    fn project_match_uses_metadata_fields() {
        // Matching is case-insensitive across all recognized metadata keys.
        assert!(metadata_matches_project(
            &json!({"project": "Vista"}),
            "vista"
        ));
        assert!(metadata_matches_project(
            &json!({"project_id": "Loctree"}),
            "loctree"
        ));
        assert!(!metadata_matches_project(
            &json!({"project": "rust-memex"}),
            "vista"
        ));
        // Default search options stay Outer-only.
        assert_eq!(
            SearchOptions::default().layer_filter,
            Some(SliceLayer::Outer)
        );
    }

    #[test]
    fn markdown_transcript_extraction_builds_role_aware_turn_docs() {
        let raw = r#"[project: Loctree/vibecrafted | agent: codex | date: 2026-03-30]

[signals]
Results:
- AICX lookup działa
[/signals]

[09:14:00] assistant: Tak, i to właśnie jest sedno: `aicx-dragon` to żywy endpoint MCP.
[09:15:33] user: ziom ale ty sobie sam skonfigurowałeś ~/.codex/config.toml
[09:15:47] assistant: Sprawdzam teraz lokalny kontrakt konfiguracji MCP dla Codexa.
[09:15:55] reasoning: **Checking config contract**
[09:16:06] assistant: Składnia configu wygląda już poprawnie według samego Codexa.
"#;

        let docs = extract_markdown_transcript_documents(raw, Path::new("sample.md"))
            .expect("expected transcript docs");

        assert_eq!(docs.len(), 2);
        assert!(docs[0].1.contains("Assistant response:"));
        assert!(docs[1].1.contains("User request:"));
        assert!(docs[1].1.contains("Reasoning focus:"));
        assert_eq!(docs[1].2["format"], "markdown_transcript");
        assert_eq!(docs[1].2["type"], "transcript_turn");
        assert_eq!(docs[1].2["project"], "Loctree/vibecrafted");
    }

    #[test]
    fn short_structured_transcript_turns_keep_outer_slice_in_full_and_fast_modes() {
        let metadata = json!({
            "type": "transcript_turn",
            "format": "markdown_transcript"
        });
        let config = OnionSliceConfig::default();
        let content = "User request:\nDodaj progress do pipeline.\n\nAssistant response:\nPodepnę licznik etapów.";

        let full_layers: Vec<SliceLayer> = create_onion_slices(content, &metadata, &config)
            .into_iter()
            .map(|slice| slice.layer)
            .collect();
        let fast_layers: Vec<SliceLayer> = create_onion_slices_fast(content, &metadata, &config)
            .into_iter()
            .map(|slice| slice.layer)
            .collect();

        // Short turns must still emit an Outer slice in both modes.
        assert_eq!(full_layers, vec![SliceLayer::Outer, SliceLayer::Core]);
        assert_eq!(fast_layers, vec![SliceLayer::Outer, SliceLayer::Core]);
    }

    #[test]
    fn structured_markdown_outer_becomes_semantic_card() {
        let metadata = json!({
            "type": "transcript_turn",
            "format": "markdown_transcript",
            "project": "Loctree/rust-memex",
            "agent": "codex"
        });
        let config = OnionSliceConfig {
            outer_target: 220,
            ..OnionSliceConfig::default()
        };
        let content = "User request:\nMake outer retrieval useful for transcript search.\n\nAssistant response:\nDecision: build semantic cards instead of keyword prefixes.\nNext action: add JSON slicing coverage.\n\nReasoning focus:\nOuter-only is the default search path, so weak outer text hides short turns.";

        let slices = create_onion_slices(content, &metadata, &config);
        let outer = &slices[0].content;
        let middle = &slices[1].content;
        let inner = &slices[2].content;

        // Outer slice reads as a semantic card, not a bracketed prefix dump.
        assert!(outer.contains("Request:"));
        assert!(outer.contains("Response:"));
        assert!(outer.contains("Decision:"));
        assert!(!outer.starts_with('['));
        assert!(middle.contains("Decision:"));
        assert!(middle.contains("Next:"));
        assert!(inner.contains("Assistant response:"));
        assert!(inner.contains("Entities:"));
    }

    #[test]
    fn json_conversation_docs_flow_through_structured_semantic_slices() {
        let conversation = json!({
            "project": "Loctree/rust-memex",
            "sessions": [
                {
                    "info": {
                        "sessionId": "session-1234567890"
                    },
                    "messages": [
                        {
                            "role": "user",
                            "text": "Please replace the keyword-prefixed outer summary with something that reads like a semantic card for search.",
                            "timestamp": "2026-04-12T04:00:00Z"
                        },
                        {
                            "role": "assistant",
                            "text": "Decision: we will use semantic cards for outer retrieval. Next action: add JSON regression coverage and preserve the plain text fallback.",
                            "timestamp": "2026-04-12T04:01:00Z"
                        },
                        {
                            "role": "user",
                            "text": "Keep the generic plain text path as a safe fallback.",
                            "timestamp": "2026-04-12T04:02:00Z"
                        },
                        {
                            "role": "assistant",
                            "text": "Reasoning: default search prefers outer-only, so the semantic card needs to surface the exchange even for short turns.",
                            "timestamp": "2026-04-12T04:03:00Z"
                        }
                    ]
                }
            ]
        });

        let docs = extract_conversation_documents(&conversation, Path::new("conversation.json"))
            .expect("expected conversation docs");
        // Four messages pair into two user/assistant turns.
        assert_eq!(docs.len(), 2);
        assert_eq!(
            docs[0].2.get("format").and_then(|value| value.as_str()),
            Some("sessions")
        );
        assert_eq!(
            docs[0].2.get("turn_index").and_then(|value| value.as_u64()),
            Some(0)
        );
        assert!(docs[0].1.contains("User request:"));
        assert!(docs[0].1.contains("Assistant response:"));

        let config = OnionSliceConfig {
            outer_target: 220,
            ..OnionSliceConfig::default()
        };
        let slices = create_onion_slices(&docs[0].1, &docs[0].2, &config);
        let outer = &slices[0].content;
        let middle = &slices[1].content;
        let inner = &slices[2].content;

        assert!(outer.contains("Request:"));
        assert!(outer.contains("Response:"));
        assert!(outer.contains("Decision:"));
        assert!(!outer.starts_with('['));
        assert!(middle.contains("Decision:"));
        assert!(middle.contains("Next:"));
        assert!(inner.contains("Assistant response:"));
    }

    #[test]
    fn extract_keywords_drops_spec_boilerplate_even_when_dominant() {
        let text = r"
        ## user
        ## assistant
        transcript transcript transcript
        user user user user
        assistant assistant assistant assistant
        Brewing… Brewing… Brewing… Cogitating…
        Frosting… Grooving… Grooving…
        shifttab shifttab bypass bypass permissions tokens
        jest jest jest nie nie nie już też też
        VistaPortal LiveTree onionSlicer LanceDB qwen3
        ";

        let keywords = extract_keywords(text, 30);
        let lower: Vec<String> = keywords.iter().map(|k| k.to_ascii_lowercase()).collect();

        // Role markers, UI noise, and Polish stopwords must never surface.
        let banned = [
            "assistant",
            "user",
            "transcript",
            "system",
            "human",
            "model",
            "session",
            "agent",
            "claude",
            "codex",
            "nie",
            "jest",
            "już",
            "też",
            "tylko",
            "bardzo",
            "brewing",
            "cogitating",
            "frosting",
            "grooving",
            "beaming",
            "thinking",
            "shifttab",
            "bypass",
            "permissions",
            "tokens",
            "thought",
            "running",
        ];
        for token in banned {
            assert!(
                !lower.iter().any(|k| k == token),
                "extract_keywords leaked banned boilerplate `{}` into keywords {:?}",
                token,
                keywords
            );
        }

        // The stoplist must not be so aggressive that real signal disappears.
        let signal_hits = ["vistaportal", "livetree", "onionslicer", "lancedb", "qwen3"]
            .iter()
            .filter(|signal| {
                lower
                    .iter()
                    .any(|k| k.contains(*signal) || k == &(*signal).to_string())
            })
            .count();
        assert!(
            signal_hits >= 1,
            "stoplist over-filtered: zero meaningful tokens survived in {:?}",
            keywords
        );
    }

    #[test]
    fn extract_keywords_is_deterministic_on_count_ties() {
        let text = "alpha bravo charlie delta echo foxtrot golf hotel india juliet";
        let baseline = extract_keywords(text, 5);
        assert_eq!(baseline.len(), 5);
        // Equal-count tokens must tie-break by first appearance, not hash order.
        assert_eq!(
            baseline,
            vec![
                "alpha".to_string(),
                "bravo".to_string(),
                "charlie".to_string(),
                "delta".to_string(),
                "echo".to_string(),
            ]
        );
        for _ in 0..50 {
            assert_eq!(
                extract_keywords(text, 5),
                baseline,
                "extract_keywords must be deterministic across runs on count ties"
            );
        }
    }

    #[test]
    fn plain_text_still_uses_generic_fallback_path() {
        let metadata = json!({
            "type": "note",
            "format": "markdown"
        });
        let config = OnionSliceConfig::default();
        let content = "The release workflow still needs a truthful browse surface. We should preserve the plain text fallback while improving structured conversation retrieval with semantic cards and regression tests so the generic path does not regress.";

        let slices = create_onion_slices(content, &metadata, &config);
        let outer = &slices[0].content;

        // Non-transcript content goes through the generic 4-layer slicer.
        assert_eq!(
            slices.iter().map(|slice| slice.layer).collect::<Vec<_>>(),
            vec![
                SliceLayer::Outer,
                SliceLayer::Middle,
                SliceLayer::Inner,
                SliceLayer::Core
            ]
        );
        assert!(!outer.contains("Request:"));
        assert!(!outer.contains("Response:"));
    }
}
5092
#[cfg(test)]
mod p3_llm_outer_tests {
    //! Tests for the LLM-backed outer-layer synthesis path: an in-process
    //! axum server stands in for Ollama's `/api/generate` endpoint so the
    //! request payload, response parsing, and every fallback branch can be
    //! exercised without a real model.

    use super::*;
    use axum::{Json, Router, extract::State, http::StatusCode, routing::post};
    use serde_json::json;
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;
    use std::time::Duration;
    use tokio::net::TcpListener;
    use tokio::task::JoinHandle;

    // Shared slot where the mock handler stores the most recent POST body so
    // the test can inspect exactly what was sent to "Ollama".
    type CapturedBody = Arc<StdMutex<Option<serde_json::Value>>>;

    // Scripted behavior for the mock server.
    enum MockResponse {
        // 200 OK with a canned `response` text and `done: true`.
        Ok(&'static str),
        // 200 OK with an arbitrary JSON body (for malformed-payload cases).
        OkRaw(serde_json::Value),
        // Non-200 status with a stub error body.
        Status(StatusCode),
    }

    // Handle to a running mock server: its base URL, the captured request
    // body, and the server task (kept alive for the test's duration).
    struct MockOllama {
        endpoint: String,
        captured: CapturedBody,
        _handle: JoinHandle<()>,
    }

    // Spins up an axum server on an ephemeral localhost port that answers
    // POST /api/generate according to `behavior`, recording each request body.
    async fn spawn_mock_ollama(behavior: MockResponse) -> MockOllama {
        let captured: CapturedBody = Arc::new(StdMutex::new(None));
        let captured_for_handler = captured.clone();
        let behavior = Arc::new(behavior);

        // Stores the incoming JSON body, then replies per the scripted behavior.
        async fn handler(
            State(state): State<(CapturedBody, Arc<MockResponse>)>,
            Json(body): Json<serde_json::Value>,
        ) -> (StatusCode, Json<serde_json::Value>) {
            *state.0.lock().expect("captured mutex poisoned") = Some(body);
            match state.1.as_ref() {
                MockResponse::Ok(text) => (
                    StatusCode::OK,
                    Json(json!({ "response": text, "done": true })),
                ),
                MockResponse::OkRaw(value) => (StatusCode::OK, Json(value.clone())),
                MockResponse::Status(code) => (*code, Json(json!({"error": "mocked"}))),
            }
        }

        let app = Router::new()
            .route("/api/generate", post(handler))
            .with_state((captured_for_handler, behavior));

        // Port 0 lets the OS pick a free port; the real address is read back.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind mock ollama");
        let addr = listener.local_addr().expect("local_addr");
        let handle = tokio::spawn(async move {
            let _ = axum::serve(listener, app).await;
        });

        MockOllama {
            endpoint: format!("http://{addr}"),
            captured,
            _handle: handle,
        }
    }

    // Happy path: the helper must POST the expected payload shape and return
    // the trimmed `response` text.
    #[tokio::test]
    async fn synthesize_outer_via_ollama_posts_correct_payload_and_parses_response() {
        let mock = spawn_mock_ollama(MockResponse::Ok(
            "Naprawiono onion-slicer P3: outer generowany przez Ollama.",
        ))
        .await;

        let summary = synthesize_outer_via_ollama(
            "User: napraw P3.\nAssistant: Wpięte do pipeline.",
            "qwen2.5:3b",
            &mock.endpoint,
        )
        .await;

        assert_eq!(
            summary.as_deref(),
            Some("Naprawiono onion-slicer P3: outer generowany przez Ollama.")
        );

        // Inspect the body the helper actually sent to the mock.
        let captured = mock
            .captured
            .lock()
            .expect("captured")
            .clone()
            .expect("ollama mock did not record the POST body");

        assert_eq!(
            captured.get("model").and_then(|v| v.as_str()),
            Some("qwen2.5:3b"),
            "model field must be forwarded verbatim"
        );
        assert_eq!(
            captured.get("stream"),
            Some(&json!(false)),
            "stream must be false so the helper can read the full response in one shot"
        );
        let prompt = captured
            .get("prompt")
            .and_then(|v| v.as_str())
            .expect("prompt field");
        assert!(
            prompt.contains("napraw P3"),
            "prompt must include the transcript content"
        );
        assert!(
            prompt.to_ascii_lowercase().contains("polish"),
            "prompt must keep the language directive (Polish summary)"
        );
        assert!(
            prompt.to_ascii_lowercase().contains("brewing"),
            "prompt must instruct the model to skip Claude Code/Codex UI noise"
        );
    }

    // Oversized transcripts must be clipped to the char budget with an
    // explicit truncation marker in the prompt.
    #[tokio::test]
    async fn synthesize_outer_via_ollama_truncates_oversized_input() {
        let mock = spawn_mock_ollama(MockResponse::Ok("ok")).await;
        let big = "A".repeat(OLLAMA_OUTER_INPUT_CHAR_BUDGET * 2);

        let _ = synthesize_outer_via_ollama(&big, "any", &mock.endpoint).await;

        let prompt = mock
            .captured
            .lock()
            .expect("captured")
            .clone()
            .expect("body")
            .get("prompt")
            .and_then(|v| v.as_str())
            .expect("prompt")
            .to_string();
        assert!(
            prompt.contains("transcript truncated for outer summary"),
            "oversized inputs must be truncated with the marker so the model sees the boundary"
        );
        // Allow some slack for the prompt template itself, but the transcript
        // portion must have been clipped to the budget.
        assert!(
            prompt.chars().count() < OLLAMA_OUTER_INPUT_CHAR_BUDGET + 1_000,
            "prompt blew past the input char budget: {} chars",
            prompt.chars().count()
        );
    }

    // HTTP errors must degrade to None (caller falls back to keywords).
    #[tokio::test]
    async fn synthesize_outer_via_ollama_returns_none_on_non_2xx() {
        let mock = spawn_mock_ollama(MockResponse::Status(StatusCode::INTERNAL_SERVER_ERROR)).await;
        let summary = synthesize_outer_via_ollama("payload", "model", &mock.endpoint).await;
        assert!(
            summary.is_none(),
            "5xx responses must surface as None (keyword fallback)"
        );
    }

    // A 200 body without the `response` field must also degrade to None.
    #[tokio::test]
    async fn synthesize_outer_via_ollama_returns_none_on_malformed_payload() {
        let mock = spawn_mock_ollama(MockResponse::OkRaw(json!({"done": true}))).await;
        let summary = synthesize_outer_via_ollama("payload", "model", &mock.endpoint).await;
        assert!(summary.is_none());
    }

    // Whitespace-only completions count as empty and must be rejected.
    #[tokio::test]
    async fn synthesize_outer_via_ollama_returns_none_on_empty_response_field() {
        let mock = spawn_mock_ollama(MockResponse::Ok(" \n ")).await;
        let summary = synthesize_outer_via_ollama("payload", "model", &mock.endpoint).await;
        assert!(
            summary.is_none(),
            "whitespace-only completions must not pollute the outer layer"
        );
    }

    // A dead endpoint (TEST-NET-3 address) must fail fast via the helper's own
    // connect timeout — the outer tokio timeout only guards the test budget.
    #[tokio::test]
    async fn synthesize_outer_via_ollama_returns_none_on_unreachable_endpoint() {
        let result = tokio::time::timeout(
            Duration::from_secs(15),
            synthesize_outer_via_ollama("payload", "model", "http://203.0.113.1:9"),
        )
        .await
        .expect("synthesize must respect its own connect_timeout in the test budget");
        assert!(result.is_none());
    }

    // Blank input must short-circuit to None before any network call (the
    // endpoint here is intentionally unusable).
    #[tokio::test]
    async fn synthesize_outer_via_ollama_returns_none_on_empty_input() {
        let result = synthesize_outer_via_ollama(" \n\t ", "x", "http://127.0.0.1:1").await;
        assert!(result.is_none());
    }

    // Fixture: a transcript long enough to exercise multi-layer slicing.
    fn long_transcript() -> String {
        let body = "User asked how to fix the onion slicer outer layer. Assistant proposed wiring Ollama into the pipeline so the outer summary becomes a real Polish sentence instead of a TF-IDF keyword splat. The plan covers prompt construction, response parsing, and graceful fallback when Ollama is unreachable. ";
        body.repeat(3)
    }

    // With LLM synthesis configured, the outer slice content must be the LLM
    // summary and the hierarchy must be rewired to the regenerated outer id.
    #[tokio::test]
    async fn create_onion_slices_async_replaces_outer_with_llm_summary() {
        let mock = spawn_mock_ollama(MockResponse::Ok(
            "LLM-resolved outer: streszczenie naprawy slicera onionowego.",
        ))
        .await;
        let config = OnionSliceConfig {
            outer_synthesis: OuterSynthesis::Llm {
                model: "qwen2.5:3b".to_string(),
                endpoint: mock.endpoint.clone(),
            },
            ..OnionSliceConfig::default()
        };
        let metadata = json!({"type": "note"});
        let content = long_transcript();

        let slices = create_onion_slices_async(&content, &metadata, &config).await;
        let outer = slices
            .iter()
            .find(|slice| slice.layer == SliceLayer::Outer)
            .expect("outer slice present");

        assert_eq!(
            outer.content,
            "LLM-resolved outer: streszczenie naprawy slicera onionowego."
        );

        // Replacing the outer regenerates its id; the middle layer must link
        // to the new id, not the stale one.
        let middle = slices
            .iter()
            .find(|slice| slice.layer == SliceLayer::Middle)
            .expect("middle slice present");
        assert!(
            middle.children_ids.contains(&outer.id),
            "middle.children_ids must point at the new outer id (got {:?}, outer={})",
            middle.children_ids,
            outer.id
        );

        // Keywords should be recomputed from the LLM summary, not the
        // original transcript.
        let keyword_lower: Vec<String> = outer
            .keywords
            .iter()
            .map(|k| k.to_ascii_lowercase())
            .collect();
        assert!(
            keyword_lower.iter().any(|kw| kw.contains("streszczenie")
                || kw.contains("naprawy")
                || kw.contains("slicera")),
            "outer keywords should reflect the LLM summary, got {:?}",
            outer.keywords
        );
    }

    // When Ollama is unreachable, slicing must still produce a usable outer
    // via the keyword fallback — and must not hang.
    #[tokio::test]
    async fn create_onion_slices_async_falls_back_to_keyword_when_ollama_unreachable() {
        let config = OnionSliceConfig {
            outer_synthesis: OuterSynthesis::Llm {
                model: "qwen2.5:3b".to_string(),
                endpoint: "http://203.0.113.1:9".to_string(),
            },
            ..OnionSliceConfig::default()
        };
        let metadata = json!({"type": "note"});
        let content = long_transcript();

        let slices = tokio::time::timeout(
            Duration::from_secs(15),
            create_onion_slices_async(&content, &metadata, &config),
        )
        .await
        .expect("async slicer must not block forever on a dead endpoint");
        let outer = slices
            .iter()
            .find(|slice| slice.layer == SliceLayer::Outer)
            .expect("outer slice present");
        assert!(
            !outer.content.trim().is_empty(),
            "keyword fallback must produce a usable outer when LLM is unreachable"
        );

        // Keyword-style outers use a bracketed summary format — this is how we
        // distinguish the fallback from an LLM sentence.
        assert!(
            outer.content.starts_with('['),
            "fallback outer must be the keyword-style bracketed summary, got: {:?}",
            outer.content
        );
    }

    // Fast mode produces only Outer + Core; LLM replacement must still rewire
    // the core's child link to the regenerated outer.
    #[tokio::test]
    async fn create_onion_slices_fast_async_replaces_outer_with_llm_summary() {
        let mock = spawn_mock_ollama(MockResponse::Ok("Fast onion outer via LLM.")).await;
        let config = OnionSliceConfig {
            outer_synthesis: OuterSynthesis::Llm {
                model: "qwen2.5:3b".to_string(),
                endpoint: mock.endpoint.clone(),
            },
            ..OnionSliceConfig::default()
        };
        let metadata = json!({"type": "note"});
        let content = long_transcript();

        let slices = create_onion_slices_fast_async(&content, &metadata, &config).await;
        assert_eq!(slices.len(), 2);
        let outer = slices
            .iter()
            .find(|slice| slice.layer == SliceLayer::Outer)
            .expect("fast outer slice present");
        let core = slices
            .iter()
            .find(|slice| slice.layer == SliceLayer::Core)
            .expect("fast core slice present");
        assert_eq!(outer.content, "Fast onion outer via LLM.");
        assert!(
            core.children_ids.contains(&outer.id),
            "fast-mode core must reference the new outer id"
        );
    }

    // The structured-conversation path (markdown transcript metadata) must
    // also honor the LLM outer replacement.
    #[tokio::test]
    async fn structured_conversation_outer_is_replaced_by_llm_summary() {
        let mock = spawn_mock_ollama(MockResponse::Ok("Structured outer rewritten by LLM.")).await;
        let config = OnionSliceConfig {
            outer_synthesis: OuterSynthesis::Llm {
                model: "qwen2.5:3b".to_string(),
                endpoint: mock.endpoint.clone(),
            },
            ..OnionSliceConfig::default()
        };
        let metadata = json!({
            "type": "conversation",
            "format": "markdown_transcript"
        });
        let content = "## user\nNapraw onion slicer P3.\n\n## assistant\nWpiąłem Ollama do pipeline. Dodałem testy. Klucze sa nowe.\n";

        let slices = create_onion_slices_async(content, &metadata, &config).await;
        let outer = slices
            .iter()
            .find(|slice| slice.layer == SliceLayer::Outer)
            .expect("structured outer slice present");
        assert_eq!(outer.content, "Structured outer rewritten by LLM.");
    }

    // A blank summary must leave the slices untouched — ids, content, and
    // child links all identical.
    #[test]
    fn replace_outer_slice_is_a_noop_when_summary_is_empty() {
        let metadata = json!({"type": "note"});
        let content = long_transcript();
        let original = create_onion_slices(&content, &metadata, &OnionSliceConfig::default());
        let cloned = original.clone();
        let after = replace_outer_slice(cloned, " ".to_string());
        assert_eq!(after.len(), original.len());
        for (left, right) in after.iter().zip(original.iter()) {
            assert_eq!(left.id, right.id);
            assert_eq!(left.content, right.content);
            assert_eq!(left.children_ids, right.children_ids);
        }
    }

    // A real replacement must regenerate the outer id and migrate every
    // parent link from the old id to the new one.
    #[test]
    fn replace_outer_slice_rewrites_outer_id_and_parent_links() {
        let metadata = json!({"type": "note"});
        let content = long_transcript();
        let slices = create_onion_slices(&content, &metadata, &OnionSliceConfig::default());
        let original_outer_id = slices
            .iter()
            .find(|slice| slice.layer == SliceLayer::Outer)
            .expect("outer present")
            .id
            .clone();

        let after = replace_outer_slice(slices, "Brand new outer text.".to_string());
        let outer = after
            .iter()
            .find(|slice| slice.layer == SliceLayer::Outer)
            .expect("outer still present");
        assert_eq!(outer.content, "Brand new outer text.");
        assert_ne!(outer.id, original_outer_id, "outer id must be regenerated");

        // No slice may still point at the stale outer id...
        for slice in &after {
            assert!(
                !slice.children_ids.contains(&original_outer_id),
                "children_ids must not reference the old outer id (slice layer={:?})",
                slice.layer
            );
        }
        // ...and at least one slice must point at the new one.
        assert!(
            after
                .iter()
                .any(|slice| slice.children_ids.contains(&outer.id)),
            "no slice references the new outer id — hierarchy broken"
        );
    }
}