1use anyhow::{Result, anyhow};
2use pdf_extract;
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use sha2::{Digest, Sha256};
6use std::collections::{HashMap, HashSet, hash_map::DefaultHasher};
7use std::hash::{Hash, Hasher};
8use std::path::Path;
9use std::sync::Arc;
10use tokio::sync::Mutex;
11use tracing::debug;
12
13use crate::{
14 embeddings::MLXBridge,
15 preprocessing::{PreprocessingConfig, Preprocessor},
16 search::BM25Index,
17 storage::{ChromaDocument, StorageManager},
18};
19
20pub mod pipeline;
22pub use pipeline::{
23 Chunk, EmbeddedChunk, FileContent, PipelineConfig, PipelineResult, PipelineStats, run_pipeline,
24};
25
/// Namespace used when a caller passes `None` for the namespace argument.
const DEFAULT_NAMESPACE: &str = "rag";

/// Batch size for storage writes.
// NOTE(review): not referenced in the visible part of this file — confirm where it is
// used and what it counts (presumably documents per batch).
const STORAGE_BATCH_SIZE: usize = 100;
31
/// One layer of an onion-sliced document.
///
/// The explicit `u8` discriminants (1 through 4) are the values exchanged through
/// `SliceLayer::as_u8` and `SliceLayer::from_u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum SliceLayer {
    /// Smallest layer; `target_chars` reports 100 for it.
    Outer = 1,
    /// Second layer; `target_chars` reports 300 for it.
    Middle = 2,
    /// Third layer; `target_chars` reports 600 for it.
    Inner = 3,
    /// Full-content layer; `target_chars` reports `usize::MAX` (no limit).
    Core = 4,
}
59
60impl SliceLayer {
61 pub fn target_chars(&self) -> usize {
63 match self {
64 SliceLayer::Outer => 100,
65 SliceLayer::Middle => 300,
66 SliceLayer::Inner => 600,
67 SliceLayer::Core => usize::MAX,
68 }
69 }
70
71 pub fn as_u8(&self) -> u8 {
73 *self as u8
74 }
75
76 pub fn from_u8(v: u8) -> Option<Self> {
78 match v {
79 1 => Some(SliceLayer::Outer),
80 2 => Some(SliceLayer::Middle),
81 3 => Some(SliceLayer::Inner),
82 4 => Some(SliceLayer::Core),
83 _ => None,
84 }
85 }
86
87 pub fn name(&self) -> &'static str {
89 match self {
90 SliceLayer::Outer => "outer",
91 SliceLayer::Middle => "middle",
92 SliceLayer::Inner => "inner",
93 SliceLayer::Core => "core",
94 }
95 }
96}
97
98impl std::fmt::Display for SliceLayer {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "{}", self.name())
101 }
102}
103
/// A single slice of a document at one [`SliceLayer`].
///
/// As built by `create_onion_slices`, slices are chained: each layer's `parent_id`
/// points at the next larger layer (outer -> middle -> inner -> core) and
/// `children_ids` points the other way. The core slice has no parent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnionSlice {
    /// Identifier derived from the slice content and layer (see `OnionSlice::generate_id`).
    pub id: String,
    /// Layer this slice belongs to.
    pub layer: SliceLayer,
    /// Text of the slice.
    pub content: String,
    /// Id of the next larger slice, or `None` for the core slice.
    pub parent_id: Option<String>,
    /// Ids of the next smaller slice(s); empty for the outermost slice.
    pub children_ids: Vec<String>,
    /// Keywords extracted from `content`.
    pub keywords: Vec<String>,
}
120
121impl OnionSlice {
122 pub fn generate_id(content: &str, layer: SliceLayer) -> String {
124 let mut hasher = DefaultHasher::new();
125 content.hash(&mut hasher);
126 layer.as_u8().hash(&mut hasher);
127 format!("slice_{:016x}_{}", hasher.finish(), layer.name())
128 }
129}
130
/// Slicing strategy selected when indexing a document.
///
/// Parsed from text by the `FromStr` impl (`"onion"`, `"onion-fast"`/`"fast"`, `"flat"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SliceMode {
    /// Default mode.
    // NOTE(review): presumably maps to `create_onion_slices` (all four layers) — confirm at the call site.
    #[default]
    Onion,
    /// Reduced onion mode.
    // NOTE(review): presumably maps to `create_onion_slices_fast` (outer + core only) — confirm at the call site.
    OnionFast,
    /// Non-onion mode; `index_document_with_preprocessing` always uses this mode.
    Flat,
}
142
143impl std::str::FromStr for SliceMode {
144 type Err = String;
145
146 fn from_str(s: &str) -> Result<Self, Self::Err> {
147 match s.to_lowercase().as_str() {
148 "onion" => Ok(SliceMode::Onion),
149 "onion-fast" | "fast" => Ok(SliceMode::OnionFast),
150 "flat" => Ok(SliceMode::Flat),
151 other => Err(format!(
152 "Invalid slice mode: '{}'. Use 'onion', 'onion-fast', or 'flat'",
153 other
154 )),
155 }
156 }
157}
158
/// Outcome of an indexing attempt: the content was either indexed or skipped.
#[derive(Debug, Clone)]
pub enum IndexResult {
    /// The content was indexed.
    Indexed {
        /// Number of chunks written for this content.
        chunks_indexed: usize,
        /// Hash of the indexed content.
        // NOTE(review): presumably the SHA-256 hex digest from `compute_content_hash` — confirm.
        content_hash: String,
    },
    /// The content was not indexed.
    Skipped {
        /// Human-readable explanation of why indexing was skipped.
        reason: String,
        /// Hash of the skipped content.
        content_hash: String,
    },
}
177
178impl IndexResult {
179 pub fn was_indexed(&self) -> bool {
181 matches!(self, IndexResult::Indexed { .. })
182 }
183
184 #[deprecated(note = "use was_indexed")]
185 pub fn is_indexed(&self) -> bool {
186 self.was_indexed()
187 }
188
189 pub fn is_skipped(&self) -> bool {
191 matches!(self, IndexResult::Skipped { .. })
192 }
193
194 pub fn content_hash(&self) -> &str {
196 match self {
197 IndexResult::Indexed { content_hash, .. } => content_hash,
198 IndexResult::Skipped { content_hash, .. } => content_hash,
199 }
200 }
201}
202
203pub fn compute_content_hash(content: &str) -> String {
205 let mut hasher = Sha256::new();
206 hasher.update(content.as_bytes());
207 let result = hasher.finalize();
208 result.iter().map(|b| format!("{:02x}", b)).collect()
210}
211
/// Size targets that drive onion slicing.
#[derive(Debug, Clone)]
pub struct OnionSliceConfig {
    /// Target size handed to `create_outer_summary` for the outer layer.
    pub outer_target: usize,
    /// Target size handed to `extract_key_content` for the middle layer.
    pub middle_target: usize,
    /// Target size handed to `extract_key_content` for the inner layer.
    pub inner_target: usize,
    /// Content whose trimmed byte length is below this value is not sliced; it
    /// becomes a single core slice.
    pub min_content_for_slicing: usize,
}
224
225impl Default for OnionSliceConfig {
226 fn default() -> Self {
227 Self {
228 outer_target: 100,
229 middle_target: 300,
230 inner_target: 600,
231 min_content_for_slicing: 200,
232 }
233 }
234}
235
236pub fn create_onion_slices(
244 content: &str,
245 _metadata: &serde_json::Value,
246 config: &OnionSliceConfig,
247) -> Vec<OnionSlice> {
248 let content = content.trim();
249
250 if content.len() < config.min_content_for_slicing {
252 let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
253 let keywords = extract_keywords(content, 5);
254 return vec![OnionSlice {
255 id: core_id,
256 layer: SliceLayer::Core,
257 content: content.to_string(),
258 parent_id: None,
259 children_ids: vec![],
260 keywords,
261 }];
262 }
263
264 let mut slices = Vec::with_capacity(4);
265
266 let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
268 let core_keywords = extract_keywords(content, 10);
269
270 let inner_content = extract_key_content(content, config.inner_target);
272 let inner_id = OnionSlice::generate_id(&inner_content, SliceLayer::Inner);
273 let inner_keywords = extract_keywords(&inner_content, 7);
274
275 let middle_content = extract_key_content(&inner_content, config.middle_target);
277 let middle_id = OnionSlice::generate_id(&middle_content, SliceLayer::Middle);
278 let middle_keywords = extract_keywords(&middle_content, 5);
279
280 let outer_content = create_outer_summary(&middle_content, &core_keywords, config.outer_target);
282 let outer_id = OnionSlice::generate_id(&outer_content, SliceLayer::Outer);
283 let outer_keywords = extract_keywords(&outer_content, 3);
284
285 slices.push(OnionSlice {
287 id: outer_id.clone(),
288 layer: SliceLayer::Outer,
289 content: outer_content,
290 parent_id: Some(middle_id.clone()),
291 children_ids: vec![],
292 keywords: outer_keywords,
293 });
294
295 slices.push(OnionSlice {
296 id: middle_id.clone(),
297 layer: SliceLayer::Middle,
298 content: middle_content,
299 parent_id: Some(inner_id.clone()),
300 children_ids: vec![outer_id],
301 keywords: middle_keywords,
302 });
303
304 slices.push(OnionSlice {
305 id: inner_id.clone(),
306 layer: SliceLayer::Inner,
307 content: inner_content,
308 parent_id: Some(core_id.clone()),
309 children_ids: vec![middle_id],
310 keywords: inner_keywords,
311 });
312
313 slices.push(OnionSlice {
314 id: core_id.clone(),
315 layer: SliceLayer::Core,
316 content: content.to_string(),
317 parent_id: None,
318 children_ids: vec![inner_id],
319 keywords: core_keywords,
320 });
321
322 slices
323}
324
325pub fn create_onion_slices_fast(
330 content: &str,
331 _metadata: &serde_json::Value,
332 config: &OnionSliceConfig,
333) -> Vec<OnionSlice> {
334 let content = content.trim();
335
336 if content.len() < config.min_content_for_slicing {
338 let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
339 let keywords = extract_keywords(content, 5);
340 return vec![OnionSlice {
341 id: core_id,
342 layer: SliceLayer::Core,
343 content: content.to_string(),
344 parent_id: None,
345 children_ids: vec![],
346 keywords,
347 }];
348 }
349
350 let mut slices = Vec::with_capacity(2);
351
352 let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
354 let core_keywords = extract_keywords(content, 10);
355
356 let outer_content = create_outer_summary(content, &core_keywords, config.outer_target);
359 let outer_id = OnionSlice::generate_id(&outer_content, SliceLayer::Outer);
360 let outer_keywords = extract_keywords(&outer_content, 3);
361
362 slices.push(OnionSlice {
364 id: outer_id.clone(),
365 layer: SliceLayer::Outer,
366 content: outer_content,
367 parent_id: Some(core_id.clone()),
368 children_ids: vec![],
369 keywords: outer_keywords,
370 });
371
372 slices.push(OnionSlice {
373 id: core_id,
374 layer: SliceLayer::Core,
375 content: content.to_string(),
376 parent_id: None,
377 children_ids: vec![outer_id],
378 keywords: core_keywords,
379 });
380
381 slices
382}
383
384fn extract_keywords(text: &str, max_keywords: usize) -> Vec<String> {
386 use std::collections::HashMap;
387
388 const STOP_WORDS: &[&str] = &[
390 "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
391 "from", "as", "is", "was", "are", "were", "been", "be", "have", "has", "had", "do", "does",
392 "did", "will", "would", "could", "should", "may", "might", "must", "shall", "can", "this",
393 "that", "these", "those", "i", "you", "he", "she", "it", "we", "they", "what", "which",
394 "who", "whom", "when", "where", "why", "how", "all", "each", "every", "both", "few",
395 "more", "most", "other", "some", "such", "no", "not", "only", "own", "same", "so", "than",
396 "too", "very", "just", "also", "now", "here", "there", "then", "once", "if", "into",
397 "through", "during", "before", "after", "above", "below", "between", "under", "again",
398 "further", "about", "out", "over", "up", "down", "off", "any", "because", "until", "while",
399 ];
400
401 let stop_set: std::collections::HashSet<&str> = STOP_WORDS.iter().copied().collect();
402
403 let mut word_counts: HashMap<String, usize> = HashMap::new();
405 for raw in text.split_whitespace() {
406 for token in tokenize_keyword_candidates(raw) {
407 if token.len() >= 3
408 && token.len() <= 30
409 && !stop_set.contains(token.as_str())
410 && !looks_like_session_token(&token)
411 {
412 *word_counts.entry(token).or_insert(0) += 1;
413 }
414 }
415 }
416
417 let mut words: Vec<_> = word_counts.into_iter().collect();
419 words.sort_by(|a, b| b.1.cmp(&a.1));
420
421 words
422 .into_iter()
423 .take(max_keywords)
424 .map(|(word, _)| word)
425 .collect()
426}
427
/// Splits one raw word into lowercase keyword candidates.
///
/// The word is first cut at every non-alphanumeric character. Each resulting piece
/// is then split at ASCII camelCase boundaries (a lowercase ASCII letter followed by
/// an uppercase ASCII letter) and ASCII-lowercased. When a piece produced more than
/// one part, the whole piece (Unicode-lowercased) is appended as an extra candidate,
/// provided it is 3 to 30 bytes long and not already present in the output.
fn tokenize_keyword_candidates(raw: &str) -> Vec<String> {
    let mut out: Vec<String> = Vec::new();

    for piece in raw.split(|c: char| !c.is_alphanumeric()) {
        if piece.is_empty() {
            continue;
        }

        // Insert a space at each lower->Upper ASCII transition, lowercasing as we go.
        let mut spaced = String::with_capacity(piece.len() * 2);
        let mut after_lower = false;
        for c in piece.chars() {
            if after_lower && c.is_ascii_uppercase() {
                spaced.push(' ');
            }
            spaced.push(c.to_ascii_lowercase());
            after_lower = c.is_ascii_lowercase();
        }

        let before = out.len();
        out.extend(spaced.split_whitespace().map(str::to_owned));
        let parts_added = out.len() - before;

        // A camelCase word also contributes its joined form, e.g. "camelcase".
        if parts_added > 1 {
            let joined: String = piece.chars().flat_map(char::to_lowercase).collect();
            if (3..=30).contains(&joined.len()) && !out.contains(&joined) {
                out.push(joined);
            }
        }
    }

    out
}
468
/// Heuristic filter for tokens that look like ids rather than words.
///
/// A token is rejected (returns `true`) when any of the following holds:
/// - it is longer than 12 bytes and consists only of ASCII hex digits;
/// - it contains six or more ASCII digits;
/// - it is longer than 20 bytes and fewer than a third of its bytes are ASCII letters.
fn looks_like_session_token(token: &str) -> bool {
    let len = token.len();
    let (mut hex, mut digits, mut letters) = (0usize, 0usize, 0usize);

    for ch in token.chars() {
        if ch.is_ascii_hexdigit() {
            hex += 1;
        }
        if ch.is_ascii_digit() {
            digits += 1;
        }
        if ch.is_ascii_alphabetic() {
            letters += 1;
        }
    }

    if len > 12 && hex == len {
        return true;
    }
    if digits >= 6 {
        return true;
    }
    len > 20 && letters < len / 3
}
478
479fn hash_content(text: &str) -> String {
481 let mut hash = compute_content_hash(text);
482 hash.truncate(16);
483 hash
484}
485
486fn extract_conversation_documents(
490 value: &serde_json::Value,
491 source_path: &std::path::Path,
492) -> Option<Vec<(String, String, serde_json::Value)>> {
493 let obj = value.as_object()?;
494 let source_name = source_path
495 .file_name()
496 .and_then(|n| n.to_str())
497 .unwrap_or("unknown");
498
499 if let Some(serde_json::Value::Array(sessions)) = obj.get("sessions") {
502 let project = obj
503 .get("project")
504 .and_then(|v| v.as_str())
505 .unwrap_or("unknown");
506
507 let mut docs = Vec::new();
508 for session in sessions {
509 let session_obj = session.as_object()?;
510 let session_id = session_obj
511 .get("info")
512 .and_then(|i| i.get("sessionId"))
513 .and_then(|v| v.as_str())
514 .unwrap_or("unknown");
515 let session_short = &session_id[..session_id.len().min(8)];
516
517 if let Some(serde_json::Value::Array(messages)) = session_obj.get("messages") {
518 for (idx, msg) in messages.iter().enumerate() {
519 let msg_obj = match msg.as_object() {
520 Some(o) => o,
521 None => continue,
522 };
523
524 let role = msg_obj
525 .get("role")
526 .and_then(|v| v.as_str())
527 .unwrap_or("unknown");
528 let text = msg_obj.get("text").and_then(|v| v.as_str()).unwrap_or("");
529 let timestamp = msg_obj
530 .get("timestamp")
531 .and_then(|v| v.as_str())
532 .unwrap_or("");
533
534 let text = text.trim();
536 if text.len() < 20 {
537 continue;
538 }
539
540 let content_hash = hash_content(text);
541 let doc_id = format!("msg-{}-{:04}-{}", session_short, idx, content_hash);
542
543 let metadata = json!({
544 "role": role,
545 "session": session_short,
546 "project": project,
547 "timestamp": timestamp,
548 "source": source_name,
549 "type": "conversation",
550 "format": "sessions"
551 });
552
553 docs.push((doc_id, text.to_string(), metadata));
554 }
555 }
556 }
557
558 if !docs.is_empty() {
559 tracing::info!(
560 "Sessions format detected: {} -> {} messages",
561 source_path.display(),
562 docs.len()
563 );
564 return Some(docs);
565 }
566 }
567
568 if let Some(serde_json::Value::Array(messages)) = obj.get("messages") {
572 let conv_id = obj
573 .get("uuid")
574 .or_else(|| obj.get("id"))
575 .and_then(|v| v.as_str())
576 .unwrap_or("unknown");
577 let conv_short = &conv_id[..conv_id.len().min(8)];
578 let title = obj
579 .get("name")
580 .or_else(|| obj.get("title"))
581 .and_then(|v| v.as_str())
582 .unwrap_or("");
583
584 let looks_like_conversation = messages.iter().any(|m| {
586 m.get("sender").is_some() || m.get("role").is_some() || m.get("author").is_some()
587 });
588
589 if looks_like_conversation {
590 let mut docs = Vec::new();
591 for (idx, msg) in messages.iter().enumerate() {
592 let msg_obj = match msg.as_object() {
593 Some(o) => o,
594 None => continue,
595 };
596
597 let role = msg_obj
598 .get("sender")
599 .or_else(|| msg_obj.get("role"))
600 .or_else(|| msg_obj.get("author").and_then(|a| a.get("role")))
601 .and_then(|v| v.as_str())
602 .unwrap_or("unknown");
603
604 let role = match role {
606 "human" => "user",
607 "assistant" | "bot" => "assistant",
608 other => other,
609 };
610
611 let text = msg_obj
613 .get("text")
614 .and_then(|v| v.as_str())
615 .or_else(|| {
616 msg_obj.get("content").and_then(|c| {
618 if let Some(s) = c.as_str() {
619 Some(s)
620 } else if let Some(_arr) = c.as_array() {
621 None } else {
624 None
625 }
626 })
627 })
628 .unwrap_or("");
629
630 let text = if text.is_empty() {
632 if let Some(serde_json::Value::Array(content)) = msg_obj.get("content") {
633 content
634 .iter()
635 .filter_map(|c| c.get("text").and_then(|t| t.as_str()))
636 .collect::<Vec<_>>()
637 .join(" ")
638 } else {
639 String::new()
640 }
641 } else {
642 text.to_string()
643 };
644
645 let timestamp = msg_obj
646 .get("created_at")
647 .or_else(|| msg_obj.get("timestamp"))
648 .and_then(|v| v.as_str())
649 .unwrap_or("");
650
651 let text = text.trim();
652 if text.len() < 20 {
653 continue;
654 }
655
656 let content_hash = hash_content(text);
657 let doc_id = format!("conv-{}-{:04}-{}", conv_short, idx, content_hash);
658
659 let metadata = json!({
660 "role": role,
661 "conversation": conv_short,
662 "title": title,
663 "timestamp": timestamp,
664 "source": source_name,
665 "type": "conversation",
666 "format": "claude_web"
667 });
668
669 docs.push((doc_id, text.to_string(), metadata));
670 }
671
672 if !docs.is_empty() {
673 tracing::info!(
674 "Conversation format detected: {} -> {} messages",
675 source_path.display(),
676 docs.len()
677 );
678 return Some(docs);
679 }
680 }
681 }
682
683 if let Some(serde_json::Value::Array(messages)) = obj.get("chat_messages") {
686 let conv_id = obj
687 .get("uuid")
688 .and_then(|v| v.as_str())
689 .unwrap_or("unknown");
690 let conv_short = &conv_id[..conv_id.len().min(8)];
691 let title = obj.get("name").and_then(|v| v.as_str()).unwrap_or("");
692
693 let mut docs = Vec::new();
694 for (idx, msg) in messages.iter().enumerate() {
695 let msg_obj = match msg.as_object() {
696 Some(o) => o,
697 None => continue,
698 };
699
700 let role = msg_obj
701 .get("sender")
702 .and_then(|v| v.as_str())
703 .unwrap_or("unknown");
704
705 let role = match role {
707 "human" => "user",
708 "assistant" | "bot" => "assistant",
709 other => other,
710 };
711
712 let text = msg_obj.get("text").and_then(|v| v.as_str()).unwrap_or("");
713
714 let timestamp = msg_obj
715 .get("created_at")
716 .and_then(|v| v.as_str())
717 .unwrap_or("");
718
719 let text = text.trim();
720 if text.len() < 20 {
721 continue;
722 }
723
724 let content_hash = hash_content(text);
725 let doc_id = format!("chat-{}-{:04}-{}", conv_short, idx, content_hash);
726
727 let metadata = json!({
728 "role": role,
729 "conversation": conv_short,
730 "title": title,
731 "timestamp": timestamp,
732 "source": source_name,
733 "type": "conversation",
734 "format": "claude_web"
735 });
736
737 docs.push((doc_id, text.to_string(), metadata));
738 }
739
740 if !docs.is_empty() {
741 tracing::info!(
742 "Claude.ai chat_messages format detected: {} -> {} messages",
743 source_path.display(),
744 docs.len()
745 );
746 return Some(docs);
747 }
748 }
749
750 if let Some(serde_json::Value::Object(mapping)) = obj.get("mapping") {
753 let conv_id = obj
754 .get("id")
755 .or_else(|| obj.get("conversation_id"))
756 .and_then(|v| v.as_str())
757 .unwrap_or("unknown");
758 let conv_short = &conv_id[..conv_id.len().min(8)];
759 let title = obj.get("title").and_then(|v| v.as_str()).unwrap_or("");
760
761 let mut docs = Vec::new();
762 let mut entries: Vec<_> = mapping.iter().collect();
763 entries.sort_by(|a, b| {
765 let time_a =
766 a.1.get("message")
767 .and_then(|m| m.get("create_time"))
768 .and_then(|t| t.as_f64())
769 .unwrap_or(0.0);
770 let time_b =
771 b.1.get("message")
772 .and_then(|m| m.get("create_time"))
773 .and_then(|t| t.as_f64())
774 .unwrap_or(0.0);
775 time_a
776 .partial_cmp(&time_b)
777 .unwrap_or(std::cmp::Ordering::Equal)
778 });
779
780 for (idx, (_node_id, node)) in entries.iter().enumerate() {
781 let message = match node.get("message") {
782 Some(m) => m,
783 None => continue,
784 };
785
786 let role = message
787 .get("author")
788 .and_then(|a| a.get("role"))
789 .and_then(|v| v.as_str())
790 .unwrap_or("unknown");
791
792 if role == "system" {
794 continue;
795 }
796
797 let text = message
798 .get("content")
799 .and_then(|c| c.get("parts"))
800 .and_then(|p| p.as_array())
801 .map(|parts| {
802 parts
803 .iter()
804 .filter_map(|p| p.as_str())
805 .collect::<Vec<_>>()
806 .join(" ")
807 })
808 .unwrap_or_default();
809
810 let timestamp = message
811 .get("create_time")
812 .and_then(|t| t.as_f64())
813 .map(|ts| {
814 chrono::DateTime::from_timestamp(ts as i64, 0)
815 .map(|dt| dt.to_rfc3339())
816 .unwrap_or_default()
817 })
818 .unwrap_or_default();
819
820 let text = text.trim();
821 if text.len() < 20 {
822 continue;
823 }
824
825 let content_hash = hash_content(text);
826 let doc_id = format!("gpt-{}-{:04}-{}", conv_short, idx, content_hash);
827
828 let metadata = json!({
829 "role": role,
830 "conversation": conv_short,
831 "title": title,
832 "timestamp": timestamp,
833 "source": source_name,
834 "type": "conversation",
835 "format": "chatgpt"
836 });
837
838 docs.push((doc_id, text.to_string(), metadata));
839 }
840
841 if !docs.is_empty() {
842 tracing::info!(
843 "ChatGPT format detected: {} -> {} messages",
844 source_path.display(),
845 docs.len()
846 );
847 return Some(docs);
848 }
849 }
850
851 None
852}
853
854fn extract_json_element_content(value: &serde_json::Value) -> String {
857 match value {
858 serde_json::Value::String(s) => s.clone(),
859 serde_json::Value::Object(map) => {
860 let mut parts = Vec::new();
861
862 for key in [
864 "content",
865 "text",
866 "message",
867 "summary",
868 "description",
869 "body",
870 ] {
871 if let Some(serde_json::Value::String(s)) = map.get(key)
872 && !s.is_empty()
873 {
874 parts.push(s.clone());
875 }
876 }
877
878 if let Some(serde_json::Value::String(role)) = map.get("role")
880 && let Some(content) = map.get("content")
881 {
882 match content {
883 serde_json::Value::String(s) => {
884 parts.push(format!("{}: {}", role, s));
885 }
886 serde_json::Value::Array(arr) => {
887 for item in arr {
889 if let serde_json::Value::Object(block) = item
890 && let Some(serde_json::Value::String(t)) = block.get("text")
891 {
892 parts.push(format!("{}: {}", role, t));
893 }
894 }
895 }
896 _ => {}
897 }
898 }
899
900 if let Some(serde_json::Value::Array(messages)) = map.get("messages") {
902 for msg in messages.iter().take(50) {
903 let msg_content = extract_json_element_content(msg);
905 if !msg_content.is_empty() && msg_content.len() > 10 {
906 parts.push(msg_content);
907 }
908 }
909 }
910
911 if let Some(serde_json::Value::Array(messages)) = map.get("chat_messages") {
913 for msg in messages.iter().take(50) {
914 let msg_content = extract_json_element_content(msg);
915 if !msg_content.is_empty() && msg_content.len() > 10 {
916 parts.push(msg_content);
917 }
918 }
919 }
920
921 if let Some(serde_json::Value::String(name)) = map.get("name")
923 && let Some(serde_json::Value::Array(obs)) = map.get("observations")
924 {
925 let observations: Vec<String> = obs
926 .iter()
927 .filter_map(|v| v.as_str().map(String::from))
928 .take(10)
929 .collect();
930 if !observations.is_empty() {
931 parts.push(format!("{}: {}", name, observations.join("; ")));
932 }
933 }
934
935 for key in ["title", "name", "uuid", "id"] {
937 if let Some(serde_json::Value::String(s)) = map.get(key) {
938 if !s.is_empty() && parts.iter().all(|p| !p.contains(s)) {
939 parts.insert(0, format!("[{}]", s));
940 }
941 break;
942 }
943 }
944
945 if parts.is_empty() {
946 serde_json::to_string(value)
948 .unwrap_or_default()
949 .chars()
950 .take(5000)
951 .collect()
952 } else {
953 parts.join("\n")
954 }
955 }
956 serde_json::Value::Array(arr) => {
957 arr.iter()
959 .take(20)
960 .map(extract_json_element_content)
961 .filter(|s| !s.is_empty())
962 .collect::<Vec<_>>()
963 .join("\n")
964 }
965 _ => value.to_string(),
966 }
967}
968
969fn detect_json_element_type(value: &serde_json::Value) -> &'static str {
971 if let serde_json::Value::Object(map) = value {
972 if map.contains_key("chat_messages") || map.contains_key("mapping") {
974 return "conversation";
975 }
976 if map.contains_key("messages") && map.contains_key("sessions") {
977 return "session";
978 }
979 if map.contains_key("role") && map.contains_key("content") {
980 return "message";
981 }
982 if map.contains_key("observations") && map.contains_key("name") {
983 return "entity";
984 }
985 if map.contains_key("messages") {
986 return "thread";
987 }
988 "object"
989 } else if value.is_array() {
990 "array"
991 } else if value.is_string() {
992 "text"
993 } else {
994 "value"
995 }
996}
997
998fn extract_key_content(text: &str, target_chars: usize) -> String {
1001 if text.len() <= target_chars {
1002 return text.to_string();
1003 }
1004
1005 let sentences: Vec<&str> = text
1007 .split(['.', '!', '?'])
1008 .map(|s| s.trim())
1009 .filter(|s| !s.is_empty())
1010 .collect();
1011
1012 if sentences.is_empty() {
1013 return truncate_at_word_boundary(text, target_chars);
1015 }
1016
1017 let keywords = extract_keywords(text, 10);
1019 let keyword_set: std::collections::HashSet<&str> =
1020 keywords.iter().map(|s| s.as_str()).collect();
1021
1022 let mut scored_sentences: Vec<(usize, f32, &str)> = sentences
1023 .iter()
1024 .enumerate()
1025 .map(|(idx, sentence)| {
1026 let mut score = 0.0_f32;
1027
1028 if idx == 0 {
1030 score += 2.0;
1031 } else if idx == sentences.len() - 1 {
1032 score += 1.5;
1033 }
1034
1035 let words: Vec<&str> = sentence.split_whitespace().collect();
1037 let keyword_count = words
1038 .iter()
1039 .filter(|w| {
1040 let cleaned: String = w
1041 .chars()
1042 .filter(|c| c.is_alphanumeric())
1043 .collect::<String>()
1044 .to_lowercase();
1045 keyword_set.contains(cleaned.as_str())
1046 })
1047 .count();
1048
1049 if !words.is_empty() {
1050 score += (keyword_count as f32 / words.len() as f32) * 3.0;
1051 }
1052
1053 if sentence.len() < 20 {
1055 score -= 0.5;
1056 }
1057
1058 (idx, score, *sentence)
1059 })
1060 .collect();
1061
1062 scored_sentences.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1064
1065 let mut selected_indices: Vec<usize> = Vec::new();
1067 let mut total_len = 0;
1068
1069 for (idx, _, sentence) in &scored_sentences {
1070 let sentence_len = sentence.len() + 2; if total_len + sentence_len > target_chars && !selected_indices.is_empty() {
1072 break;
1073 }
1074 selected_indices.push(*idx);
1075 total_len += sentence_len;
1076 }
1077
1078 selected_indices.sort();
1080
1081 let result: Vec<&str> = selected_indices
1083 .iter()
1084 .filter_map(|&idx| sentences.get(idx).copied())
1085 .collect();
1086
1087 if result.is_empty() {
1088 truncate_at_word_boundary(text, target_chars)
1089 } else {
1090 result.join(". ") + "."
1091 }
1092}
1093
1094fn create_outer_summary(middle_content: &str, keywords: &[String], target_chars: usize) -> String {
1096 let keyword_prefix = if !keywords.is_empty() {
1098 format!(
1099 "[{}] ",
1100 keywords
1101 .iter()
1102 .take(5)
1103 .cloned()
1104 .collect::<Vec<_>>()
1105 .join(", ")
1106 )
1107 } else {
1108 String::new()
1109 };
1110
1111 let remaining_chars = target_chars.saturating_sub(keyword_prefix.len());
1112
1113 let first_sentence = middle_content
1115 .split(['.', '!', '?'])
1116 .next()
1117 .unwrap_or(middle_content)
1118 .trim();
1119
1120 let summary = if first_sentence.len() <= remaining_chars {
1121 first_sentence.to_string()
1122 } else {
1123 truncate_at_word_boundary(first_sentence, remaining_chars)
1124 };
1125
1126 format!("{}{}", keyword_prefix, summary)
1127}
1128
/// Shortens `text` to at most `max_chars` characters and appends `"..."`.
///
/// Text that already fits is returned unchanged with no ellipsis. Otherwise the
/// text is cut after `max_chars` characters and then backed off to the last space
/// inside that prefix, if there is one. The ellipsis is added on top of the limit.
fn truncate_at_word_boundary(text: &str, max_chars: usize) -> String {
    // Byte offset of the first character beyond the limit; `None` means the text fits.
    let Some((cut, _)) = text.char_indices().nth(max_chars) else {
        return text.to_owned();
    };

    let head = &text[..cut];
    let kept = match head.rfind(' ') {
        Some(space_at) => &head[..space_at],
        None => head,
    };

    let mut shortened = String::with_capacity(kept.len() + 3);
    shortened.push_str(kept);
    shortened.push_str("...");
    shortened
}
1152
/// Indexing pipeline that writes documents to the document store and, when
/// configured, mirrors them into a BM25 index.
pub struct RAGPipeline {
    /// Shared handle to the MLX embedding bridge, guarded by an async mutex.
    mlx_bridge: Arc<Mutex<MLXBridge>>,
    /// Shared document store.
    storage: Arc<StorageManager>,
    /// Optional BM25 index kept in step with `storage`; `None` disables BM25 writes.
    bm25_writer: Option<Arc<BM25Index>>,
}
1158
1159impl RAGPipeline {
1160 pub async fn new(
1162 mlx_bridge: Arc<Mutex<MLXBridge>>,
1163 storage: Arc<StorageManager>,
1164 ) -> Result<Self> {
1165 Self::new_with_bm25(mlx_bridge, storage, None).await
1166 }
1167
1168 pub async fn new_with_bm25(
1169 mlx_bridge: Arc<Mutex<MLXBridge>>,
1170 storage: Arc<StorageManager>,
1171 bm25_writer: Option<Arc<BM25Index>>,
1172 ) -> Result<Self> {
1173 Ok(Self {
1174 mlx_bridge,
1175 storage,
1176 bm25_writer,
1177 })
1178 }
1179
1180 pub fn storage_manager(&self) -> Arc<StorageManager> {
1181 self.storage.clone()
1182 }
1183
1184 pub async fn refresh(&self) -> Result<()> {
1186 self.storage.refresh().await
1187 }
1188
1189 async fn persist_documents(&self, documents: Vec<ChromaDocument>) -> Result<()> {
1190 if documents.is_empty() {
1191 return Ok(());
1192 }
1193
1194 let mut unique_documents = Vec::with_capacity(documents.len());
1195 let mut seen_ids: HashSet<(String, String)> = HashSet::new();
1196 let mut seen_hashes: HashSet<(String, String)> = HashSet::new();
1197
1198 for mut document in documents {
1199 if let Value::Object(ref mut map) = document.metadata {
1200 map.entry("indexed_at".to_string())
1201 .or_insert_with(|| json!(chrono::Utc::now().to_rfc3339()));
1202 }
1203
1204 let id_key = (document.namespace.clone(), document.id.clone());
1205 if !seen_ids.insert(id_key) {
1206 continue;
1207 }
1208
1209 if let Some(hash) = document.content_hash.as_ref() {
1210 let hash_key = (document.namespace.clone(), hash.clone());
1211 if !seen_hashes.insert(hash_key) {
1212 continue;
1213 }
1214 }
1215
1216 unique_documents.push(document);
1217 }
1218
1219 let documents = self
1220 .filter_documents_against_store(unique_documents)
1221 .await?;
1222 if documents.is_empty() {
1223 return Ok(());
1224 }
1225
1226 let bm25_documents: Vec<(String, String, String)> = documents
1227 .iter()
1228 .map(|doc| (doc.id.clone(), doc.namespace.clone(), doc.document.clone()))
1229 .collect();
1230 let inserted_ids: Vec<(String, String)> = documents
1231 .iter()
1232 .map(|doc| (doc.namespace.clone(), doc.id.clone()))
1233 .collect();
1234
1235 self.storage.add_to_store(documents).await?;
1236
1237 if let Some(bm25_writer) = &self.bm25_writer
1238 && let Err(error) = bm25_writer.add_documents(&bm25_documents).await
1239 {
1240 for (namespace, id) in &inserted_ids {
1241 let _ = self.storage.delete_document(namespace, id).await;
1242 }
1243 return Err(error);
1244 }
1245
1246 Ok(())
1247 }
1248
1249 async fn filter_documents_against_store(
1250 &self,
1251 documents: Vec<ChromaDocument>,
1252 ) -> Result<Vec<ChromaDocument>> {
1253 if documents.is_empty() {
1254 return Ok(vec![]);
1255 }
1256
1257 let mut hashes_by_namespace: HashMap<String, Vec<String>> = HashMap::new();
1258 for document in &documents {
1259 if let Some(hash) = document.content_hash.as_ref() {
1260 hashes_by_namespace
1261 .entry(document.namespace.clone())
1262 .or_default()
1263 .push(hash.clone());
1264 }
1265 }
1266
1267 let mut allowed_hashes: HashMap<String, HashSet<String>> = HashMap::new();
1268 for (namespace, hashes) in hashes_by_namespace {
1269 let hashes = self
1270 .storage
1271 .filter_existing_hashes(&namespace, &hashes)
1272 .await?;
1273 allowed_hashes.insert(
1274 namespace,
1275 hashes.into_iter().cloned().collect::<HashSet<_>>(),
1276 );
1277 }
1278
1279 Ok(documents
1280 .into_iter()
1281 .filter(|document| match document.content_hash.as_ref() {
1282 None => true,
1283 Some(hash) => allowed_hashes
1284 .get(&document.namespace)
1285 .map(|hashes| hashes.contains(hash))
1286 .unwrap_or(true),
1287 })
1288 .collect())
1289 }
1290
1291 async fn clear_namespace_from_indices(&self, namespace: &str) -> Result<usize> {
1292 let deleted = self.storage.delete_namespace_documents(namespace).await?;
1293
1294 if let Some(bm25_writer) = &self.bm25_writer {
1295 bm25_writer.delete_namespace_term(namespace).await?;
1296 }
1297
1298 Ok(deleted)
1299 }
1300
1301 async fn load_memory_family(&self, namespace: &str, id: &str) -> Result<Vec<ChromaDocument>> {
1302 let docs = self.storage.get_all_in_namespace(namespace).await?;
1303 Ok(docs
1304 .into_iter()
1305 .filter(|doc| {
1306 doc.id == id
1307 || doc
1308 .metadata
1309 .get("original_id")
1310 .and_then(|value| value.as_str())
1311 .is_some_and(|original_id| original_id == id)
1312 })
1313 .collect())
1314 }
1315
1316 async fn delete_memory_family(&self, namespace: &str, id: &str) -> Result<usize> {
1317 let family = self.load_memory_family(namespace, id).await?;
1318 if family.is_empty() {
1319 return Ok(0);
1320 }
1321
1322 let mut deleted = 0usize;
1323 let mut ids = Vec::with_capacity(family.len());
1324
1325 for document in family {
1326 deleted += self
1327 .storage
1328 .delete_document(namespace, &document.id)
1329 .await?
1330 .min(1);
1331 ids.push(document.id);
1332 }
1333
1334 if let Some(bm25_writer) = &self.bm25_writer
1335 && !ids.is_empty()
1336 {
1337 bm25_writer.delete_documents(&ids).await?;
1338 }
1339
1340 Ok(deleted)
1341 }
1342
1343 fn preferred_memory_family_document(
1344 mut family: Vec<ChromaDocument>,
1345 requested_id: &str,
1346 ) -> Option<ChromaDocument> {
1347 fn rank(layer: Option<SliceLayer>) -> u8 {
1348 match layer {
1349 None => 0,
1350 Some(SliceLayer::Outer) => 1,
1351 Some(SliceLayer::Middle) => 2,
1352 Some(SliceLayer::Inner) => 3,
1353 Some(SliceLayer::Core) => 4,
1354 }
1355 }
1356
1357 family.sort_by_key(|document| {
1358 if document.id == requested_id {
1359 (0_u8, 0_u8)
1360 } else {
1361 (1_u8, rank(document.slice_layer()))
1362 }
1363 });
1364
1365 family.into_iter().next()
1366 }
1367
1368 pub fn mlx_connected_to(&self) -> String {
1370 if let Ok(bridge) = self.mlx_bridge.try_lock() {
1372 bridge.connected_to().to_string()
1373 } else {
1374 "mlx (lock held)".to_string()
1375 }
1376 }
1377
1378 pub async fn index_document(&self, path: &Path, namespace: Option<&str>) -> Result<()> {
1379 self.index_document_with_mode(path, namespace, SliceMode::default())
1380 .await
1381 }
1382
1383 pub async fn index_document_with_mode(
1385 &self,
1386 path: &Path,
1387 namespace: Option<&str>,
1388 slice_mode: SliceMode,
1389 ) -> Result<()> {
1390 self.index_document_internal(path, namespace, None, slice_mode)
1391 .await
1392 }
1393
1394 pub async fn index_document_with_preprocessing(
1396 &self,
1397 path: &Path,
1398 namespace: Option<&str>,
1399 preprocess_config: PreprocessingConfig,
1400 ) -> Result<()> {
1401 self.index_document_internal(path, namespace, Some(preprocess_config), SliceMode::Flat)
1402 .await
1403 }
1404
1405 pub async fn index_document_with_dedup(
1410 &self,
1411 path: &Path,
1412 namespace: Option<&str>,
1413 slice_mode: SliceMode,
1414 ) -> Result<IndexResult> {
1415 let validated_path = crate::path_utils::validate_read_path(path)?;
1417 let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
1418
1419 let is_json = validated_path
1422 .extension()
1423 .and_then(|e| e.to_str())
1424 .map(|e| e.eq_ignore_ascii_case("json"))
1425 .unwrap_or(false);
1426
1427 if is_json || matches!(slice_mode, SliceMode::Onion | SliceMode::OnionFast) {
1428 return self
1429 .index_document_with_json_awareness(&validated_path, ns, slice_mode)
1430 .await;
1431 }
1432
1433 let text = self.extract_text(&validated_path).await?;
1435
1436 let content_hash = compute_content_hash(&text);
1438
1439 if self.storage.has_content_hash(ns, &content_hash).await? {
1441 debug!(
1442 "Skipping duplicate content: {} (hash: {})",
1443 path.display(),
1444 &content_hash[..16]
1445 );
1446 return Ok(IndexResult::Skipped {
1447 reason: "exact duplicate".to_string(),
1448 content_hash,
1449 });
1450 }
1451
1452 let base_metadata = json!({
1453 "path": path.to_str(),
1454 "slice_mode": "flat",
1455 "content_hash": &content_hash,
1456 });
1457
1458 let chunks_indexed = self
1459 .index_with_flat_chunking_and_hash(&text, ns, path, base_metadata, &content_hash)
1460 .await?;
1461
1462 Ok(IndexResult::Indexed {
1463 chunks_indexed,
1464 content_hash,
1465 })
1466 }
1467
1468 async fn index_document_with_json_awareness(
1474 &self,
1475 path: &Path,
1476 namespace: &str,
1477 slice_mode: SliceMode,
1478 ) -> Result<IndexResult> {
1479 let documents = self.extract_json_documents(path).await?;
1481
1482 let mut total_chunks = 0;
1483 let mut skipped_docs = 0;
1484 let file_content_hash = match crate::path_utils::safe_read_to_string_async(path).await {
1485 Ok((_p, content)) => compute_content_hash(&content),
1486 Err(_) => compute_content_hash(""),
1487 };
1488
1489 for (doc_id, content, mut doc_metadata) in documents {
1490 if content.len() < 50 {
1491 continue; }
1493
1494 let doc_hash = compute_content_hash(&content);
1496
1497 if self.storage.has_content_hash(namespace, &doc_hash).await? {
1499 skipped_docs += 1;
1500 continue;
1501 }
1502
1503 if let serde_json::Value::Object(ref mut map) = doc_metadata {
1505 map.insert("doc_id".to_string(), json!(doc_id));
1506 map.insert("content_hash".to_string(), json!(doc_hash));
1507 map.insert("file_hash".to_string(), json!(&file_content_hash));
1508 map.insert(
1509 "slice_mode".to_string(),
1510 json!(match slice_mode {
1511 SliceMode::Onion => "onion",
1512 SliceMode::OnionFast => "onion-fast",
1513 SliceMode::Flat => "flat",
1514 }),
1515 );
1516 }
1517
1518 let is_conversation = doc_metadata
1520 .get("type")
1521 .and_then(|v| v.as_str())
1522 .map(|t| t == "conversation")
1523 .unwrap_or(false);
1524
1525 let chunks = if is_conversation {
1526 self.index_conversation_message_direct(
1528 &doc_id,
1529 &content,
1530 namespace,
1531 doc_metadata,
1532 &doc_hash,
1533 )
1534 .await?
1535 } else {
1536 match slice_mode {
1538 SliceMode::Onion => {
1539 self.index_with_onion_slicing_and_hash(
1540 &content,
1541 namespace,
1542 doc_metadata,
1543 &doc_hash,
1544 )
1545 .await?
1546 }
1547 SliceMode::OnionFast => {
1548 self.index_with_onion_slicing_fast_and_hash(
1549 &content,
1550 namespace,
1551 doc_metadata,
1552 &doc_hash,
1553 )
1554 .await?
1555 }
1556 SliceMode::Flat => {
1557 self.index_with_flat_chunking_and_hash(
1558 &content,
1559 namespace,
1560 path,
1561 doc_metadata,
1562 &doc_hash,
1563 )
1564 .await?
1565 }
1566 }
1567 };
1568
1569 total_chunks += chunks;
1570 }
1571
1572 if total_chunks == 0 && skipped_docs > 0 {
1573 return Ok(IndexResult::Skipped {
1574 reason: format!("all {} documents already indexed", skipped_docs),
1575 content_hash: file_content_hash,
1576 });
1577 }
1578
1579 tracing::info!(
1580 "JSON-aware indexing: {} -> {} chunks ({} docs skipped)",
1581 path.display(),
1582 total_chunks,
1583 skipped_docs
1584 );
1585
1586 Ok(IndexResult::Indexed {
1587 chunks_indexed: total_chunks,
1588 content_hash: file_content_hash,
1589 })
1590 }
1591
1592 pub async fn index_document_with_preprocessing_and_dedup(
1594 &self,
1595 path: &Path,
1596 namespace: Option<&str>,
1597 preprocess_config: PreprocessingConfig,
1598 ) -> Result<IndexResult> {
1599 let text = self.extract_text(path).await?;
1600 let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
1601
1602 let content_hash = compute_content_hash(&text);
1604
1605 if self.storage.has_content_hash(ns, &content_hash).await? {
1607 debug!(
1608 "Skipping duplicate content: {} (hash: {})",
1609 path.display(),
1610 &content_hash[..16]
1611 );
1612 return Ok(IndexResult::Skipped {
1613 reason: "exact duplicate".to_string(),
1614 content_hash,
1615 });
1616 }
1617
1618 let preprocessor = Preprocessor::new(preprocess_config);
1620 let cleaned = preprocessor.extract_semantic_content(&text);
1621 tracing::info!(
1622 "Preprocessing: {} chars -> {} chars ({:.1}% reduction)",
1623 text.len(),
1624 cleaned.len(),
1625 (1.0 - (cleaned.len() as f32 / text.len() as f32)) * 100.0
1626 );
1627
1628 let base_metadata = json!({
1629 "path": path.to_str(),
1630 "slice_mode": "flat",
1631 "content_hash": &content_hash,
1632 });
1633
1634 let chunks_indexed = self
1635 .index_with_flat_chunking_and_hash(&cleaned, ns, path, base_metadata, &content_hash)
1636 .await?;
1637
1638 Ok(IndexResult::Indexed {
1639 chunks_indexed,
1640 content_hash,
1641 })
1642 }
1643
1644 async fn index_document_internal(
1645 &self,
1646 path: &Path,
1647 namespace: Option<&str>,
1648 preprocess_config: Option<PreprocessingConfig>,
1649 slice_mode: SliceMode,
1650 ) -> Result<()> {
1651 let validated_path = crate::path_utils::validate_read_path(path)?;
1653 let text = self.extract_text(&validated_path).await?;
1654
1655 let text = if let Some(config) = preprocess_config {
1657 let preprocessor = Preprocessor::new(config);
1658 let cleaned = preprocessor.extract_semantic_content(&text);
1659 tracing::info!(
1660 "Preprocessing: {} chars -> {} chars ({:.1}% reduction)",
1661 text.len(),
1662 cleaned.len(),
1663 (1.0 - (cleaned.len() as f32 / text.len() as f32)) * 100.0
1664 );
1665 cleaned
1666 } else {
1667 text
1668 };
1669
1670 let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
1671 let base_metadata = json!({
1672 "path": validated_path.to_str(),
1673 "slice_mode": match slice_mode {
1674 SliceMode::Onion => "onion",
1675 SliceMode::OnionFast => "onion-fast",
1676 SliceMode::Flat => "flat",
1677 }
1678 });
1679
1680 match slice_mode {
1681 SliceMode::Onion => {
1682 self.index_with_onion_slicing(&text, ns, base_metadata)
1683 .await
1684 }
1685 SliceMode::OnionFast => {
1686 self.index_with_onion_slicing_fast(&text, ns, base_metadata)
1687 .await
1688 }
1689 SliceMode::Flat => {
1690 self.index_with_flat_chunking(&text, ns, path, base_metadata)
1691 .await
1692 }
1693 }
1694 }
1695
1696 async fn index_with_onion_slicing(
1698 &self,
1699 text: &str,
1700 namespace: &str,
1701 base_metadata: serde_json::Value,
1702 ) -> Result<()> {
1703 let config = OnionSliceConfig::default();
1704 let slices = create_onion_slices(text, &base_metadata, &config);
1705 let total_slices = slices.len();
1706
1707 tracing::info!(
1708 "Onion slicing: {} chars -> {} slices (outer/middle/inner/core)",
1709 text.len(),
1710 total_slices
1711 );
1712
1713 let mut total_stored = 0;
1715 for batch in slices.chunks(STORAGE_BATCH_SIZE) {
1716 let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
1718 let embeddings = self.embed_chunks(&batch_contents).await?;
1719
1720 let mut batch_docs = Vec::with_capacity(batch.len());
1722 for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
1723 let mut metadata = base_metadata.clone();
1724 if let serde_json::Value::Object(ref mut map) = metadata {
1725 map.insert("layer".to_string(), json!(slice.layer.name()));
1726 map.insert("keywords".to_string(), json!(slice.keywords));
1727 }
1728
1729 let doc = ChromaDocument::from_onion_slice(
1730 slice,
1731 namespace.to_string(),
1732 embedding.clone(),
1733 metadata,
1734 );
1735 batch_docs.push(doc);
1736 }
1737
1738 self.persist_documents(batch_docs).await?;
1740 total_stored += batch.len();
1741 tracing::info!("Stored {}/{} slices", total_stored, total_slices);
1742 }
1743
1744 Ok(())
1745 }
1746
1747 async fn index_with_onion_slicing_fast(
1749 &self,
1750 text: &str,
1751 namespace: &str,
1752 base_metadata: serde_json::Value,
1753 ) -> Result<()> {
1754 let config = OnionSliceConfig::default();
1755 let slices = create_onion_slices_fast(text, &base_metadata, &config);
1756 let total_slices = slices.len();
1757
1758 tracing::info!(
1759 "Fast onion slicing: {} chars -> {} slices (outer/core only)",
1760 text.len(),
1761 total_slices
1762 );
1763
1764 let mut total_stored = 0;
1765 for batch in slices.chunks(STORAGE_BATCH_SIZE) {
1766 let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
1767 let embeddings = self.embed_chunks(&batch_contents).await?;
1768
1769 let mut batch_docs = Vec::with_capacity(batch.len());
1770 for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
1771 let mut metadata = base_metadata.clone();
1772 if let serde_json::Value::Object(ref mut map) = metadata {
1773 map.insert("layer".to_string(), json!(slice.layer.name()));
1774 map.insert("keywords".to_string(), json!(slice.keywords));
1775 }
1776
1777 let doc = ChromaDocument::from_onion_slice(
1778 slice,
1779 namespace.to_string(),
1780 embedding.clone(),
1781 metadata,
1782 );
1783 batch_docs.push(doc);
1784 }
1785
1786 self.persist_documents(batch_docs).await?;
1787 total_stored += batch.len();
1788 tracing::info!("Stored {}/{} slices", total_stored, total_slices);
1789 }
1790
1791 Ok(())
1792 }
1793
1794 async fn index_with_onion_slicing_and_hash(
1796 &self,
1797 text: &str,
1798 namespace: &str,
1799 base_metadata: serde_json::Value,
1800 content_hash: &str,
1801 ) -> Result<usize> {
1802 let config = OnionSliceConfig::default();
1803 let slices = create_onion_slices(text, &base_metadata, &config);
1804 let total_slices = slices.len();
1805
1806 tracing::info!(
1807 "Onion slicing: {} chars -> {} slices (outer/middle/inner/core)",
1808 text.len(),
1809 total_slices
1810 );
1811
1812 let mut total_stored = 0;
1814 for batch in slices.chunks(STORAGE_BATCH_SIZE) {
1815 let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
1817 let embeddings = self.embed_chunks(&batch_contents).await?;
1818
1819 let mut batch_docs = Vec::with_capacity(batch.len());
1821 for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
1822 let mut metadata = base_metadata.clone();
1823 if let serde_json::Value::Object(ref mut map) = metadata {
1824 map.insert("layer".to_string(), json!(slice.layer.name()));
1825 map.insert("keywords".to_string(), json!(slice.keywords));
1826 }
1827
1828 let slice_hash = compute_content_hash(&slice.content);
1830 if let serde_json::Value::Object(ref mut map) = metadata {
1831 map.insert("file_hash".to_string(), json!(content_hash));
1832 }
1833 let doc = ChromaDocument::from_onion_slice_with_hash(
1834 slice,
1835 namespace.to_string(),
1836 embedding.clone(),
1837 metadata,
1838 slice_hash,
1839 );
1840 batch_docs.push(doc);
1841 }
1842
1843 self.persist_documents(batch_docs).await?;
1845 total_stored += batch.len();
1846 tracing::info!("Stored {}/{} slices", total_stored, total_slices);
1847 }
1848
1849 Ok(total_slices)
1850 }
1851
1852 async fn index_with_onion_slicing_fast_and_hash(
1855 &self,
1856 text: &str,
1857 namespace: &str,
1858 base_metadata: serde_json::Value,
1859 content_hash: &str,
1860 ) -> Result<usize> {
1861 let config = OnionSliceConfig::default();
1862 let slices = create_onion_slices_fast(text, &base_metadata, &config);
1863 let total_slices = slices.len();
1864
1865 tracing::info!(
1866 "Fast onion slicing: {} chars -> {} slices (outer/core only)",
1867 text.len(),
1868 total_slices
1869 );
1870
1871 let mut total_stored = 0;
1873 for batch in slices.chunks(STORAGE_BATCH_SIZE) {
1874 let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
1875 let embeddings = self.embed_chunks(&batch_contents).await?;
1876
1877 let mut batch_docs = Vec::with_capacity(batch.len());
1878 for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
1879 let mut metadata = base_metadata.clone();
1880 if let serde_json::Value::Object(ref mut map) = metadata {
1881 map.insert("layer".to_string(), json!(slice.layer.name()));
1882 map.insert("keywords".to_string(), json!(slice.keywords));
1883 }
1884
1885 let slice_hash = compute_content_hash(&slice.content);
1887 if let serde_json::Value::Object(ref mut map) = metadata {
1888 map.insert("file_hash".to_string(), json!(content_hash));
1889 }
1890 let doc = ChromaDocument::from_onion_slice_with_hash(
1891 slice,
1892 namespace.to_string(),
1893 embedding.clone(),
1894 metadata,
1895 slice_hash,
1896 );
1897 batch_docs.push(doc);
1898 }
1899
1900 self.persist_documents(batch_docs).await?;
1901 total_stored += batch.len();
1902 tracing::info!("Stored {}/{} slices", total_stored, total_slices);
1903 }
1904
1905 Ok(total_slices)
1906 }
1907
1908 async fn index_with_flat_chunking(
1910 &self,
1911 text: &str,
1912 namespace: &str,
1913 path: &Path,
1914 base_metadata: serde_json::Value,
1915 ) -> Result<()> {
1916 let chunks = self.chunk_text(text, 512, 128)?;
1918 let total_chunks = chunks.len();
1919
1920 tracing::info!(
1921 "Flat chunking: {} chars -> {} chunks",
1922 text.len(),
1923 total_chunks
1924 );
1925
1926 let mut total_stored = 0;
1928 let mut global_idx = 0;
1929 for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
1930 let embeddings = self.embed_chunks(batch).await?;
1932
1933 let mut batch_docs = Vec::with_capacity(batch.len());
1935 for (chunk, embedding) in batch.iter().zip(embeddings.iter()) {
1936 let mut metadata = base_metadata.clone();
1937 if let serde_json::Value::Object(ref mut map) = metadata {
1938 map.insert("chunk_index".to_string(), json!(global_idx));
1939 map.insert("total_chunks".to_string(), json!(total_chunks));
1940 }
1941
1942 let doc = ChromaDocument::new_flat(
1943 format!("{}_{}", path.to_str().unwrap_or("unknown"), global_idx),
1944 namespace.to_string(),
1945 embedding.clone(),
1946 metadata,
1947 chunk.clone(),
1948 );
1949 batch_docs.push(doc);
1950 global_idx += 1;
1951 }
1952
1953 self.persist_documents(batch_docs).await?;
1955 total_stored += batch.len();
1956 tracing::info!("Stored {}/{} chunks", total_stored, total_chunks);
1957 }
1958
1959 Ok(())
1960 }
1961
1962 async fn index_conversation_message_direct(
1966 &self,
1967 doc_id: &str,
1968 text: &str,
1969 namespace: &str,
1970 metadata: serde_json::Value,
1971 content_hash: &str,
1972 ) -> Result<usize> {
1973 let embedding = self.embed_query(text).await?;
1975
1976 let doc = ChromaDocument::new_flat_with_hash(
1978 doc_id.to_string(),
1979 namespace.to_string(),
1980 embedding,
1981 metadata,
1982 text.to_string(),
1983 content_hash.to_string(),
1984 );
1985
1986 self.persist_documents(vec![doc]).await?;
1988
1989 tracing::debug!(
1990 "Conversation message stored directly: {} ({} chars)",
1991 doc_id,
1992 text.len()
1993 );
1994
1995 Ok(1) }
1997
1998 async fn index_with_flat_chunking_and_hash(
2000 &self,
2001 text: &str,
2002 namespace: &str,
2003 path: &Path,
2004 base_metadata: serde_json::Value,
2005 content_hash: &str,
2006 ) -> Result<usize> {
2007 let chunks = self.chunk_text(text, 512, 128)?;
2009 let total_chunks = chunks.len();
2010
2011 tracing::info!(
2012 "Flat chunking: {} chars -> {} chunks",
2013 text.len(),
2014 total_chunks
2015 );
2016
2017 let mut total_stored = 0;
2019 let mut global_idx = 0;
2020 for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
2021 let embeddings = self.embed_chunks(batch).await?;
2023
2024 let mut batch_docs = Vec::with_capacity(batch.len());
2026 for (chunk, embedding) in batch.iter().zip(embeddings.iter()) {
2027 let mut metadata = base_metadata.clone();
2028 if let serde_json::Value::Object(ref mut map) = metadata {
2029 map.insert("chunk_index".to_string(), json!(global_idx));
2030 map.insert("total_chunks".to_string(), json!(total_chunks));
2031 }
2032
2033 let chunk_hash = compute_content_hash(chunk);
2035 if let serde_json::Value::Object(ref mut map) = metadata {
2036 map.insert("file_hash".to_string(), json!(content_hash));
2037 }
2038 let doc = ChromaDocument::new_flat_with_hash(
2039 format!("{}_{}", path.to_str().unwrap_or("unknown"), global_idx),
2040 namespace.to_string(),
2041 embedding.clone(),
2042 metadata,
2043 chunk.clone(),
2044 chunk_hash,
2045 );
2046 batch_docs.push(doc);
2047 global_idx += 1;
2048 }
2049
2050 self.persist_documents(batch_docs).await?;
2052 total_stored += batch.len();
2053 tracing::info!("Stored {}/{} chunks", total_stored, total_chunks);
2054 }
2055
2056 Ok(total_chunks)
2057 }
2058
2059 pub async fn index_text(
2060 &self,
2061 namespace: Option<&str>,
2062 id: String,
2063 text: String,
2064 metadata: serde_json::Value,
2065 ) -> Result<String> {
2066 self.index_text_with_mode(namespace, id, text, metadata, SliceMode::default())
2067 .await
2068 }
2069
2070 pub async fn index_text_with_mode(
2072 &self,
2073 namespace: Option<&str>,
2074 id: String,
2075 text: String,
2076 metadata: serde_json::Value,
2077 slice_mode: SliceMode,
2078 ) -> Result<String> {
2079 let ns = namespace.unwrap_or(DEFAULT_NAMESPACE).to_string();
2080 let slice_mode_name = match slice_mode {
2081 SliceMode::Onion => "onion",
2082 SliceMode::OnionFast => "onion-fast",
2083 SliceMode::Flat => "flat",
2084 };
2085
2086 match slice_mode {
2087 SliceMode::Onion | SliceMode::OnionFast => {
2088 let config = OnionSliceConfig::default();
2090 let slices = if slice_mode == SliceMode::OnionFast {
2091 create_onion_slices_fast(&text, &metadata, &config)
2092 } else {
2093 create_onion_slices(&text, &metadata, &config)
2094 };
2095
2096 let slice_contents: Vec<String> =
2097 slices.iter().map(|s| s.content.clone()).collect();
2098 let embeddings = self.embed_chunks(&slice_contents).await?;
2099
2100 let mut documents = Vec::with_capacity(slices.len());
2101 for (slice, embedding) in slices.iter().zip(embeddings.iter()) {
2102 let mut meta = metadata.clone();
2103 if let serde_json::Value::Object(ref mut map) = meta {
2104 map.insert("layer".to_string(), json!(slice.layer.name()));
2105 map.insert("original_id".to_string(), json!(id));
2106 map.insert("slice_mode".to_string(), json!(slice_mode_name));
2107 }
2108
2109 let doc = ChromaDocument::from_onion_slice(
2110 slice,
2111 ns.clone(),
2112 embedding.clone(),
2113 meta,
2114 );
2115 documents.push(doc);
2116 }
2117
2118 self.persist_documents(documents).await?;
2119
2120 Ok(slices
2122 .iter()
2123 .find(|s| s.layer == SliceLayer::Outer)
2124 .map(|s| s.id.clone())
2125 .unwrap_or(id))
2126 }
2127 SliceMode::Flat => {
2128 let embedding = self.embed_query(&text).await?;
2129 let mut metadata = metadata;
2130 if let serde_json::Value::Object(ref mut map) = metadata {
2131 map.insert("slice_mode".to_string(), json!(slice_mode_name));
2132 }
2133 let doc = ChromaDocument::new_flat(id.clone(), ns, embedding, metadata, text);
2134 self.persist_documents(vec![doc]).await?;
2135 Ok(id)
2136 }
2137 }
2138 }
2139
2140 pub async fn memory_upsert(
2141 &self,
2142 namespace: &str,
2143 id: String,
2144 text: String,
2145 metadata: serde_json::Value,
2146 ) -> Result<()> {
2147 let slice_mode = match metadata
2148 .get("slice_mode")
2149 .and_then(|value| value.as_str())
2150 .map(|value| value.to_ascii_lowercase())
2151 .as_deref()
2152 {
2153 Some("onion") => SliceMode::Onion,
2154 Some("onion-fast") | Some("onion_fast") | Some("fast") => SliceMode::OnionFast,
2155 Some("flat") | None => SliceMode::Flat,
2156 Some(other) => {
2157 return Err(anyhow!(
2158 "Unsupported metadata.slice_mode '{}'. Use 'flat', 'onion', or 'onion-fast'.",
2159 other
2160 ));
2161 }
2162 };
2163
2164 self.delete_memory_family(namespace, &id).await?;
2165 self.index_text_with_mode(Some(namespace), id, text, metadata, slice_mode)
2166 .await?;
2167 Ok(())
2168 }
2169
2170 pub async fn lookup_memory(&self, namespace: &str, id: &str) -> Result<Option<SearchResult>> {
2171 if let Some(doc) = self.storage.get_document(namespace, id).await? {
2172 let layer = doc.slice_layer();
2173 return Ok(Some(SearchResult {
2174 id: doc.id,
2175 namespace: doc.namespace,
2176 text: doc.document,
2177 score: 1.0,
2178 metadata: doc.metadata,
2179 layer,
2180 parent_id: doc.parent_id,
2181 children_ids: doc.children_ids,
2182 keywords: doc.keywords,
2183 }));
2184 }
2185
2186 if let Some(doc) = Self::preferred_memory_family_document(
2187 self.load_memory_family(namespace, id).await?,
2188 id,
2189 ) {
2190 let layer = doc.slice_layer();
2191 return Ok(Some(SearchResult {
2192 id: doc.id,
2193 namespace: doc.namespace,
2194 text: doc.document,
2195 score: 1.0,
2196 metadata: doc.metadata,
2197 layer,
2198 parent_id: doc.parent_id,
2199 children_ids: doc.children_ids,
2200 keywords: doc.keywords,
2201 }));
2202 }
2203
2204 Ok(None)
2205 }
2206
2207 pub async fn remove_memory(&self, namespace: &str, id: &str) -> Result<usize> {
2208 self.delete_memory_family(namespace, id).await
2209 }
2210
2211 pub async fn clear_namespace(&self, namespace: &str) -> Result<usize> {
2212 self.clear_namespace_from_indices(namespace).await
2213 }
2214
2215 pub async fn search_memory(
2216 &self,
2217 namespace: &str,
2218 query: &str,
2219 k: usize,
2220 ) -> Result<Vec<SearchResult>> {
2221 self.search_with_options(Some(namespace), query, k, SearchOptions::default())
2222 .await
2223 }
2224
2225 pub async fn memory_search_with_layer(
2227 &self,
2228 namespace: &str,
2229 query: &str,
2230 k: usize,
2231 layer: Option<SliceLayer>,
2232 ) -> Result<Vec<SearchResult>> {
2233 self.search_with_options(
2234 Some(namespace),
2235 query,
2236 k,
2237 SearchOptions {
2238 layer_filter: layer,
2239 project_filter: None,
2240 },
2241 )
2242 .await
2243 }
2244
2245 pub async fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
2246 self.search_inner(None, query, k).await
2247 }
2248
2249 pub async fn search_inner(
2251 &self,
2252 namespace: Option<&str>,
2253 query: &str,
2254 k: usize,
2255 ) -> Result<Vec<SearchResult>> {
2256 self.search_with_options(namespace, query, k, SearchOptions::default())
2257 .await
2258 }
2259
2260 pub async fn search_with_options(
2262 &self,
2263 namespace: Option<&str>,
2264 query: &str,
2265 k: usize,
2266 options: SearchOptions,
2267 ) -> Result<Vec<SearchResult>> {
2268 let query_embedding = self.embed_query(query).await?;
2269 let candidate_multiplier = if options.project_filter.is_some() {
2270 8
2271 } else {
2272 3
2273 };
2274
2275 let mut candidates = self
2276 .storage
2277 .search_store_with_layer(
2278 namespace,
2279 query_embedding.clone(),
2280 k * candidate_multiplier,
2281 options.layer_filter,
2282 )
2283 .await?;
2284
2285 if let Some(project) = options.project_filter.as_deref() {
2286 candidates.retain(|candidate| metadata_matches_project(&candidate.metadata, project));
2287 }
2288
2289 if !candidates.is_empty() {
2291 let documents: Vec<String> = candidates.iter().map(|c| c.document.clone()).collect();
2292 let metadatas: Vec<serde_json::Value> =
2293 candidates.iter().map(|c| c.metadata.clone()).collect();
2294
2295 let reranked = match self.mlx_bridge.lock().await.rerank(query, &documents).await {
2297 Ok(r) => Some(r),
2298 Err(e) => {
2299 tracing::warn!("MLX rerank failed, using cosine fallback: {}", e);
2300 None
2301 }
2302 };
2303
2304 let reranked = if let Some(r) = reranked {
2305 r
2306 } else {
2307 let doc_embeddings = self.ensure_doc_embeddings(&documents, &candidates).await?;
2309 let scores = doc_embeddings
2310 .iter()
2311 .enumerate()
2312 .map(|(idx, emb)| (idx, cosine(&query_embedding, emb)))
2313 .collect::<Vec<_>>();
2314 let mut scores = scores;
2315 scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2316 scores
2317 };
2318
2319 let results: Vec<SearchResult> = reranked
2321 .into_iter()
2322 .take(k)
2323 .filter_map(|(idx, score)| {
2324 candidates.get(idx).map(|candidate| {
2325 SearchResult {
2326 id: candidate.id.clone(),
2327 namespace: candidate.namespace.clone(),
2328 text: candidate.document.clone(),
2329 score,
2330 metadata: metadatas.get(idx).cloned().unwrap_or_else(|| json!({})),
2331 layer: candidate.slice_layer(),
2333 parent_id: candidate.parent_id.clone(),
2334 children_ids: candidate.children_ids.clone(),
2335 keywords: candidate.keywords.clone(),
2336 }
2337 })
2338 })
2339 .collect();
2340
2341 return Ok(results);
2342 }
2343
2344 Ok(vec![])
2345 }
2346
2347 pub async fn expand_result(&self, namespace: &str, id: &str) -> Result<Vec<SearchResult>> {
2349 let children = self.storage.get_children(namespace, id).await?;
2350 Ok(children
2351 .into_iter()
2352 .map(|doc| {
2353 let layer = doc.slice_layer();
2354 SearchResult {
2355 id: doc.id,
2356 namespace: doc.namespace,
2357 text: doc.document,
2358 score: 1.0,
2359 metadata: doc.metadata,
2360 layer,
2361 parent_id: doc.parent_id,
2362 children_ids: doc.children_ids,
2363 keywords: doc.keywords,
2364 }
2365 })
2366 .collect())
2367 }
2368
2369 pub async fn get_parent_result(
2371 &self,
2372 namespace: &str,
2373 id: &str,
2374 ) -> Result<Option<SearchResult>> {
2375 if let Some(parent) = self.storage.get_parent(namespace, id).await? {
2376 let layer = parent.slice_layer();
2377 return Ok(Some(SearchResult {
2378 id: parent.id,
2379 namespace: parent.namespace,
2380 text: parent.document,
2381 score: 1.0,
2382 metadata: parent.metadata,
2383 layer,
2384 parent_id: parent.parent_id,
2385 children_ids: parent.children_ids,
2386 keywords: parent.keywords,
2387 }));
2388 }
2389 Ok(None)
2390 }
2391
2392 async fn extract_text(&self, path: &Path) -> Result<String> {
2393 let ext = path
2394 .extension()
2395 .and_then(|e| e.to_str())
2396 .unwrap_or("")
2397 .to_lowercase();
2398
2399 if ext == "pdf" {
2400 let path = path.to_path_buf();
2402 let pdf_text =
2403 tokio::task::spawn_blocking(move || pdf_extract::extract_text(&path)).await??;
2404 return Ok(pdf_text);
2405 }
2406
2407 let (_p, content) = crate::path_utils::safe_read_to_string_async(path).await?;
2409 Ok(content)
2410 }
2411
2412 async fn extract_json_documents(
2420 &self,
2421 path: &Path,
2422 ) -> Result<Vec<(String, String, serde_json::Value)>> {
2423 let ext = path
2424 .extension()
2425 .and_then(|e| e.to_str())
2426 .unwrap_or("")
2427 .to_lowercase();
2428
2429 if ext != "json" {
2431 let text = self.extract_text(path).await?;
2432 let doc_id = format!("{}:0", path.display());
2433 let metadata = json!({ "path": path.to_str(), "index": 0 });
2434 return Ok(vec![(doc_id, text, metadata)]);
2435 }
2436
2437 let (_p, raw) = crate::path_utils::safe_read_to_string_async(path).await?;
2439 let parsed: serde_json::Value = match serde_json::from_str(&raw) {
2440 Ok(v) => v,
2441 Err(_) => {
2442 let doc_id = format!("{}:0", path.display());
2444 let metadata = json!({ "path": path.to_str(), "index": 0 });
2445 return Ok(vec![(doc_id, raw, metadata)]);
2446 }
2447 };
2448
2449 if let serde_json::Value::Array(arr) = parsed {
2451 let mut docs = Vec::new();
2452 let mut used_smart_extraction = false;
2453
2454 for item in arr.iter() {
2456 if let Some(mut conv_docs) = extract_conversation_documents(item, path) {
2457 docs.append(&mut conv_docs);
2458 used_smart_extraction = true;
2459 }
2460 }
2461
2462 if used_smart_extraction && !docs.is_empty() {
2464 tracing::info!(
2465 "Conversation array detected: {} -> {} messages",
2466 path.display(),
2467 docs.len()
2468 );
2469 return Ok(docs);
2470 }
2471
2472 docs.clear();
2474 for (idx, item) in arr.iter().enumerate() {
2475 let doc_id = format!("{}:{}", path.display(), idx);
2476 let content = extract_json_element_content(item);
2477 if content.len() > 50 {
2478 let metadata = json!({
2480 "path": path.to_str(),
2481 "index": idx,
2482 "total_elements": arr.len(),
2483 "element_type": detect_json_element_type(item),
2484 });
2485 docs.push((doc_id, content, metadata));
2486 }
2487 }
2488 if docs.is_empty() {
2489 let doc_id = format!("{}:0", path.display());
2491 let metadata = json!({ "path": path.to_str(), "index": 0 });
2492 return Ok(vec![(doc_id, raw, metadata)]);
2493 }
2494 tracing::info!(
2495 "JSON array detected: {} -> {} documents",
2496 path.display(),
2497 docs.len()
2498 );
2499 return Ok(docs);
2500 }
2501
2502 if let Some(docs) = extract_conversation_documents(&parsed, path) {
2504 return Ok(docs);
2505 }
2506
2507 let content = extract_json_element_content(&parsed);
2509 let doc_id = format!("{}:0", path.display());
2510 let metadata = json!({ "path": path.to_str(), "index": 0 });
2511 Ok(vec![(doc_id, content, metadata)])
2512 }
2513
2514 async fn embed_chunks(&self, chunks: &[String]) -> Result<Vec<Vec<f32>>> {
2515 self.mlx_bridge.lock().await.embed_batch(chunks).await
2517 }
2518
2519 async fn embed_query(&self, query: &str) -> Result<Vec<f32>> {
2520 self.mlx_bridge.lock().await.embed(query).await
2521 }
2522
2523 async fn ensure_doc_embeddings(
2524 &self,
2525 documents: &[String],
2526 candidates: &[ChromaDocument],
2527 ) -> Result<Vec<Vec<f32>>> {
2528 let has_all = candidates.iter().all(|c| !c.embedding.is_empty());
2530 if has_all {
2531 return Ok(candidates.iter().map(|c| c.embedding.clone()).collect());
2532 }
2533
2534 self.mlx_bridge.lock().await.embed_batch(documents).await
2535 }
2536
2537 fn chunk_text(&self, text: &str, target_size: usize, overlap: usize) -> Result<Vec<String>> {
2544 let sentences = split_into_sentences(text);
2545
2546 if sentences.is_empty() {
2547 return Ok(vec![text.to_string()]);
2548 }
2549
2550 if text.chars().count() <= target_size {
2552 return Ok(vec![text.to_string()]);
2553 }
2554
2555 let mut chunks = Vec::new();
2556 let mut current_chunk = String::new();
2557 let mut overlap_sentences: Vec<String> = Vec::new();
2558
2559 let overlap_sentence_count = (overlap / 50).clamp(1, 3);
2561
2562 for sentence in &sentences {
2563 let sentence_len = sentence.chars().count();
2564 let current_len = current_chunk.chars().count();
2565
2566 let max_size = target_size + target_size / 2;
2568 if current_len + sentence_len > max_size && !current_chunk.is_empty() {
2569 chunks.push(current_chunk.trim().to_string());
2570
2571 current_chunk = overlap_sentences.join(" ");
2573 if !current_chunk.is_empty() {
2574 current_chunk.push(' ');
2575 }
2576 overlap_sentences.clear();
2577 }
2578
2579 current_chunk.push_str(sentence);
2580 current_chunk.push(' ');
2581
2582 overlap_sentences.push(sentence.clone());
2584 if overlap_sentences.len() > overlap_sentence_count {
2585 overlap_sentences.remove(0);
2586 }
2587
2588 if current_chunk.chars().count() >= target_size {
2590 chunks.push(current_chunk.trim().to_string());
2591
2592 current_chunk = overlap_sentences.join(" ");
2594 if !current_chunk.is_empty() {
2595 current_chunk.push(' ');
2596 }
2597 overlap_sentences.clear();
2598 }
2599 }
2600
2601 let remaining = current_chunk.trim();
2603 if !remaining.is_empty() {
2604 if remaining.chars().count() < target_size / 4 && !chunks.is_empty() {
2606 let last_idx = chunks.len() - 1;
2607 chunks[last_idx].push(' ');
2608 chunks[last_idx].push_str(remaining);
2609 } else {
2610 chunks.push(remaining.to_string());
2611 }
2612 }
2613
2614 if chunks.is_empty() {
2616 chunks.push(text.to_string());
2617 }
2618
2619 Ok(chunks)
2620 }
2621}
2622
/// Selects which context tags are prepended to a chunk and how long the
/// resulting prefix may become.
#[derive(Debug, Clone)]
pub struct ContextPrefixConfig {
    /// Emit a `[Source: <filename>]` tag (skipped when the filename is empty).
    pub include_source: bool,
    /// Emit a `[Section: <header>]` tag when the chunk has a section header.
    pub include_section: bool,
    /// Emit a `[Type: <doc type>]` tag when the document type is known.
    pub include_doc_type: bool,
    /// Maximum number of characters kept from the assembled prefix.
    pub max_prefix_length: usize,
}

