use anyhow::{Result, anyhow};
use pdf_extract;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet, hash_map::DefaultHasher};
use std::hash::{Hash, Hasher};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::debug;
use crate::{
embeddings::MLXBridge,
preprocessing::{PreprocessingConfig, Preprocessor},
search::BM25Index,
storage::{ChromaDocument, StorageManager},
};
pub mod pipeline;
pub use pipeline::{
Chunk, EmbeddedChunk, FileContent, PipelineConfig, PipelineResult, PipelineStats, run_pipeline,
};
const DEFAULT_NAMESPACE: &str = "rag";
const STORAGE_BATCH_SIZE: usize = 100;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum SliceLayer {
Outer = 1,
Middle = 2,
Inner = 3,
Core = 4,
}
impl SliceLayer {
pub fn target_chars(&self) -> usize {
match self {
SliceLayer::Outer => 100,
SliceLayer::Middle => 300,
SliceLayer::Inner => 600,
SliceLayer::Core => usize::MAX,
}
}
pub fn as_u8(&self) -> u8 {
*self as u8
}
pub fn from_u8(v: u8) -> Option<Self> {
match v {
1 => Some(SliceLayer::Outer),
2 => Some(SliceLayer::Middle),
3 => Some(SliceLayer::Inner),
4 => Some(SliceLayer::Core),
_ => None,
}
}
pub fn name(&self) -> &'static str {
match self {
SliceLayer::Outer => "outer",
SliceLayer::Middle => "middle",
SliceLayer::Inner => "inner",
SliceLayer::Core => "core",
}
}
}
impl std::fmt::Display for SliceLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnionSlice {
pub id: String,
pub layer: SliceLayer,
pub content: String,
pub parent_id: Option<String>,
pub children_ids: Vec<String>,
pub keywords: Vec<String>,
}
impl OnionSlice {
pub fn generate_id(content: &str, layer: SliceLayer) -> String {
let mut hasher = DefaultHasher::new();
content.hash(&mut hasher);
layer.as_u8().hash(&mut hasher);
format!("slice_{:016x}_{}", hasher.finish(), layer.name())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SliceMode {
#[default]
Onion,
OnionFast,
Flat,
}
impl std::str::FromStr for SliceMode {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"onion" => Ok(SliceMode::Onion),
"onion-fast" | "fast" => Ok(SliceMode::OnionFast),
"flat" => Ok(SliceMode::Flat),
other => Err(format!(
"Invalid slice mode: '{}'. Use 'onion', 'onion-fast', or 'flat'",
other
)),
}
}
}
#[derive(Debug, Clone)]
pub enum IndexResult {
Indexed {
chunks_indexed: usize,
content_hash: String,
},
Skipped {
reason: String,
content_hash: String,
},
}
impl IndexResult {
pub fn was_indexed(&self) -> bool {
matches!(self, IndexResult::Indexed { .. })
}
#[deprecated(note = "use was_indexed")]
pub fn is_indexed(&self) -> bool {
self.was_indexed()
}
pub fn is_skipped(&self) -> bool {
matches!(self, IndexResult::Skipped { .. })
}
pub fn content_hash(&self) -> &str {
match self {
IndexResult::Indexed { content_hash, .. } => content_hash,
IndexResult::Skipped { content_hash, .. } => content_hash,
}
}
}
pub fn compute_content_hash(content: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(content.as_bytes());
let result = hasher.finalize();
result.iter().map(|b| format!("{:02x}", b)).collect()
}
#[derive(Debug, Clone)]
pub struct OnionSliceConfig {
pub outer_target: usize,
pub middle_target: usize,
pub inner_target: usize,
pub min_content_for_slicing: usize,
}
impl Default for OnionSliceConfig {
fn default() -> Self {
Self {
outer_target: 100,
middle_target: 300,
inner_target: 600,
min_content_for_slicing: 200,
}
}
}
pub fn create_onion_slices(
content: &str,
_metadata: &serde_json::Value,
config: &OnionSliceConfig,
) -> Vec<OnionSlice> {
let content = content.trim();
if content.len() < config.min_content_for_slicing {
let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
let keywords = extract_keywords(content, 5);
return vec![OnionSlice {
id: core_id,
layer: SliceLayer::Core,
content: content.to_string(),
parent_id: None,
children_ids: vec![],
keywords,
}];
}
let mut slices = Vec::with_capacity(4);
let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
let core_keywords = extract_keywords(content, 10);
let inner_content = extract_key_content(content, config.inner_target);
let inner_id = OnionSlice::generate_id(&inner_content, SliceLayer::Inner);
let inner_keywords = extract_keywords(&inner_content, 7);
let middle_content = extract_key_content(&inner_content, config.middle_target);
let middle_id = OnionSlice::generate_id(&middle_content, SliceLayer::Middle);
let middle_keywords = extract_keywords(&middle_content, 5);
let outer_content = create_outer_summary(&middle_content, &core_keywords, config.outer_target);
let outer_id = OnionSlice::generate_id(&outer_content, SliceLayer::Outer);
let outer_keywords = extract_keywords(&outer_content, 3);
slices.push(OnionSlice {
id: outer_id.clone(),
layer: SliceLayer::Outer,
content: outer_content,
parent_id: Some(middle_id.clone()),
children_ids: vec![],
keywords: outer_keywords,
});
slices.push(OnionSlice {
id: middle_id.clone(),
layer: SliceLayer::Middle,
content: middle_content,
parent_id: Some(inner_id.clone()),
children_ids: vec![outer_id],
keywords: middle_keywords,
});
slices.push(OnionSlice {
id: inner_id.clone(),
layer: SliceLayer::Inner,
content: inner_content,
parent_id: Some(core_id.clone()),
children_ids: vec![middle_id],
keywords: inner_keywords,
});
slices.push(OnionSlice {
id: core_id.clone(),
layer: SliceLayer::Core,
content: content.to_string(),
parent_id: None,
children_ids: vec![inner_id],
keywords: core_keywords,
});
slices
}
pub fn create_onion_slices_fast(
content: &str,
_metadata: &serde_json::Value,
config: &OnionSliceConfig,
) -> Vec<OnionSlice> {
let content = content.trim();
if content.len() < config.min_content_for_slicing {
let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
let keywords = extract_keywords(content, 5);
return vec![OnionSlice {
id: core_id,
layer: SliceLayer::Core,
content: content.to_string(),
parent_id: None,
children_ids: vec![],
keywords,
}];
}
let mut slices = Vec::with_capacity(2);
let core_id = OnionSlice::generate_id(content, SliceLayer::Core);
let core_keywords = extract_keywords(content, 10);
let outer_content = create_outer_summary(content, &core_keywords, config.outer_target);
let outer_id = OnionSlice::generate_id(&outer_content, SliceLayer::Outer);
let outer_keywords = extract_keywords(&outer_content, 3);
slices.push(OnionSlice {
id: outer_id.clone(),
layer: SliceLayer::Outer,
content: outer_content,
parent_id: Some(core_id.clone()),
children_ids: vec![],
keywords: outer_keywords,
});
slices.push(OnionSlice {
id: core_id,
layer: SliceLayer::Core,
content: content.to_string(),
parent_id: None,
children_ids: vec![outer_id],
keywords: core_keywords,
});
slices
}
fn extract_keywords(text: &str, max_keywords: usize) -> Vec<String> {
use std::collections::HashMap;
const STOP_WORDS: &[&str] = &[
"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
"from", "as", "is", "was", "are", "were", "been", "be", "have", "has", "had", "do", "does",
"did", "will", "would", "could", "should", "may", "might", "must", "shall", "can", "this",
"that", "these", "those", "i", "you", "he", "she", "it", "we", "they", "what", "which",
"who", "whom", "when", "where", "why", "how", "all", "each", "every", "both", "few",
"more", "most", "other", "some", "such", "no", "not", "only", "own", "same", "so", "than",
"too", "very", "just", "also", "now", "here", "there", "then", "once", "if", "into",
"through", "during", "before", "after", "above", "below", "between", "under", "again",
"further", "about", "out", "over", "up", "down", "off", "any", "because", "until", "while",
];
let stop_set: std::collections::HashSet<&str> = STOP_WORDS.iter().copied().collect();
let mut word_counts: HashMap<String, usize> = HashMap::new();
for raw in text.split_whitespace() {
for token in tokenize_keyword_candidates(raw) {
if token.len() >= 3
&& token.len() <= 30
&& !stop_set.contains(token.as_str())
&& !looks_like_session_token(&token)
{
*word_counts.entry(token).or_insert(0) += 1;
}
}
}
let mut words: Vec<_> = word_counts.into_iter().collect();
words.sort_by(|a, b| b.1.cmp(&a.1));
words
.into_iter()
.take(max_keywords)
.map(|(word, _)| word)
.collect()
}
fn tokenize_keyword_candidates(raw: &str) -> Vec<String> {
let mut tokens = Vec::new();
for segment in raw
.split(|ch: char| !ch.is_alphanumeric())
.filter(|segment| !segment.is_empty())
{
let compact: String = segment.chars().flat_map(|ch| ch.to_lowercase()).collect();
let mut normalized = String::with_capacity(segment.len() * 2);
let mut previous_is_lowercase = false;
for ch in segment.chars() {
if ch.is_ascii_uppercase() && previous_is_lowercase {
normalized.push(' ');
}
normalized.push(ch.to_ascii_lowercase());
previous_is_lowercase = ch.is_ascii_lowercase();
}
let segment_tokens = normalized
.split_whitespace()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
tokens.extend(segment_tokens.iter().cloned());
if segment_tokens.len() > 1
&& compact.len() >= 3
&& compact.len() <= 30
&& !tokens.iter().any(|token| token == &compact)
{
tokens.push(compact);
}
}
tokens
}
fn looks_like_session_token(token: &str) -> bool {
let hex_chars = token.chars().filter(|ch| ch.is_ascii_hexdigit()).count();
let digit_chars = token.chars().filter(|ch| ch.is_ascii_digit()).count();
let alpha_chars = token.chars().filter(|ch| ch.is_ascii_alphabetic()).count();
token.len() > 12 && hex_chars == token.len()
|| digit_chars >= 6
|| (token.len() > 20 && alpha_chars < token.len() / 3)
}
fn hash_content(text: &str) -> String {
let mut hash = compute_content_hash(text);
hash.truncate(16);
hash
}
fn extract_conversation_documents(
value: &serde_json::Value,
source_path: &std::path::Path,
) -> Option<Vec<(String, String, serde_json::Value)>> {
let obj = value.as_object()?;
let source_name = source_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown");
if let Some(serde_json::Value::Array(sessions)) = obj.get("sessions") {
let project = obj
.get("project")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let mut docs = Vec::new();
for session in sessions {
let session_obj = session.as_object()?;
let session_id = session_obj
.get("info")
.and_then(|i| i.get("sessionId"))
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let session_short = &session_id[..session_id.len().min(8)];
if let Some(serde_json::Value::Array(messages)) = session_obj.get("messages") {
for (idx, msg) in messages.iter().enumerate() {
let msg_obj = match msg.as_object() {
Some(o) => o,
None => continue,
};
let role = msg_obj
.get("role")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let text = msg_obj.get("text").and_then(|v| v.as_str()).unwrap_or("");
let timestamp = msg_obj
.get("timestamp")
.and_then(|v| v.as_str())
.unwrap_or("");
let text = text.trim();
if text.len() < 20 {
continue;
}
let content_hash = hash_content(text);
let doc_id = format!("msg-{}-{:04}-{}", session_short, idx, content_hash);
let metadata = json!({
"role": role,
"session": session_short,
"project": project,
"timestamp": timestamp,
"source": source_name,
"type": "conversation",
"format": "sessions"
});
docs.push((doc_id, text.to_string(), metadata));
}
}
}
if !docs.is_empty() {
tracing::info!(
"Sessions format detected: {} -> {} messages",
source_path.display(),
docs.len()
);
return Some(docs);
}
}
if let Some(serde_json::Value::Array(messages)) = obj.get("messages") {
let conv_id = obj
.get("uuid")
.or_else(|| obj.get("id"))
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let conv_short = &conv_id[..conv_id.len().min(8)];
let title = obj
.get("name")
.or_else(|| obj.get("title"))
.and_then(|v| v.as_str())
.unwrap_or("");
let looks_like_conversation = messages.iter().any(|m| {
m.get("sender").is_some() || m.get("role").is_some() || m.get("author").is_some()
});
if looks_like_conversation {
let mut docs = Vec::new();
for (idx, msg) in messages.iter().enumerate() {
let msg_obj = match msg.as_object() {
Some(o) => o,
None => continue,
};
let role = msg_obj
.get("sender")
.or_else(|| msg_obj.get("role"))
.or_else(|| msg_obj.get("author").and_then(|a| a.get("role")))
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let role = match role {
"human" => "user",
"assistant" | "bot" => "assistant",
other => other,
};
let text = msg_obj
.get("text")
.and_then(|v| v.as_str())
.or_else(|| {
msg_obj.get("content").and_then(|c| {
if let Some(s) = c.as_str() {
Some(s)
} else if let Some(_arr) = c.as_array() {
None } else {
None
}
})
})
.unwrap_or("");
let text = if text.is_empty() {
if let Some(serde_json::Value::Array(content)) = msg_obj.get("content") {
content
.iter()
.filter_map(|c| c.get("text").and_then(|t| t.as_str()))
.collect::<Vec<_>>()
.join(" ")
} else {
String::new()
}
} else {
text.to_string()
};
let timestamp = msg_obj
.get("created_at")
.or_else(|| msg_obj.get("timestamp"))
.and_then(|v| v.as_str())
.unwrap_or("");
let text = text.trim();
if text.len() < 20 {
continue;
}
let content_hash = hash_content(text);
let doc_id = format!("conv-{}-{:04}-{}", conv_short, idx, content_hash);
let metadata = json!({
"role": role,
"conversation": conv_short,
"title": title,
"timestamp": timestamp,
"source": source_name,
"type": "conversation",
"format": "claude_web"
});
docs.push((doc_id, text.to_string(), metadata));
}
if !docs.is_empty() {
tracing::info!(
"Conversation format detected: {} -> {} messages",
source_path.display(),
docs.len()
);
return Some(docs);
}
}
}
if let Some(serde_json::Value::Array(messages)) = obj.get("chat_messages") {
let conv_id = obj
.get("uuid")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let conv_short = &conv_id[..conv_id.len().min(8)];
let title = obj.get("name").and_then(|v| v.as_str()).unwrap_or("");
let mut docs = Vec::new();
for (idx, msg) in messages.iter().enumerate() {
let msg_obj = match msg.as_object() {
Some(o) => o,
None => continue,
};
let role = msg_obj
.get("sender")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let role = match role {
"human" => "user",
"assistant" | "bot" => "assistant",
other => other,
};
let text = msg_obj.get("text").and_then(|v| v.as_str()).unwrap_or("");
let timestamp = msg_obj
.get("created_at")
.and_then(|v| v.as_str())
.unwrap_or("");
let text = text.trim();
if text.len() < 20 {
continue;
}
let content_hash = hash_content(text);
let doc_id = format!("chat-{}-{:04}-{}", conv_short, idx, content_hash);
let metadata = json!({
"role": role,
"conversation": conv_short,
"title": title,
"timestamp": timestamp,
"source": source_name,
"type": "conversation",
"format": "claude_web"
});
docs.push((doc_id, text.to_string(), metadata));
}
if !docs.is_empty() {
tracing::info!(
"Claude.ai chat_messages format detected: {} -> {} messages",
source_path.display(),
docs.len()
);
return Some(docs);
}
}
if let Some(serde_json::Value::Object(mapping)) = obj.get("mapping") {
let conv_id = obj
.get("id")
.or_else(|| obj.get("conversation_id"))
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let conv_short = &conv_id[..conv_id.len().min(8)];
let title = obj.get("title").and_then(|v| v.as_str()).unwrap_or("");
let mut docs = Vec::new();
let mut entries: Vec<_> = mapping.iter().collect();
entries.sort_by(|a, b| {
let time_a =
a.1.get("message")
.and_then(|m| m.get("create_time"))
.and_then(|t| t.as_f64())
.unwrap_or(0.0);
let time_b =
b.1.get("message")
.and_then(|m| m.get("create_time"))
.and_then(|t| t.as_f64())
.unwrap_or(0.0);
time_a
.partial_cmp(&time_b)
.unwrap_or(std::cmp::Ordering::Equal)
});
for (idx, (_node_id, node)) in entries.iter().enumerate() {
let message = match node.get("message") {
Some(m) => m,
None => continue,
};
let role = message
.get("author")
.and_then(|a| a.get("role"))
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if role == "system" {
continue;
}
let text = message
.get("content")
.and_then(|c| c.get("parts"))
.and_then(|p| p.as_array())
.map(|parts| {
parts
.iter()
.filter_map(|p| p.as_str())
.collect::<Vec<_>>()
.join(" ")
})
.unwrap_or_default();
let timestamp = message
.get("create_time")
.and_then(|t| t.as_f64())
.map(|ts| {
chrono::DateTime::from_timestamp(ts as i64, 0)
.map(|dt| dt.to_rfc3339())
.unwrap_or_default()
})
.unwrap_or_default();
let text = text.trim();
if text.len() < 20 {
continue;
}
let content_hash = hash_content(text);
let doc_id = format!("gpt-{}-{:04}-{}", conv_short, idx, content_hash);
let metadata = json!({
"role": role,
"conversation": conv_short,
"title": title,
"timestamp": timestamp,
"source": source_name,
"type": "conversation",
"format": "chatgpt"
});
docs.push((doc_id, text.to_string(), metadata));
}
if !docs.is_empty() {
tracing::info!(
"ChatGPT format detected: {} -> {} messages",
source_path.display(),
docs.len()
);
return Some(docs);
}
}
None
}
fn extract_json_element_content(value: &serde_json::Value) -> String {
match value {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Object(map) => {
let mut parts = Vec::new();
for key in [
"content",
"text",
"message",
"summary",
"description",
"body",
] {
if let Some(serde_json::Value::String(s)) = map.get(key)
&& !s.is_empty()
{
parts.push(s.clone());
}
}
if let Some(serde_json::Value::String(role)) = map.get("role")
&& let Some(content) = map.get("content")
{
match content {
serde_json::Value::String(s) => {
parts.push(format!("{}: {}", role, s));
}
serde_json::Value::Array(arr) => {
for item in arr {
if let serde_json::Value::Object(block) = item
&& let Some(serde_json::Value::String(t)) = block.get("text")
{
parts.push(format!("{}: {}", role, t));
}
}
}
_ => {}
}
}
if let Some(serde_json::Value::Array(messages)) = map.get("messages") {
for msg in messages.iter().take(50) {
let msg_content = extract_json_element_content(msg);
if !msg_content.is_empty() && msg_content.len() > 10 {
parts.push(msg_content);
}
}
}
if let Some(serde_json::Value::Array(messages)) = map.get("chat_messages") {
for msg in messages.iter().take(50) {
let msg_content = extract_json_element_content(msg);
if !msg_content.is_empty() && msg_content.len() > 10 {
parts.push(msg_content);
}
}
}
if let Some(serde_json::Value::String(name)) = map.get("name")
&& let Some(serde_json::Value::Array(obs)) = map.get("observations")
{
let observations: Vec<String> = obs
.iter()
.filter_map(|v| v.as_str().map(String::from))
.take(10)
.collect();
if !observations.is_empty() {
parts.push(format!("{}: {}", name, observations.join("; ")));
}
}
for key in ["title", "name", "uuid", "id"] {
if let Some(serde_json::Value::String(s)) = map.get(key) {
if !s.is_empty() && parts.iter().all(|p| !p.contains(s)) {
parts.insert(0, format!("[{}]", s));
}
break;
}
}
if parts.is_empty() {
serde_json::to_string(value)
.unwrap_or_default()
.chars()
.take(5000)
.collect()
} else {
parts.join("\n")
}
}
serde_json::Value::Array(arr) => {
arr.iter()
.take(20)
.map(extract_json_element_content)
.filter(|s| !s.is_empty())
.collect::<Vec<_>>()
.join("\n")
}
_ => value.to_string(),
}
}
fn detect_json_element_type(value: &serde_json::Value) -> &'static str {
if let serde_json::Value::Object(map) = value {
if map.contains_key("chat_messages") || map.contains_key("mapping") {
return "conversation";
}
if map.contains_key("messages") && map.contains_key("sessions") {
return "session";
}
if map.contains_key("role") && map.contains_key("content") {
return "message";
}
if map.contains_key("observations") && map.contains_key("name") {
return "entity";
}
if map.contains_key("messages") {
return "thread";
}
"object"
} else if value.is_array() {
"array"
} else if value.is_string() {
"text"
} else {
"value"
}
}
fn extract_key_content(text: &str, target_chars: usize) -> String {
if text.len() <= target_chars {
return text.to_string();
}
let sentences: Vec<&str> = text
.split(['.', '!', '?'])
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect();
if sentences.is_empty() {
return truncate_at_word_boundary(text, target_chars);
}
let keywords = extract_keywords(text, 10);
let keyword_set: std::collections::HashSet<&str> =
keywords.iter().map(|s| s.as_str()).collect();
let mut scored_sentences: Vec<(usize, f32, &str)> = sentences
.iter()
.enumerate()
.map(|(idx, sentence)| {
let mut score = 0.0_f32;
if idx == 0 {
score += 2.0;
} else if idx == sentences.len() - 1 {
score += 1.5;
}
let words: Vec<&str> = sentence.split_whitespace().collect();
let keyword_count = words
.iter()
.filter(|w| {
let cleaned: String = w
.chars()
.filter(|c| c.is_alphanumeric())
.collect::<String>()
.to_lowercase();
keyword_set.contains(cleaned.as_str())
})
.count();
if !words.is_empty() {
score += (keyword_count as f32 / words.len() as f32) * 3.0;
}
if sentence.len() < 20 {
score -= 0.5;
}
(idx, score, *sentence)
})
.collect();
scored_sentences.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
let mut selected_indices: Vec<usize> = Vec::new();
let mut total_len = 0;
for (idx, _, sentence) in &scored_sentences {
let sentence_len = sentence.len() + 2; if total_len + sentence_len > target_chars && !selected_indices.is_empty() {
break;
}
selected_indices.push(*idx);
total_len += sentence_len;
}
selected_indices.sort();
let result: Vec<&str> = selected_indices
.iter()
.filter_map(|&idx| sentences.get(idx).copied())
.collect();
if result.is_empty() {
truncate_at_word_boundary(text, target_chars)
} else {
result.join(". ") + "."
}
}
fn create_outer_summary(middle_content: &str, keywords: &[String], target_chars: usize) -> String {
let keyword_prefix = if !keywords.is_empty() {
format!(
"[{}] ",
keywords
.iter()
.take(5)
.cloned()
.collect::<Vec<_>>()
.join(", ")
)
} else {
String::new()
};
let remaining_chars = target_chars.saturating_sub(keyword_prefix.len());
let first_sentence = middle_content
.split(['.', '!', '?'])
.next()
.unwrap_or(middle_content)
.trim();
let summary = if first_sentence.len() <= remaining_chars {
first_sentence.to_string()
} else {
truncate_at_word_boundary(first_sentence, remaining_chars)
};
format!("{}{}", keyword_prefix, summary)
}
fn truncate_at_word_boundary(text: &str, max_chars: usize) -> String {
let char_count = text.chars().count();
if char_count <= max_chars {
return text.to_string();
}
let byte_idx = text
.char_indices()
.nth(max_chars)
.map(|(idx, _)| idx)
.unwrap_or(text.len());
let truncated = &text[..byte_idx];
if let Some(last_space) = truncated.rfind(' ') {
format!("{}...", &text[..last_space])
} else {
format!("{}...", truncated)
}
}
pub struct RAGPipeline {
mlx_bridge: Arc<Mutex<MLXBridge>>,
storage: Arc<StorageManager>,
bm25_writer: Option<Arc<BM25Index>>,
}
impl RAGPipeline {
pub async fn new(
mlx_bridge: Arc<Mutex<MLXBridge>>,
storage: Arc<StorageManager>,
) -> Result<Self> {
Self::new_with_bm25(mlx_bridge, storage, None).await
}
pub async fn new_with_bm25(
mlx_bridge: Arc<Mutex<MLXBridge>>,
storage: Arc<StorageManager>,
bm25_writer: Option<Arc<BM25Index>>,
) -> Result<Self> {
Ok(Self {
mlx_bridge,
storage,
bm25_writer,
})
}
pub fn storage_manager(&self) -> Arc<StorageManager> {
self.storage.clone()
}
pub async fn refresh(&self) -> Result<()> {
self.storage.refresh().await
}
async fn persist_documents(&self, documents: Vec<ChromaDocument>) -> Result<()> {
if documents.is_empty() {
return Ok(());
}
let mut unique_documents = Vec::with_capacity(documents.len());
let mut seen_ids: HashSet<(String, String)> = HashSet::new();
let mut seen_hashes: HashSet<(String, String)> = HashSet::new();
for mut document in documents {
if let Value::Object(ref mut map) = document.metadata {
map.entry("indexed_at".to_string())
.or_insert_with(|| json!(chrono::Utc::now().to_rfc3339()));
}
let id_key = (document.namespace.clone(), document.id.clone());
if !seen_ids.insert(id_key) {
continue;
}
if let Some(hash) = document.content_hash.as_ref() {
let hash_key = (document.namespace.clone(), hash.clone());
if !seen_hashes.insert(hash_key) {
continue;
}
}
unique_documents.push(document);
}
let documents = self
.filter_documents_against_store(unique_documents)
.await?;
if documents.is_empty() {
return Ok(());
}
let bm25_documents: Vec<(String, String, String)> = documents
.iter()
.map(|doc| (doc.id.clone(), doc.namespace.clone(), doc.document.clone()))
.collect();
let inserted_ids: Vec<(String, String)> = documents
.iter()
.map(|doc| (doc.namespace.clone(), doc.id.clone()))
.collect();
self.storage.add_to_store(documents).await?;
if let Some(bm25_writer) = &self.bm25_writer
&& let Err(error) = bm25_writer.add_documents(&bm25_documents).await
{
for (namespace, id) in &inserted_ids {
let _ = self.storage.delete_document(namespace, id).await;
}
return Err(error);
}
Ok(())
}
async fn filter_documents_against_store(
&self,
documents: Vec<ChromaDocument>,
) -> Result<Vec<ChromaDocument>> {
if documents.is_empty() {
return Ok(vec![]);
}
let mut hashes_by_namespace: HashMap<String, Vec<String>> = HashMap::new();
for document in &documents {
if let Some(hash) = document.content_hash.as_ref() {
hashes_by_namespace
.entry(document.namespace.clone())
.or_default()
.push(hash.clone());
}
}
let mut allowed_hashes: HashMap<String, HashSet<String>> = HashMap::new();
for (namespace, hashes) in hashes_by_namespace {
let hashes = self
.storage
.filter_existing_hashes(&namespace, &hashes)
.await?;
allowed_hashes.insert(
namespace,
hashes.into_iter().cloned().collect::<HashSet<_>>(),
);
}
Ok(documents
.into_iter()
.filter(|document| match document.content_hash.as_ref() {
None => true,
Some(hash) => allowed_hashes
.get(&document.namespace)
.map(|hashes| hashes.contains(hash))
.unwrap_or(true),
})
.collect())
}
async fn clear_namespace_from_indices(&self, namespace: &str) -> Result<usize> {
let deleted = self.storage.delete_namespace_documents(namespace).await?;
if let Some(bm25_writer) = &self.bm25_writer {
bm25_writer.delete_namespace_term(namespace).await?;
}
Ok(deleted)
}
async fn load_memory_family(&self, namespace: &str, id: &str) -> Result<Vec<ChromaDocument>> {
let docs = self.storage.get_all_in_namespace(namespace).await?;
Ok(docs
.into_iter()
.filter(|doc| {
doc.id == id
|| doc
.metadata
.get("original_id")
.and_then(|value| value.as_str())
.is_some_and(|original_id| original_id == id)
})
.collect())
}
async fn delete_memory_family(&self, namespace: &str, id: &str) -> Result<usize> {
let family = self.load_memory_family(namespace, id).await?;
if family.is_empty() {
return Ok(0);
}
let mut deleted = 0usize;
let mut ids = Vec::with_capacity(family.len());
for document in family {
deleted += self
.storage
.delete_document(namespace, &document.id)
.await?
.min(1);
ids.push(document.id);
}
if let Some(bm25_writer) = &self.bm25_writer
&& !ids.is_empty()
{
bm25_writer.delete_documents(&ids).await?;
}
Ok(deleted)
}
fn preferred_memory_family_document(
mut family: Vec<ChromaDocument>,
requested_id: &str,
) -> Option<ChromaDocument> {
fn rank(layer: Option<SliceLayer>) -> u8 {
match layer {
None => 0,
Some(SliceLayer::Outer) => 1,
Some(SliceLayer::Middle) => 2,
Some(SliceLayer::Inner) => 3,
Some(SliceLayer::Core) => 4,
}
}
family.sort_by_key(|document| {
if document.id == requested_id {
(0_u8, 0_u8)
} else {
(1_u8, rank(document.slice_layer()))
}
});
family.into_iter().next()
}
pub fn mlx_connected_to(&self) -> String {
if let Ok(bridge) = self.mlx_bridge.try_lock() {
bridge.connected_to().to_string()
} else {
"mlx (lock held)".to_string()
}
}
pub async fn index_document(&self, path: &Path, namespace: Option<&str>) -> Result<()> {
self.index_document_with_mode(path, namespace, SliceMode::default())
.await
}
pub async fn index_document_with_mode(
&self,
path: &Path,
namespace: Option<&str>,
slice_mode: SliceMode,
) -> Result<()> {
self.index_document_internal(path, namespace, None, slice_mode)
.await
}
pub async fn index_document_with_preprocessing(
&self,
path: &Path,
namespace: Option<&str>,
preprocess_config: PreprocessingConfig,
) -> Result<()> {
self.index_document_internal(path, namespace, Some(preprocess_config), SliceMode::Flat)
.await
}
pub async fn index_document_with_dedup(
&self,
path: &Path,
namespace: Option<&str>,
slice_mode: SliceMode,
) -> Result<IndexResult> {
let validated_path = crate::path_utils::validate_read_path(path)?;
let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
let is_json = validated_path
.extension()
.and_then(|e| e.to_str())
.map(|e| e.eq_ignore_ascii_case("json"))
.unwrap_or(false);
if is_json || matches!(slice_mode, SliceMode::Onion | SliceMode::OnionFast) {
return self
.index_document_with_json_awareness(&validated_path, ns, slice_mode)
.await;
}
let text = self.extract_text(&validated_path).await?;
let content_hash = compute_content_hash(&text);
if self.storage.has_content_hash(ns, &content_hash).await? {
debug!(
"Skipping duplicate content: {} (hash: {})",
path.display(),
&content_hash[..16]
);
return Ok(IndexResult::Skipped {
reason: "exact duplicate".to_string(),
content_hash,
});
}
let base_metadata = json!({
"path": path.to_str(),
"slice_mode": "flat",
"content_hash": &content_hash,
});
let chunks_indexed = self
.index_with_flat_chunking_and_hash(&text, ns, path, base_metadata, &content_hash)
.await?;
Ok(IndexResult::Indexed {
chunks_indexed,
content_hash,
})
}
async fn index_document_with_json_awareness(
&self,
path: &Path,
namespace: &str,
slice_mode: SliceMode,
) -> Result<IndexResult> {
let documents = self.extract_json_documents(path).await?;
let mut total_chunks = 0;
let mut skipped_docs = 0;
let file_content_hash = match crate::path_utils::safe_read_to_string_async(path).await {
Ok((_p, content)) => compute_content_hash(&content),
Err(_) => compute_content_hash(""),
};
for (doc_id, content, mut doc_metadata) in documents {
if content.len() < 50 {
continue; }
let doc_hash = compute_content_hash(&content);
if self.storage.has_content_hash(namespace, &doc_hash).await? {
skipped_docs += 1;
continue;
}
if let serde_json::Value::Object(ref mut map) = doc_metadata {
map.insert("doc_id".to_string(), json!(doc_id));
map.insert("content_hash".to_string(), json!(doc_hash));
map.insert("file_hash".to_string(), json!(&file_content_hash));
map.insert(
"slice_mode".to_string(),
json!(match slice_mode {
SliceMode::Onion => "onion",
SliceMode::OnionFast => "onion-fast",
SliceMode::Flat => "flat",
}),
);
}
let is_conversation = doc_metadata
.get("type")
.and_then(|v| v.as_str())
.map(|t| t == "conversation")
.unwrap_or(false);
let chunks = if is_conversation {
self.index_conversation_message_direct(
&doc_id,
&content,
namespace,
doc_metadata,
&doc_hash,
)
.await?
} else {
match slice_mode {
SliceMode::Onion => {
self.index_with_onion_slicing_and_hash(
&content,
namespace,
doc_metadata,
&doc_hash,
)
.await?
}
SliceMode::OnionFast => {
self.index_with_onion_slicing_fast_and_hash(
&content,
namespace,
doc_metadata,
&doc_hash,
)
.await?
}
SliceMode::Flat => {
self.index_with_flat_chunking_and_hash(
&content,
namespace,
path,
doc_metadata,
&doc_hash,
)
.await?
}
}
};
total_chunks += chunks;
}
if total_chunks == 0 && skipped_docs > 0 {
return Ok(IndexResult::Skipped {
reason: format!("all {} documents already indexed", skipped_docs),
content_hash: file_content_hash,
});
}
tracing::info!(
"JSON-aware indexing: {} -> {} chunks ({} docs skipped)",
path.display(),
total_chunks,
skipped_docs
);
Ok(IndexResult::Indexed {
chunks_indexed: total_chunks,
content_hash: file_content_hash,
})
}
pub async fn index_document_with_preprocessing_and_dedup(
&self,
path: &Path,
namespace: Option<&str>,
preprocess_config: PreprocessingConfig,
) -> Result<IndexResult> {
let text = self.extract_text(path).await?;
let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
let content_hash = compute_content_hash(&text);
if self.storage.has_content_hash(ns, &content_hash).await? {
debug!(
"Skipping duplicate content: {} (hash: {})",
path.display(),
&content_hash[..16]
);
return Ok(IndexResult::Skipped {
reason: "exact duplicate".to_string(),
content_hash,
});
}
let preprocessor = Preprocessor::new(preprocess_config);
let cleaned = preprocessor.extract_semantic_content(&text);
tracing::info!(
"Preprocessing: {} chars -> {} chars ({:.1}% reduction)",
text.len(),
cleaned.len(),
(1.0 - (cleaned.len() as f32 / text.len() as f32)) * 100.0
);
let base_metadata = json!({
"path": path.to_str(),
"slice_mode": "flat",
"content_hash": &content_hash,
});
let chunks_indexed = self
.index_with_flat_chunking_and_hash(&cleaned, ns, path, base_metadata, &content_hash)
.await?;
Ok(IndexResult::Indexed {
chunks_indexed,
content_hash,
})
}
async fn index_document_internal(
&self,
path: &Path,
namespace: Option<&str>,
preprocess_config: Option<PreprocessingConfig>,
slice_mode: SliceMode,
) -> Result<()> {
let validated_path = crate::path_utils::validate_read_path(path)?;
let text = self.extract_text(&validated_path).await?;
let text = if let Some(config) = preprocess_config {
let preprocessor = Preprocessor::new(config);
let cleaned = preprocessor.extract_semantic_content(&text);
tracing::info!(
"Preprocessing: {} chars -> {} chars ({:.1}% reduction)",
text.len(),
cleaned.len(),
(1.0 - (cleaned.len() as f32 / text.len() as f32)) * 100.0
);
cleaned
} else {
text
};
let ns = namespace.unwrap_or(DEFAULT_NAMESPACE);
let base_metadata = json!({
"path": validated_path.to_str(),
"slice_mode": match slice_mode {
SliceMode::Onion => "onion",
SliceMode::OnionFast => "onion-fast",
SliceMode::Flat => "flat",
}
});
match slice_mode {
SliceMode::Onion => {
self.index_with_onion_slicing(&text, ns, base_metadata)
.await
}
SliceMode::OnionFast => {
self.index_with_onion_slicing_fast(&text, ns, base_metadata)
.await
}
SliceMode::Flat => {
self.index_with_flat_chunking(&text, ns, path, base_metadata)
.await
}
}
}
async fn index_with_onion_slicing(
&self,
text: &str,
namespace: &str,
base_metadata: serde_json::Value,
) -> Result<()> {
let config = OnionSliceConfig::default();
let slices = create_onion_slices(text, &base_metadata, &config);
let total_slices = slices.len();
tracing::info!(
"Onion slicing: {} chars -> {} slices (outer/middle/inner/core)",
text.len(),
total_slices
);
let mut total_stored = 0;
for batch in slices.chunks(STORAGE_BATCH_SIZE) {
let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
let embeddings = self.embed_chunks(&batch_contents).await?;
let mut batch_docs = Vec::with_capacity(batch.len());
for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
let mut metadata = base_metadata.clone();
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("layer".to_string(), json!(slice.layer.name()));
map.insert("keywords".to_string(), json!(slice.keywords));
}
let doc = ChromaDocument::from_onion_slice(
slice,
namespace.to_string(),
embedding.clone(),
metadata,
);
batch_docs.push(doc);
}
self.persist_documents(batch_docs).await?;
total_stored += batch.len();
tracing::info!("Stored {}/{} slices", total_stored, total_slices);
}
Ok(())
}
async fn index_with_onion_slicing_fast(
&self,
text: &str,
namespace: &str,
base_metadata: serde_json::Value,
) -> Result<()> {
let config = OnionSliceConfig::default();
let slices = create_onion_slices_fast(text, &base_metadata, &config);
let total_slices = slices.len();
tracing::info!(
"Fast onion slicing: {} chars -> {} slices (outer/core only)",
text.len(),
total_slices
);
let mut total_stored = 0;
for batch in slices.chunks(STORAGE_BATCH_SIZE) {
let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
let embeddings = self.embed_chunks(&batch_contents).await?;
let mut batch_docs = Vec::with_capacity(batch.len());
for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
let mut metadata = base_metadata.clone();
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("layer".to_string(), json!(slice.layer.name()));
map.insert("keywords".to_string(), json!(slice.keywords));
}
let doc = ChromaDocument::from_onion_slice(
slice,
namespace.to_string(),
embedding.clone(),
metadata,
);
batch_docs.push(doc);
}
self.persist_documents(batch_docs).await?;
total_stored += batch.len();
tracing::info!("Stored {}/{} slices", total_stored, total_slices);
}
Ok(())
}
async fn index_with_onion_slicing_and_hash(
&self,
text: &str,
namespace: &str,
base_metadata: serde_json::Value,
content_hash: &str,
) -> Result<usize> {
let config = OnionSliceConfig::default();
let slices = create_onion_slices(text, &base_metadata, &config);
let total_slices = slices.len();
tracing::info!(
"Onion slicing: {} chars -> {} slices (outer/middle/inner/core)",
text.len(),
total_slices
);
let mut total_stored = 0;
for batch in slices.chunks(STORAGE_BATCH_SIZE) {
let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
let embeddings = self.embed_chunks(&batch_contents).await?;
let mut batch_docs = Vec::with_capacity(batch.len());
for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
let mut metadata = base_metadata.clone();
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("layer".to_string(), json!(slice.layer.name()));
map.insert("keywords".to_string(), json!(slice.keywords));
}
let slice_hash = compute_content_hash(&slice.content);
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("file_hash".to_string(), json!(content_hash));
}
let doc = ChromaDocument::from_onion_slice_with_hash(
slice,
namespace.to_string(),
embedding.clone(),
metadata,
slice_hash,
);
batch_docs.push(doc);
}
self.persist_documents(batch_docs).await?;
total_stored += batch.len();
tracing::info!("Stored {}/{} slices", total_stored, total_slices);
}
Ok(total_slices)
}
async fn index_with_onion_slicing_fast_and_hash(
&self,
text: &str,
namespace: &str,
base_metadata: serde_json::Value,
content_hash: &str,
) -> Result<usize> {
let config = OnionSliceConfig::default();
let slices = create_onion_slices_fast(text, &base_metadata, &config);
let total_slices = slices.len();
tracing::info!(
"Fast onion slicing: {} chars -> {} slices (outer/core only)",
text.len(),
total_slices
);
let mut total_stored = 0;
for batch in slices.chunks(STORAGE_BATCH_SIZE) {
let batch_contents: Vec<String> = batch.iter().map(|s| s.content.clone()).collect();
let embeddings = self.embed_chunks(&batch_contents).await?;
let mut batch_docs = Vec::with_capacity(batch.len());
for (slice, embedding) in batch.iter().zip(embeddings.iter()) {
let mut metadata = base_metadata.clone();
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("layer".to_string(), json!(slice.layer.name()));
map.insert("keywords".to_string(), json!(slice.keywords));
}
let slice_hash = compute_content_hash(&slice.content);
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("file_hash".to_string(), json!(content_hash));
}
let doc = ChromaDocument::from_onion_slice_with_hash(
slice,
namespace.to_string(),
embedding.clone(),
metadata,
slice_hash,
);
batch_docs.push(doc);
}
self.persist_documents(batch_docs).await?;
total_stored += batch.len();
tracing::info!("Stored {}/{} slices", total_stored, total_slices);
}
Ok(total_slices)
}
async fn index_with_flat_chunking(
&self,
text: &str,
namespace: &str,
path: &Path,
base_metadata: serde_json::Value,
) -> Result<()> {
let chunks = self.chunk_text(text, 512, 128)?;
let total_chunks = chunks.len();
tracing::info!(
"Flat chunking: {} chars -> {} chunks",
text.len(),
total_chunks
);
let mut total_stored = 0;
let mut global_idx = 0;
for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
let embeddings = self.embed_chunks(batch).await?;
let mut batch_docs = Vec::with_capacity(batch.len());
for (chunk, embedding) in batch.iter().zip(embeddings.iter()) {
let mut metadata = base_metadata.clone();
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("chunk_index".to_string(), json!(global_idx));
map.insert("total_chunks".to_string(), json!(total_chunks));
}
let doc = ChromaDocument::new_flat(
format!("{}_{}", path.to_str().unwrap_or("unknown"), global_idx),
namespace.to_string(),
embedding.clone(),
metadata,
chunk.clone(),
);
batch_docs.push(doc);
global_idx += 1;
}
self.persist_documents(batch_docs).await?;
total_stored += batch.len();
tracing::info!("Stored {}/{} chunks", total_stored, total_chunks);
}
Ok(())
}
async fn index_conversation_message_direct(
&self,
doc_id: &str,
text: &str,
namespace: &str,
metadata: serde_json::Value,
content_hash: &str,
) -> Result<usize> {
let embedding = self.embed_query(text).await?;
let doc = ChromaDocument::new_flat_with_hash(
doc_id.to_string(),
namespace.to_string(),
embedding,
metadata,
text.to_string(),
content_hash.to_string(),
);
self.persist_documents(vec![doc]).await?;
tracing::debug!(
"Conversation message stored directly: {} ({} chars)",
doc_id,
text.len()
);
Ok(1) }
async fn index_with_flat_chunking_and_hash(
&self,
text: &str,
namespace: &str,
path: &Path,
base_metadata: serde_json::Value,
content_hash: &str,
) -> Result<usize> {
let chunks = self.chunk_text(text, 512, 128)?;
let total_chunks = chunks.len();
tracing::info!(
"Flat chunking: {} chars -> {} chunks",
text.len(),
total_chunks
);
let mut total_stored = 0;
let mut global_idx = 0;
for batch in chunks.chunks(STORAGE_BATCH_SIZE) {
let embeddings = self.embed_chunks(batch).await?;
let mut batch_docs = Vec::with_capacity(batch.len());
for (chunk, embedding) in batch.iter().zip(embeddings.iter()) {
let mut metadata = base_metadata.clone();
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("chunk_index".to_string(), json!(global_idx));
map.insert("total_chunks".to_string(), json!(total_chunks));
}
let chunk_hash = compute_content_hash(chunk);
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("file_hash".to_string(), json!(content_hash));
}
let doc = ChromaDocument::new_flat_with_hash(
format!("{}_{}", path.to_str().unwrap_or("unknown"), global_idx),
namespace.to_string(),
embedding.clone(),
metadata,
chunk.clone(),
chunk_hash,
);
batch_docs.push(doc);
global_idx += 1;
}
self.persist_documents(batch_docs).await?;
total_stored += batch.len();
tracing::info!("Stored {}/{} chunks", total_stored, total_chunks);
}
Ok(total_chunks)
}
pub async fn index_text(
&self,
namespace: Option<&str>,
id: String,
text: String,
metadata: serde_json::Value,
) -> Result<String> {
self.index_text_with_mode(namespace, id, text, metadata, SliceMode::default())
.await
}
pub async fn index_text_with_mode(
&self,
namespace: Option<&str>,
id: String,
text: String,
metadata: serde_json::Value,
slice_mode: SliceMode,
) -> Result<String> {
let ns = namespace.unwrap_or(DEFAULT_NAMESPACE).to_string();
let slice_mode_name = match slice_mode {
SliceMode::Onion => "onion",
SliceMode::OnionFast => "onion-fast",
SliceMode::Flat => "flat",
};
match slice_mode {
SliceMode::Onion | SliceMode::OnionFast => {
let config = OnionSliceConfig::default();
let slices = if slice_mode == SliceMode::OnionFast {
create_onion_slices_fast(&text, &metadata, &config)
} else {
create_onion_slices(&text, &metadata, &config)
};
let slice_contents: Vec<String> =
slices.iter().map(|s| s.content.clone()).collect();
let embeddings = self.embed_chunks(&slice_contents).await?;
let mut documents = Vec::with_capacity(slices.len());
for (slice, embedding) in slices.iter().zip(embeddings.iter()) {
let mut meta = metadata.clone();
if let serde_json::Value::Object(ref mut map) = meta {
map.insert("layer".to_string(), json!(slice.layer.name()));
map.insert("original_id".to_string(), json!(id));
map.insert("slice_mode".to_string(), json!(slice_mode_name));
}
let doc = ChromaDocument::from_onion_slice(
slice,
ns.clone(),
embedding.clone(),
meta,
);
documents.push(doc);
}
self.persist_documents(documents).await?;
Ok(slices
.iter()
.find(|s| s.layer == SliceLayer::Outer)
.map(|s| s.id.clone())
.unwrap_or(id))
}
SliceMode::Flat => {
let embedding = self.embed_query(&text).await?;
let mut metadata = metadata;
if let serde_json::Value::Object(ref mut map) = metadata {
map.insert("slice_mode".to_string(), json!(slice_mode_name));
}
let doc = ChromaDocument::new_flat(id.clone(), ns, embedding, metadata, text);
self.persist_documents(vec![doc]).await?;
Ok(id)
}
}
}
pub async fn memory_upsert(
&self,
namespace: &str,
id: String,
text: String,
metadata: serde_json::Value,
) -> Result<()> {
let slice_mode = match metadata
.get("slice_mode")
.and_then(|value| value.as_str())
.map(|value| value.to_ascii_lowercase())
.as_deref()
{
Some("onion") => SliceMode::Onion,
Some("onion-fast") | Some("onion_fast") | Some("fast") => SliceMode::OnionFast,
Some("flat") | None => SliceMode::Flat,
Some(other) => {
return Err(anyhow!(
"Unsupported metadata.slice_mode '{}'. Use 'flat', 'onion', or 'onion-fast'.",
other
));
}
};
self.delete_memory_family(namespace, &id).await?;
self.index_text_with_mode(Some(namespace), id, text, metadata, slice_mode)
.await?;
Ok(())
}
pub async fn lookup_memory(&self, namespace: &str, id: &str) -> Result<Option<SearchResult>> {
if let Some(doc) = self.storage.get_document(namespace, id).await? {
let layer = doc.slice_layer();
return Ok(Some(SearchResult {
id: doc.id,
namespace: doc.namespace,
text: doc.document,
score: 1.0,
metadata: doc.metadata,
layer,
parent_id: doc.parent_id,
children_ids: doc.children_ids,
keywords: doc.keywords,
}));
}
if let Some(doc) = Self::preferred_memory_family_document(
self.load_memory_family(namespace, id).await?,
id,
) {
let layer = doc.slice_layer();
return Ok(Some(SearchResult {
id: doc.id,
namespace: doc.namespace,
text: doc.document,
score: 1.0,
metadata: doc.metadata,
layer,
parent_id: doc.parent_id,
children_ids: doc.children_ids,
keywords: doc.keywords,
}));
}
Ok(None)
}
pub async fn remove_memory(&self, namespace: &str, id: &str) -> Result<usize> {
self.delete_memory_family(namespace, id).await
}
pub async fn clear_namespace(&self, namespace: &str) -> Result<usize> {
self.clear_namespace_from_indices(namespace).await
}
pub async fn search_memory(
&self,
namespace: &str,
query: &str,
k: usize,
) -> Result<Vec<SearchResult>> {
self.search_with_options(Some(namespace), query, k, SearchOptions::default())
.await
}
pub async fn memory_search_with_layer(
&self,
namespace: &str,
query: &str,
k: usize,
layer: Option<SliceLayer>,
) -> Result<Vec<SearchResult>> {
self.search_with_options(
Some(namespace),
query,
k,
SearchOptions {
layer_filter: layer,
project_filter: None,
},
)
.await
}
pub async fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
self.search_inner(None, query, k).await
}
pub async fn search_inner(
&self,
namespace: Option<&str>,
query: &str,
k: usize,
) -> Result<Vec<SearchResult>> {
self.search_with_options(namespace, query, k, SearchOptions::default())
.await
}
pub async fn search_with_options(
&self,
namespace: Option<&str>,
query: &str,
k: usize,
options: SearchOptions,
) -> Result<Vec<SearchResult>> {
let query_embedding = self.embed_query(query).await?;
let candidate_multiplier = if options.project_filter.is_some() {
8
} else {
3
};
let mut candidates = self
.storage
.search_store_with_layer(
namespace,
query_embedding.clone(),
k * candidate_multiplier,
options.layer_filter,
)
.await?;
if let Some(project) = options.project_filter.as_deref() {
candidates.retain(|candidate| metadata_matches_project(&candidate.metadata, project));
}
if !candidates.is_empty() {
let documents: Vec<String> = candidates.iter().map(|c| c.document.clone()).collect();
let metadatas: Vec<serde_json::Value> =
candidates.iter().map(|c| c.metadata.clone()).collect();
let reranked = match self.mlx_bridge.lock().await.rerank(query, &documents).await {
Ok(r) => Some(r),
Err(e) => {
tracing::warn!("MLX rerank failed, using cosine fallback: {}", e);
None
}
};
let reranked = if let Some(r) = reranked {
r
} else {
let doc_embeddings = self.ensure_doc_embeddings(&documents, &candidates).await?;
let scores = doc_embeddings
.iter()
.enumerate()
.map(|(idx, emb)| (idx, cosine(&query_embedding, emb)))
.collect::<Vec<_>>();
let mut scores = scores;
scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
scores
};
let results: Vec<SearchResult> = reranked
.into_iter()
.take(k)
.filter_map(|(idx, score)| {
candidates.get(idx).map(|candidate| {
SearchResult {
id: candidate.id.clone(),
namespace: candidate.namespace.clone(),
text: candidate.document.clone(),
score,
metadata: metadatas.get(idx).cloned().unwrap_or_else(|| json!({})),
layer: candidate.slice_layer(),
parent_id: candidate.parent_id.clone(),
children_ids: candidate.children_ids.clone(),
keywords: candidate.keywords.clone(),
}
})
})
.collect();
return Ok(results);
}
Ok(vec![])
}
pub async fn expand_result(&self, namespace: &str, id: &str) -> Result<Vec<SearchResult>> {
let children = self.storage.get_children(namespace, id).await?;
Ok(children
.into_iter()
.map(|doc| {
let layer = doc.slice_layer();
SearchResult {
id: doc.id,
namespace: doc.namespace,
text: doc.document,
score: 1.0,
metadata: doc.metadata,
layer,
parent_id: doc.parent_id,
children_ids: doc.children_ids,
keywords: doc.keywords,
}
})
.collect())
}
pub async fn get_parent_result(
&self,
namespace: &str,
id: &str,
) -> Result<Option<SearchResult>> {
if let Some(parent) = self.storage.get_parent(namespace, id).await? {
let layer = parent.slice_layer();
return Ok(Some(SearchResult {
id: parent.id,
namespace: parent.namespace,
text: parent.document,
score: 1.0,
metadata: parent.metadata,
layer,
parent_id: parent.parent_id,
children_ids: parent.children_ids,
keywords: parent.keywords,
}));
}
Ok(None)
}
async fn extract_text(&self, path: &Path) -> Result<String> {
let ext = path
.extension()
.and_then(|e| e.to_str())
.unwrap_or("")
.to_lowercase();
if ext == "pdf" {
let path = path.to_path_buf();
let pdf_text =
tokio::task::spawn_blocking(move || pdf_extract::extract_text(&path)).await??;
return Ok(pdf_text);
}
let (_p, content) = crate::path_utils::safe_read_to_string_async(path).await?;
Ok(content)
}
async fn extract_json_documents(
&self,
path: &Path,
) -> Result<Vec<(String, String, serde_json::Value)>> {
let ext = path
.extension()
.and_then(|e| e.to_str())
.unwrap_or("")
.to_lowercase();
if ext != "json" {
let text = self.extract_text(path).await?;
let doc_id = format!("{}:0", path.display());
let metadata = json!({ "path": path.to_str(), "index": 0 });
return Ok(vec![(doc_id, text, metadata)]);
}
let (_p, raw) = crate::path_utils::safe_read_to_string_async(path).await?;
let parsed: serde_json::Value = match serde_json::from_str(&raw) {
Ok(v) => v,
Err(_) => {
let doc_id = format!("{}:0", path.display());
let metadata = json!({ "path": path.to_str(), "index": 0 });
return Ok(vec![(doc_id, raw, metadata)]);
}
};
if let serde_json::Value::Array(arr) = parsed {
let mut docs = Vec::new();
let mut used_smart_extraction = false;
for item in arr.iter() {
if let Some(mut conv_docs) = extract_conversation_documents(item, path) {
docs.append(&mut conv_docs);
used_smart_extraction = true;
}
}
if used_smart_extraction && !docs.is_empty() {
tracing::info!(
"Conversation array detected: {} -> {} messages",
path.display(),
docs.len()
);
return Ok(docs);
}
docs.clear();
for (idx, item) in arr.iter().enumerate() {
let doc_id = format!("{}:{}", path.display(), idx);
let content = extract_json_element_content(item);
if content.len() > 50 {
let metadata = json!({
"path": path.to_str(),
"index": idx,
"total_elements": arr.len(),
"element_type": detect_json_element_type(item),
});
docs.push((doc_id, content, metadata));
}
}
if docs.is_empty() {
let doc_id = format!("{}:0", path.display());
let metadata = json!({ "path": path.to_str(), "index": 0 });
return Ok(vec![(doc_id, raw, metadata)]);
}
tracing::info!(
"JSON array detected: {} -> {} documents",
path.display(),
docs.len()
);
return Ok(docs);
}
if let Some(docs) = extract_conversation_documents(&parsed, path) {
return Ok(docs);
}
let content = extract_json_element_content(&parsed);
let doc_id = format!("{}:0", path.display());
let metadata = json!({ "path": path.to_str(), "index": 0 });
Ok(vec![(doc_id, content, metadata)])
}
async fn embed_chunks(&self, chunks: &[String]) -> Result<Vec<Vec<f32>>> {
self.mlx_bridge.lock().await.embed_batch(chunks).await
}
async fn embed_query(&self, query: &str) -> Result<Vec<f32>> {
self.mlx_bridge.lock().await.embed(query).await
}
async fn ensure_doc_embeddings(
&self,
documents: &[String],
candidates: &[ChromaDocument],
) -> Result<Vec<Vec<f32>>> {
let has_all = candidates.iter().all(|c| !c.embedding.is_empty());
if has_all {
return Ok(candidates.iter().map(|c| c.embedding.clone()).collect());
}
self.mlx_bridge.lock().await.embed_batch(documents).await
}
fn chunk_text(&self, text: &str, target_size: usize, overlap: usize) -> Result<Vec<String>> {
let sentences = split_into_sentences(text);
if sentences.is_empty() {
return Ok(vec![text.to_string()]);
}
if text.chars().count() <= target_size {
return Ok(vec![text.to_string()]);
}
let mut chunks = Vec::new();
let mut current_chunk = String::new();
let mut overlap_sentences: Vec<String> = Vec::new();
let overlap_sentence_count = (overlap / 50).clamp(1, 3);
for sentence in &sentences {
let sentence_len = sentence.chars().count();
let current_len = current_chunk.chars().count();
let max_size = target_size + target_size / 2;
if current_len + sentence_len > max_size && !current_chunk.is_empty() {
chunks.push(current_chunk.trim().to_string());
current_chunk = overlap_sentences.join(" ");
if !current_chunk.is_empty() {
current_chunk.push(' ');
}
overlap_sentences.clear();
}
current_chunk.push_str(sentence);
current_chunk.push(' ');
overlap_sentences.push(sentence.clone());
if overlap_sentences.len() > overlap_sentence_count {
overlap_sentences.remove(0);
}
if current_chunk.chars().count() >= target_size {
chunks.push(current_chunk.trim().to_string());
current_chunk = overlap_sentences.join(" ");
if !current_chunk.is_empty() {
current_chunk.push(' ');
}
overlap_sentences.clear();
}
}
let remaining = current_chunk.trim();
if !remaining.is_empty() {
if remaining.chars().count() < target_size / 4 && !chunks.is_empty() {
let last_idx = chunks.len() - 1;
chunks[last_idx].push(' ');
chunks[last_idx].push_str(remaining);
} else {
chunks.push(remaining.to_string());
}
}
if chunks.is_empty() {
chunks.push(text.to_string());
}
Ok(chunks)
}
}
#[derive(Debug, Clone)]
pub struct ContextPrefixConfig {
pub include_source: bool,
pub include_section: bool,
pub include_doc_type: bool,
pub max_prefix_length: usize,
}
impl Default for ContextPrefixConfig {
fn default() -> Self {
Self {
include_source: true,
include_section: true,
include_doc_type: true,
max_prefix_length: 100,
}
}
}
#[derive(Debug, Clone)]
pub struct EnrichedChunk {
pub content: String,
pub original_content: String,
pub doc_path: String,
pub chunk_index: usize,
pub section: Option<String>,
pub doc_type: Option<String>,
}
pub fn create_enriched_chunks(
content: &str,
doc_path: &str,
chunk_size: usize,
overlap: usize,
config: &ContextPrefixConfig,
) -> Vec<EnrichedChunk> {
let doc_type = detect_doc_type(doc_path);
let filename = std::path::Path::new(doc_path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown");
let sections = extract_sections(content);
let mut enriched_chunks = Vec::new();
let mut global_chunk_index = 0;
for (section_header, section_content) in sections {
let chunks = smart_chunk_text(section_content, chunk_size, overlap);
for chunk in chunks {
let prefix = build_context_prefix(
filename,
section_header.as_deref(),
doc_type.as_deref(),
config,
);
let full_content = if prefix.is_empty() {
chunk.clone()
} else {
format!("{}\n\n{}", prefix, chunk)
};
enriched_chunks.push(EnrichedChunk {
content: full_content,
original_content: chunk,
doc_path: doc_path.to_string(),
chunk_index: global_chunk_index,
section: section_header.clone(),
doc_type: doc_type.clone(),
});
global_chunk_index += 1;
}
}
if enriched_chunks.is_empty() && !content.trim().is_empty() {
let prefix = build_context_prefix(filename, None, doc_type.as_deref(), config);
let full_content = if prefix.is_empty() {
content.to_string()
} else {
format!("{}\n\n{}", prefix, content)
};
enriched_chunks.push(EnrichedChunk {
content: full_content,
original_content: content.to_string(),
doc_path: doc_path.to_string(),
chunk_index: 0,
section: None,
doc_type,
});
}
enriched_chunks
}
fn build_context_prefix(
filename: &str,
section: Option<&str>,
doc_type: Option<&str>,
config: &ContextPrefixConfig,
) -> String {
let mut parts = Vec::new();
if config.include_source && !filename.is_empty() {
parts.push(format!("[Source: {}]", filename));
}
if config.include_section
&& let Some(sec) = section
{
parts.push(format!("[Section: {}]", sec));
}
if config.include_doc_type
&& let Some(dt) = doc_type
{
parts.push(format!("[Type: {}]", dt));
}
let prefix = parts.join(" ");
if prefix.len() > config.max_prefix_length {
prefix.chars().take(config.max_prefix_length).collect()
} else {
prefix
}
}
fn detect_doc_type(path: &str) -> Option<String> {
let ext = std::path::Path::new(path)
.extension()
.and_then(|e| e.to_str())
.map(|s| s.to_lowercase())?;
let doc_type = match ext.as_str() {
"rs" => "Rust source code",
"py" => "Python source code",
"js" | "jsx" => "JavaScript source code",
"ts" | "tsx" => "TypeScript source code",
"md" => "Markdown documentation",
"txt" => "Plain text",
"json" => "JSON data",
"yaml" | "yml" => "YAML configuration",
"toml" => "TOML configuration",
"html" => "HTML document",
"css" => "CSS stylesheet",
"sql" => "SQL query",
"sh" | "bash" => "Shell script",
"pdf" => "PDF document",
_ => return None,
};
Some(doc_type.to_string())
}
fn extract_sections(content: &str) -> Vec<(Option<String>, &str)> {
let header_pattern = regex::Regex::new(r"(?m)^(#{1,6})\s+(.+)$").ok();
if let Some(re) = header_pattern {
let mut sections = Vec::new();
let mut last_end = 0;
let mut current_header: Option<String> = None;
for caps in re.captures_iter(content) {
let Some(full_match) = caps.get(0) else {
continue;
};
let Some(header_match) = caps.get(2) else {
continue;
};
let match_start = full_match.start();
if match_start > last_end {
let section_content = &content[last_end..match_start];
if !section_content.trim().is_empty() {
sections.push((current_header.clone(), section_content.trim()));
}
}
current_header = Some(header_match.as_str().to_string());
last_end = full_match.end();
}
if last_end < content.len() {
let section_content = &content[last_end..];
if !section_content.trim().is_empty() {
sections.push((current_header, section_content.trim()));
}
}
if sections.is_empty() {
vec![(None, content)]
} else {
sections
}
} else {
vec![(None, content)]
}
}
fn smart_chunk_text(text: &str, target_size: usize, overlap: usize) -> Vec<String> {
let sentences = split_into_sentences(text);
if sentences.is_empty() || text.chars().count() <= target_size {
return vec![text.to_string()];
}
let mut chunks = Vec::new();
let mut current_chunk = String::new();
let mut overlap_sentences: Vec<String> = Vec::new();
let overlap_sentence_count = (overlap / 50).clamp(1, 3);
for sentence in &sentences {
let sentence_len = sentence.chars().count();
let current_len = current_chunk.chars().count();
let max_size = target_size + target_size / 2;
if current_len + sentence_len > max_size && !current_chunk.is_empty() {
chunks.push(current_chunk.trim().to_string());
current_chunk = overlap_sentences.join(" ");
if !current_chunk.is_empty() {
current_chunk.push(' ');
}
overlap_sentences.clear();
}
current_chunk.push_str(sentence);
current_chunk.push(' ');
overlap_sentences.push(sentence.clone());
if overlap_sentences.len() > overlap_sentence_count {
overlap_sentences.remove(0);
}
if current_chunk.chars().count() >= target_size {
chunks.push(current_chunk.trim().to_string());
current_chunk = overlap_sentences.join(" ");
if !current_chunk.is_empty() {
current_chunk.push(' ');
}
overlap_sentences.clear();
}
}
let remaining = current_chunk.trim();
if !remaining.is_empty() {
if remaining.chars().count() < target_size / 4 && !chunks.is_empty() {
let last_idx = chunks.len() - 1;
chunks[last_idx].push(' ');
chunks[last_idx].push_str(remaining);
} else {
chunks.push(remaining.to_string());
}
}
if chunks.is_empty() {
chunks.push(text.to_string());
}
chunks
}
fn split_into_sentences(text: &str) -> Vec<String> {
let mut sentences = Vec::new();
let mut current = String::new();
let mut chars = text.chars().peekable();
while let Some(c) = chars.next() {
current.push(c);
if matches!(c, '.' | '!' | '?') {
if let Some(&next) = chars.peek() {
if next.is_whitespace() {
let trimmed = current.trim();
let is_abbreviation = trimmed.ends_with("Mr.")
|| trimmed.ends_with("Mrs.")
|| trimmed.ends_with("Dr.")
|| trimmed.ends_with("Prof.")
|| trimmed.ends_with("vs.")
|| trimmed.ends_with("etc.")
|| trimmed.ends_with("e.g.")
|| trimmed.ends_with("i.e.")
|| (trimmed.len() >= 2 && trimmed.chars().rev().nth(1).map(|c| c.is_uppercase()).unwrap_or(false));
if !is_abbreviation {
sentences.push(current.trim().to_string());
current = String::new();
chars.next();
}
}
} else {
sentences.push(current.trim().to_string());
current = String::new();
}
} else if c == '\n' {
if let Some(&next) = chars.peek()
&& next == '\n'
{
if !current.trim().is_empty() {
sentences.push(current.trim().to_string());
current = String::new();
}
chars.next(); }
}
}
let remaining = current.trim();
if !remaining.is_empty() {
sentences.push(remaining.to_string());
}
sentences
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchOptions {
pub layer_filter: Option<SliceLayer>,
pub project_filter: Option<String>,
}
impl SearchOptions {
pub fn outer_only() -> Self {
Self {
layer_filter: Some(SliceLayer::Outer),
project_filter: None,
}
}
pub fn deep() -> Self {
Self {
layer_filter: None,
project_filter: None,
}
}
pub fn with_project(mut self, project: Option<String>) -> Self {
self.project_filter = project.filter(|value| !value.trim().is_empty());
self
}
}
impl Default for SearchOptions {
fn default() -> Self {
Self::outer_only()
}
}
fn metadata_matches_project(metadata: &Value, project: &str) -> bool {
let needle = project.trim();
if needle.is_empty() {
return true;
}
metadata.as_object().is_some_and(|object| {
["project", "project_id", "source_project"]
.iter()
.filter_map(|key| object.get(*key))
.filter_map(|value| value.as_str())
.any(|value| value.eq_ignore_ascii_case(needle))
})
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SearchResult {
pub id: String,
pub namespace: String,
pub text: String,
pub score: f32,
pub metadata: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub layer: Option<SliceLayer>,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_id: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub children_ids: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub keywords: Vec<String>,
}
impl SearchResult {
pub fn new_legacy(
id: String,
namespace: String,
text: String,
score: f32,
metadata: serde_json::Value,
) -> Self {
Self {
id,
namespace,
text,
score,
metadata,
layer: None,
parent_id: None,
children_ids: vec![],
keywords: vec![],
}
}
pub fn can_expand(&self) -> bool {
!self.children_ids.is_empty()
}
pub fn can_drill_up(&self) -> bool {
self.parent_id.is_some()
}
}
fn cosine(a: &[f32], b: &[f32]) -> f32 {
let mut dot = 0.0_f32;
let mut norm_a = 0.0_f32;
let mut norm_b = 0.0_f32;
for (x, y) in a.iter().zip(b.iter()) {
dot += x * y;
norm_a += x * x;
norm_b += y * y;
}
if norm_a == 0.0 || norm_b == 0.0 {
return 0.0;
}
dot / (norm_a.sqrt() * norm_b.sqrt())
}
#[cfg(test)]
mod tests {
use super::{
SearchOptions, SliceLayer, extract_keywords, hash_content, metadata_matches_project,
};
use serde_json::json;
#[test]
fn short_hash_uses_sha256_prefix_with_minimum_length() {
let hash = hash_content("same content");
assert_eq!(hash.len(), 16);
assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
assert_eq!(hash, hash_content("same content"));
}
#[test]
fn keyword_extraction_splits_paths_and_filters_session_tokens() {
let keywords = extract_keywords(
"/Users/silver/Git/tools/TwinSweep session 2ff4de8b9a4e1234567890abcdef notes",
10,
);
assert!(keywords.contains(&"users".to_string()));
assert!(keywords.contains(&"twinsweep".to_string()));
assert!(!keywords.iter().any(|keyword| keyword.contains("2ff4de8b")));
}
#[test]
fn search_options_can_carry_project_filter() {
let options = SearchOptions::deep().with_project(Some("Vista".to_string()));
assert_eq!(options.layer_filter, None);
assert_eq!(options.project_filter.as_deref(), Some("Vista"));
}
#[test]
fn project_match_uses_metadata_fields() {
assert!(metadata_matches_project(
&json!({"project": "Vista"}),
"vista"
));
assert!(metadata_matches_project(
&json!({"project_id": "VetCoders"}),
"vetcoders"
));
assert!(!metadata_matches_project(
&json!({"project": "rmcp-memex"}),
"vista"
));
assert_eq!(
SearchOptions::default().layer_filter,
Some(SliceLayer::Outer)
);
}
}