impl Default for ContextPrefixConfig {
    /// Enables all three tags and caps the prefix at 100 characters.
    fn default() -> Self {
        const DEFAULT_MAX_PREFIX_LENGTH: usize = 100;

        ContextPrefixConfig {
            max_prefix_length: DEFAULT_MAX_PREFIX_LENGTH,
            include_doc_type: true,
            include_section: true,
            include_source: true,
        }
    }
}
2657
/// One chunk of a document together with the context attached to it, as
/// produced by `create_enriched_chunks`.
#[derive(Debug, Clone)]
pub struct EnrichedChunk {
    /// Chunk text with the context prefix and a blank line in front of it, or
    /// just the chunk text when the prefix is empty.
    pub content: String,
    /// The chunk text without any context prefix.
    pub original_content: String,
    /// Document path exactly as it was passed to `create_enriched_chunks`.
    pub doc_path: String,
    /// Zero-based position of the chunk within its document, counted across
    /// all sections.
    pub chunk_index: usize,
    /// Header of the section the chunk was cut from, if one was detected.
    pub section: Option<String>,
    /// Human-readable document type derived from the file extension, if the
    /// extension is recognised.
    pub doc_type: Option<String>,
}
2674
2675pub fn create_enriched_chunks(
2687 content: &str,
2688 doc_path: &str,
2689 chunk_size: usize,
2690 overlap: usize,
2691 config: &ContextPrefixConfig,
2692) -> Vec<EnrichedChunk> {
2693 let doc_type = detect_doc_type(doc_path);
2695
2696 let filename = std::path::Path::new(doc_path)
2698 .file_name()
2699 .and_then(|n| n.to_str())
2700 .unwrap_or("unknown");
2701
2702 let sections = extract_sections(content);
2704
2705 let mut enriched_chunks = Vec::new();
2706 let mut global_chunk_index = 0;
2707
2708 for (section_header, section_content) in sections {
2709 let chunks = smart_chunk_text(section_content, chunk_size, overlap);
2711
2712 for chunk in chunks {
2713 let prefix = build_context_prefix(
2715 filename,
2716 section_header.as_deref(),
2717 doc_type.as_deref(),
2718 config,
2719 );
2720
2721 let full_content = if prefix.is_empty() {
2723 chunk.clone()
2724 } else {
2725 format!("{}\n\n{}", prefix, chunk)
2726 };
2727
2728 enriched_chunks.push(EnrichedChunk {
2729 content: full_content,
2730 original_content: chunk,
2731 doc_path: doc_path.to_string(),
2732 chunk_index: global_chunk_index,
2733 section: section_header.clone(),
2734 doc_type: doc_type.clone(),
2735 });
2736
2737 global_chunk_index += 1;
2738 }
2739 }
2740
2741 if enriched_chunks.is_empty() && !content.trim().is_empty() {
2743 let prefix = build_context_prefix(filename, None, doc_type.as_deref(), config);
2744 let full_content = if prefix.is_empty() {
2745 content.to_string()
2746 } else {
2747 format!("{}\n\n{}", prefix, content)
2748 };
2749
2750 enriched_chunks.push(EnrichedChunk {
2751 content: full_content,
2752 original_content: content.to_string(),
2753 doc_path: doc_path.to_string(),
2754 chunk_index: 0,
2755 section: None,
2756 doc_type,
2757 });
2758 }
2759
2760 enriched_chunks
2761}
2762
2763fn build_context_prefix(
2765 filename: &str,
2766 section: Option<&str>,
2767 doc_type: Option<&str>,
2768 config: &ContextPrefixConfig,
2769) -> String {
2770 let mut parts = Vec::new();
2771
2772 if config.include_source && !filename.is_empty() {
2773 parts.push(format!("[Source: {}]", filename));
2774 }
2775
2776 if config.include_section
2777 && let Some(sec) = section
2778 {
2779 parts.push(format!("[Section: {}]", sec));
2780 }
2781
2782 if config.include_doc_type
2783 && let Some(dt) = doc_type
2784 {
2785 parts.push(format!("[Type: {}]", dt));
2786 }
2787
2788 let prefix = parts.join(" ");
2789
2790 if prefix.len() > config.max_prefix_length {
2792 prefix.chars().take(config.max_prefix_length).collect()
2793 } else {
2794 prefix
2795 }
2796}
2797
/// Maps a path's file extension to a human-readable document type.
///
/// The extension is compared case-insensitively. Returns `None` when the path
/// has no extension, the extension is not valid UTF-8, or it is not one of the
/// known types.
fn detect_doc_type(path: &str) -> Option<String> {
    // Each entry lists the extensions that share one label.
    const DOC_TYPES: &[(&[&str], &str)] = &[
        (&["rs"], "Rust source code"),
        (&["py"], "Python source code"),
        (&["js", "jsx"], "JavaScript source code"),
        (&["ts", "tsx"], "TypeScript source code"),
        (&["md"], "Markdown documentation"),
        (&["txt"], "Plain text"),
        (&["json"], "JSON data"),
        (&["yaml", "yml"], "YAML configuration"),
        (&["toml"], "TOML configuration"),
        (&["html"], "HTML document"),
        (&["css"], "CSS stylesheet"),
        (&["sql"], "SQL query"),
        (&["sh", "bash"], "Shell script"),
        (&["pdf"], "PDF document"),
    ];

    let extension = std::path::Path::new(path)
        .extension()?
        .to_str()?
        .to_lowercase();

    DOC_TYPES
        .iter()
        .find(|(extensions, _)| extensions.contains(&extension.as_str()))
        .map(|(_, label)| label.to_string())
}
2825
2826fn extract_sections(content: &str) -> Vec<(Option<String>, &str)> {
2828 let header_pattern = regex::Regex::new(r"(?m)^(#{1,6})\s+(.+)$").ok();
2830
2831 if let Some(re) = header_pattern {
2832 let mut sections = Vec::new();
2833 let mut last_end = 0;
2834 let mut current_header: Option<String> = None;
2835
2836 for caps in re.captures_iter(content) {
2837 let Some(full_match) = caps.get(0) else {
2838 continue;
2839 };
2840 let Some(header_match) = caps.get(2) else {
2841 continue;
2842 };
2843 let match_start = full_match.start();
2844
2845 if match_start > last_end {
2847 let section_content = &content[last_end..match_start];
2848 if !section_content.trim().is_empty() {
2849 sections.push((current_header.clone(), section_content.trim()));
2850 }
2851 }
2852
2853 current_header = Some(header_match.as_str().to_string());
2854 last_end = full_match.end();
2855 }
2856
2857 if last_end < content.len() {
2859 let section_content = &content[last_end..];
2860 if !section_content.trim().is_empty() {
2861 sections.push((current_header, section_content.trim()));
2862 }
2863 }
2864
2865 if sections.is_empty() {
2866 vec![(None, content)]
2867 } else {
2868 sections
2869 }
2870 } else {
2871 vec![(None, content)]
2872 }
2873}
2874
2875fn smart_chunk_text(text: &str, target_size: usize, overlap: usize) -> Vec<String> {
2877 let sentences = split_into_sentences(text);
2878
2879 if sentences.is_empty() || text.chars().count() <= target_size {
2880 return vec![text.to_string()];
2881 }
2882
2883 let mut chunks = Vec::new();
2884 let mut current_chunk = String::new();
2885 let mut overlap_sentences: Vec<String> = Vec::new();
2886 let overlap_sentence_count = (overlap / 50).clamp(1, 3);
2887
2888 for sentence in &sentences {
2889 let sentence_len = sentence.chars().count();
2890 let current_len = current_chunk.chars().count();
2891 let max_size = target_size + target_size / 2;
2892
2893 if current_len + sentence_len > max_size && !current_chunk.is_empty() {
2894 chunks.push(current_chunk.trim().to_string());
2895 current_chunk = overlap_sentences.join(" ");
2896 if !current_chunk.is_empty() {
2897 current_chunk.push(' ');
2898 }
2899 overlap_sentences.clear();
2900 }
2901
2902 current_chunk.push_str(sentence);
2903 current_chunk.push(' ');
2904
2905 overlap_sentences.push(sentence.clone());
2906 if overlap_sentences.len() > overlap_sentence_count {
2907 overlap_sentences.remove(0);
2908 }
2909
2910 if current_chunk.chars().count() >= target_size {
2911 chunks.push(current_chunk.trim().to_string());
2912 current_chunk = overlap_sentences.join(" ");
2913 if !current_chunk.is_empty() {
2914 current_chunk.push(' ');
2915 }
2916 overlap_sentences.clear();
2917 }
2918 }
2919
2920 let remaining = current_chunk.trim();
2921 if !remaining.is_empty() {
2922 if remaining.chars().count() < target_size / 4 && !chunks.is_empty() {
2923 let last_idx = chunks.len() - 1;
2924 chunks[last_idx].push(' ');
2925 chunks[last_idx].push_str(remaining);
2926 } else {
2927 chunks.push(remaining.to_string());
2928 }
2929 }
2930
2931 if chunks.is_empty() {
2932 chunks.push(text.to_string());
2933 }
2934
2935 chunks
2936}
2937
/// Splits `text` into trimmed, non-empty sentences.
///
/// A sentence ends at `.`, `!` or `?` when that character is followed by
/// whitespace or is the last character of the text. A blank line (two
/// consecutive newlines) also ends the current sentence.
///
/// A period does not end a sentence when it closes a known abbreviation
/// (`Mr.`, `Mrs.`, `Dr.`, `Prof.`, `vs.`, `etc.`, `e.g.`, `i.e.`) or a
/// single-letter initial such as the `J.` in `J. K. Rowling` or the `S.` in
/// `U.S.`. Words that merely end in an uppercase letter (`USA.`) and the
/// terminators `!` and `?` always end the sentence.
fn split_into_sentences(text: &str) -> Vec<String> {
    const ABBREVIATIONS: [&str; 8] = [
        "Mr.", "Mrs.", "Dr.", "Prof.", "vs.", "etc.", "e.g.", "i.e.",
    ];

    /// Returns true when `candidate`, which ends in a period, finishes with
    /// an abbreviation or a single-letter initial, not a full sentence.
    fn ends_with_abbreviation(candidate: &str) -> bool {
        if ABBREVIATIONS.iter().any(|abbr| candidate.ends_with(*abbr)) {
            return true;
        }

        // Skip the period, then require an uppercase letter that stands on
        // its own, i.e. is not the last letter of a longer word.
        let mut tail = candidate.chars().rev().skip(1);
        match tail.next() {
            Some(letter) if letter.is_uppercase() => {
                tail.next().map_or(true, |before| !before.is_alphanumeric())
            }
            _ => false,
        }
    }

    let mut sentences = Vec::new();
    let mut current = String::new();
    let mut chars = text.chars().peekable();

    while let Some(c) = chars.next() {
        current.push(c);
        let next = chars.peek().copied();

        match (c, next) {
            // A terminator at the very end of the text always closes the sentence.
            ('.' | '!' | '?', None) => {
                sentences.push(current.trim().to_string());
                current.clear();
            }
            ('.' | '!' | '?', Some(following)) if following.is_whitespace() => {
                let sentence = current.trim();
                // Only a period can belong to an abbreviation or an initial.
                let keeps_going = c == '.' && ends_with_abbreviation(sentence);
                if !keeps_going {
                    sentences.push(sentence.to_string());
                    current.clear();
                    // Consume the whitespace that follows the terminator.
                    chars.next();
                }
            }
            // Blank line: paragraph break.
            ('\n', Some('\n')) => {
                let paragraph = current.trim();
                if !paragraph.is_empty() {
                    sentences.push(paragraph.to_string());
                    current.clear();
                }
                chars.next();
            }
            _ => {}
        }
    }

    let remaining = current.trim();
    if !remaining.is_empty() {
        sentences.push(remaining.to_string());
    }

    sentences
}
3000
3001#[derive(Debug, Clone, PartialEq, Eq)]
3003pub struct SearchOptions {
3004 pub layer_filter: Option<SliceLayer>,
3006 pub project_filter: Option<String>,
3008}
3009
3010impl SearchOptions {
3011 pub fn outer_only() -> Self {
3013 Self {
3014 layer_filter: Some(SliceLayer::Outer),
3015 project_filter: None,
3016 }
3017 }
3018
3019 pub fn deep() -> Self {
3021 Self {
3022 layer_filter: None,
3023 project_filter: None,
3024 }
3025 }
3026
3027 pub fn with_project(mut self, project: Option<String>) -> Self {
3028 self.project_filter = project.filter(|value| !value.trim().is_empty());
3029 self
3030 }
3031}
3032
3033impl Default for SearchOptions {
3034 fn default() -> Self {
3035 Self::outer_only()
3036 }
3037}
3038
3039fn metadata_matches_project(metadata: &Value, project: &str) -> bool {
3040 let needle = project.trim();
3041 if needle.is_empty() {
3042 return true;
3043 }
3044
3045 metadata.as_object().is_some_and(|object| {
3046 ["project", "project_id", "source_project"]
3047 .iter()
3048 .filter_map(|key| object.get(*key))
3049 .filter_map(|value| value.as_str())
3050 .any(|value| value.eq_ignore_ascii_case(needle))
3051 })
3052}
3053
3054#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
3055pub struct SearchResult {
3056 pub id: String,
3057 pub namespace: String,
3058 pub text: String,
3059 pub score: f32,
3060 pub metadata: serde_json::Value,
3061 #[serde(skip_serializing_if = "Option::is_none")]
3063 pub layer: Option<SliceLayer>,
3064 #[serde(skip_serializing_if = "Option::is_none")]
3066 pub parent_id: Option<String>,
3067 #[serde(skip_serializing_if = "Vec::is_empty")]
3069 pub children_ids: Vec<String>,
3070 #[serde(skip_serializing_if = "Vec::is_empty")]
3072 pub keywords: Vec<String>,
3073}
3074
3075impl SearchResult {
3076 pub fn new_legacy(
3078 id: String,
3079 namespace: String,
3080 text: String,
3081 score: f32,
3082 metadata: serde_json::Value,
3083 ) -> Self {
3084 Self {
3085 id,
3086 namespace,
3087 text,
3088 score,
3089 metadata,
3090 layer: None,
3091 parent_id: None,
3092 children_ids: vec![],
3093 keywords: vec![],
3094 }
3095 }
3096
3097 pub fn can_expand(&self) -> bool {
3099 !self.children_ids.is_empty()
3100 }
3101
3102 pub fn can_drill_up(&self) -> bool {
3104 self.parent_id.is_some()
3105 }
3106}
3107
/// Cosine similarity of `a` and `b`.
///
/// Only the first `min(a.len(), b.len())` components take part. Returns `0.0`
/// when either vector has zero magnitude over that range, which includes
/// empty input.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let (dot, sq_a, sq_b) = a.iter().zip(b).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, sq_a, sq_b), (&x, &y)| (dot + x * y, sq_a + x * x, sq_b + y * y),
    );

    if sq_a == 0.0 || sq_b == 0.0 {
        0.0
    } else {
        dot / (sq_a.sqrt() * sq_b.sqrt())
    }
}
3122
#[cfg(test)]
mod tests {
    use super::{
        SearchOptions, SliceLayer, extract_keywords, hash_content, metadata_matches_project,
    };
    use serde_json::json;

    /// The content hash is a stable 16-character hex string.
    #[test]
    fn short_hash_uses_sha256_prefix_with_minimum_length() {
        let first = hash_content("same content");
        let second = hash_content("same content");

        assert_eq!(first.len(), 16);
        assert!(first.chars().all(|c| c.is_ascii_hexdigit()));
        assert_eq!(first, second);
    }

    /// Path components become lowercase keywords; the long hex token does not.
    #[test]
    fn keyword_extraction_splits_paths_and_filters_session_tokens() {
        let input = "/Users/silver/Git/tools/TwinSweep session 2ff4de8b9a4e1234567890abcdef notes";
        let keywords = extract_keywords(input, 10);

        for expected in ["users", "twinsweep"] {
            assert!(keywords.contains(&expected.to_string()));
        }
        assert!(!keywords.iter().any(|keyword| keyword.contains("2ff4de8b")));
    }

    /// `with_project` stores the project name without touching the layer filter.
    #[test]
    fn search_options_can_carry_project_filter() {
        let project = Some("Vista".to_string());
        let options = SearchOptions::deep().with_project(project);

        assert_eq!(options.layer_filter, None);
        assert_eq!(options.project_filter.as_deref(), Some("Vista"));
    }

    /// Project matching looks at the metadata fields and ignores ASCII case.
    #[test]
    fn project_match_uses_metadata_fields() {
        let vista = json!({"project": "Vista"});
        let vetcoders = json!({"project_id": "VetCoders"});
        let other = json!({"project": "rmcp-memex"});

        assert!(metadata_matches_project(&vista, "vista"));
        assert!(metadata_matches_project(&vetcoders, "vetcoders"));
        assert!(!metadata_matches_project(&other, "vista"));

        let defaults = SearchOptions::default();
        assert_eq!(defaults.layer_filter, Some(SliceLayer::Outer));
    }
}