#![warn(clippy::all)]
pub mod chunk;
pub mod detect;
pub mod diary;
pub mod lock;
pub mod noise;
pub mod normalize;
pub mod reindex;
use std::path::{Path, PathBuf};
use crate::core::{
db::Database,
types::{BootstrapEvidenceArgs, Drawer, SourceType},
utils::{build_bootstrap_evidence_drawer_id, current_timestamp, route_room_from_taxonomy},
};
use crate::embed::{EmbedError, Embedder};
use thiserror::Error;
use crate::ingest::{
chunk::{chunk_conversation, chunk_text},
detect::{Format, detect_format},
normalize::{
CURRENT_NORMALIZE_VERSION, NormalizeError, NormalizeOptions, normalize_content_with_options,
},
};
// Window size handed to `chunk_text` when splitting plain-text input.
// NOTE(review): the unit (bytes, chars or tokens) is defined by `chunk_text` — confirm there.
const CHUNK_WINDOW: usize = 800;
// Overlap handed to `chunk_text` together with `CHUNK_WINDOW`.
// NOTE(review): presumably the amount shared between consecutive chunks — confirm in `chunk`.
const CHUNK_OVERLAP: usize = 100;
// Timeout passed to `lock::acquire_source_lock` when taking the per-source ingest lock.
const LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
fn mempal_home_from_db(db: &Database) -> PathBuf {
db.path()
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| PathBuf::from("."))
}
/// Counters describing the outcome of an ingest call.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct IngestStats {
/// Number of files processed; a single-file ingest reports `1`.
pub files: usize,
/// Number of chunks stored as drawers, or, in a dry run, the number of chunks that
/// do not already exist as drawers.
pub chunks: usize,
/// Number of chunks skipped because a drawer with the same id already exists.
pub skipped: usize,
/// Bytes removed by noise stripping, as reported by the normalizer.
pub noise_bytes_stripped: Option<u64>,
/// Milliseconds spent waiting for the per-source ingest lock; `None` when no lock
/// was taken (for example in a dry run).
pub lock_wait_ms: Option<u64>,
}
/// Options controlling how a file or directory is ingested.
///
/// The derived `Default` yields `None` / `false` for every field.
#[derive(Debug, Clone, Copy, Default)]
pub struct IngestOptions<'a> {
/// Explicit room for the stored drawers; when `None` the room is routed from the
/// taxonomy based on the normalized content.
pub room: Option<&'a str>,
/// Prefix stripped from the file path to derive the stored source-file name.
pub source_root: Option<&'a Path>,
/// When `true`, only counts what would be stored: no lock is taken and nothing is
/// replaced, embedded or inserted.
pub dry_run: bool,
/// Source-file name to record verbatim instead of deriving it from the path.
pub source_file_override: Option<&'a str>,
/// When `true` (and not a dry run), existing active drawers for the same source file
/// are replaced before new ones are inserted.
pub replace_existing_source: bool,
/// Selects the across-rooms replacement instead of the room-scoped one; only
/// consulted when `replace_existing_source` is set.
pub replace_across_rooms: bool,
/// Disables noise stripping during normalization.
pub no_strip_noise: bool,
/// Routes the normalized content to the diary rollup path instead of chunking it.
pub diary_rollup: bool,
/// Day forwarded to the diary rollup.
// NOTE(review): the expected format is defined by `diary` — confirm there.
pub diary_rollup_day: Option<&'a str>,
}
/// Result alias used throughout the ingest module, with [`IngestError`] as the error type.
pub type Result<T> = std::result::Result<T, IngestError>;
/// Errors produced while ingesting files or directories.
#[derive(Debug, Error)]
pub enum IngestError {
/// Reading the input file at `path` failed.
#[error("failed to read {path}")]
ReadFile {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// Normalizing the content of the file at `path` failed.
#[error("failed to normalize {path}")]
Normalize {
path: PathBuf,
#[source]
source: NormalizeError,
},
/// Loading taxonomy entries failed while routing a room for `wing`.
#[error("failed to load taxonomy for wing {wing}")]
LoadTaxonomy {
wing: String,
#[source]
source: crate::core::db::DbError,
},
/// The embedder failed while embedding the chunks of the file at `path`.
#[error("failed to embed chunks from {path}")]
EmbedChunks {
path: PathBuf,
#[source]
source: EmbedError,
},
/// Checking whether the drawer `drawer_id` already exists failed.
#[error("failed to check drawer {drawer_id}")]
CheckDrawer {
drawer_id: String,
#[source]
source: crate::core::db::DbError,
},
/// Inserting the drawer `drawer_id` failed.
#[error("failed to insert drawer {drawer_id}")]
InsertDrawer {
drawer_id: String,
#[source]
source: crate::core::db::DbError,
},
/// Replacing the existing drawers of `source_file` failed.
#[error("failed to replace source drawers for {source_file}")]
ReplaceSource {
source_file: String,
#[source]
source: crate::core::db::DbError,
},
/// Inserting the embedding vector for the drawer `drawer_id` failed.
#[error("failed to insert vector for {drawer_id}")]
InsertVector {
drawer_id: String,
#[source]
source: crate::core::db::DbError,
},
/// A diary rollup was requested for a wing other than `agent-diary`.
// NOTE(review): presumably raised by the `diary` rollup path — confirm in `diary`.
#[error("diary_rollup requires wing=\"agent-diary\", got wing=\"{wing}\"")]
DiaryRollupWrongWing { wing: String },
/// A diary rollup was requested without an explicit, non-empty room.
// NOTE(review): presumably raised by the `diary` rollup path — confirm in `diary`.
#[error("diary_rollup requires an explicit non-empty room")]
DiaryRollupMissingRoom,
/// Appending to the daily rollup drawer would exceed its size limit.
// NOTE(review): presumably raised by the `diary` rollup path — confirm in `diary`.
#[error(
"daily rollup drawer {drawer_id} would exceed {limit_bytes} bytes ({attempted_bytes} bytes)"
)]
DailyRollupFull {
drawer_id: String,
limit_bytes: usize,
attempted_bytes: usize,
},
/// The embedder produced no vector for the drawer `drawer_id`.
#[error("embedder returned no vector for {drawer_id}")]
EmbedderReturnedNoVector { drawer_id: String },
/// Acquiring the ingest lock failed; converted from `lock::LockError` via `?`.
#[error("failed to acquire ingest lock: {0}")]
Lock(#[from] lock::LockError),
/// Opening the directory at `path` for listing failed.
#[error("failed to read directory {path}")]
ReadDir {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// Reading one entry while listing the directory at `path` failed.
#[error("failed to read entry in {path}")]
ReadDirEntry {
path: PathBuf,
#[source]
source: std::io::Error,
},
}
pub async fn ingest_file<E: Embedder + ?Sized>(
db: &Database,
embedder: &E,
path: &Path,
wing: &str,
room: Option<&str>,
) -> Result<IngestStats> {
ingest_file_with_options(
db,
embedder,
path,
wing,
IngestOptions {
room,
source_root: path.parent(),
dry_run: false,
source_file_override: None,
replace_existing_source: false,
replace_across_rooms: false,
no_strip_noise: false,
diary_rollup: false,
diary_rollup_day: None,
},
)
.await
}
/// Ingests one file into `wing`: normalizes, chunks, embeds and stores its content.
///
/// The flow is:
/// 1. Read the file and decode it as lossy UTF-8; blank content short-circuits with
///    `files: 1` and nothing else.
/// 2. Detect the format and normalize the content (noise stripping is on unless
///    `options.no_strip_noise` is set).
/// 3. If `options.diary_rollup` is set, hand the normalized text to
///    `diary::ingest_diary_rollup` and return its stats.
/// 4. Resolve the room (explicit, or routed from the taxonomy) and chunk the content
///    according to the detected format.
/// 5. Unless this is a dry run: take the per-source lock, optionally replace existing
///    drawers for the source, embed all chunks that do not exist yet, and insert one
///    drawer plus one vector per chunk.
///
/// In a dry run nothing is locked, replaced, embedded or inserted; `chunks` counts the
/// chunks that would be stored and `skipped` the ones that already exist.
///
/// # Errors
///
/// Returns an [`IngestError`] when reading, normalizing, taxonomy loading, locking,
/// replacing, embedding or any database operation fails, and
/// [`IngestError::EmbedderReturnedNoVector`] when the embedder returns fewer vectors
/// than the number of chunks it was given.
pub async fn ingest_file_with_options<E: Embedder + ?Sized>(
    db: &Database,
    embedder: &E,
    path: &Path,
    wing: &str,
    options: IngestOptions<'_>,
) -> Result<IngestStats> {
    let bytes = tokio::fs::read(path)
        .await
        .map_err(|source| IngestError::ReadFile {
            path: path.to_path_buf(),
            source,
        })?;
    // Invalid UTF-8 sequences are replaced rather than rejected.
    let content = String::from_utf8_lossy(&bytes).into_owned();
    if content.trim().is_empty() {
        return Ok(IngestStats {
            files: 1,
            ..IngestStats::default()
        });
    }

    let format = detect_format(&content);
    let normalize_output = normalize_content_with_options(
        &content,
        format,
        NormalizeOptions {
            strip_noise: !options.no_strip_noise,
        },
    )
    .map_err(|source| IngestError::Normalize {
        path: path.to_path_buf(),
        source,
    })?;
    let normalized = normalize_output.content;
    let noise_bytes_stripped = normalize_output.noise_bytes_stripped;

    // Diary rollups bypass chunking entirely and are handled by the `diary` module.
    if options.diary_rollup {
        let mut outcome = diary::ingest_diary_rollup(
            db,
            embedder,
            &normalized,
            wing,
            diary::DiaryRollupOptions {
                room: options.room,
                day: options.diary_rollup_day,
                dry_run: options.dry_run,
                importance: 0,
            },
        )
        .await?;
        outcome.stats.noise_bytes_stripped = noise_bytes_stripped;
        return Ok(outcome.stats);
    }

    let resolved_room = match options.room {
        Some(room) => room.to_string(),
        None => {
            let taxonomy = db
                .taxonomy_entries()
                .map_err(|source| IngestError::LoadTaxonomy {
                    wing: wing.to_string(),
                    source,
                })?;
            route_room_from_taxonomy(&normalized, wing, &taxonomy)
        }
    };

    let chunks = match format {
        Format::ClaudeJsonl | Format::ChatGptJson | Format::CodexJsonl | Format::SlackJson => {
            chunk_conversation(&normalized)
        }
        Format::PlainText => chunk_text(&normalized, CHUNK_WINDOW, CHUNK_OVERLAP),
    };
    if chunks.is_empty() {
        return Ok(IngestStats {
            files: 1,
            ..IngestStats::default()
        });
    }

    let mut stats = IngestStats {
        files: 1,
        noise_bytes_stripped,
        ..IngestStats::default()
    };
    let source_file = options
        .source_file_override
        .map(ToOwned::to_owned)
        .unwrap_or_else(|| normalize_source_file(path, options.source_root));

    // The guard is bound for the rest of the function so the lock is held until every
    // write below has finished. Dry runs never write, so they skip the lock.
    let _lock_guard = if options.dry_run {
        None
    } else {
        let home = mempal_home_from_db(db);
        let key = lock::source_key(Path::new(&source_file));
        let guard = lock::acquire_source_lock(&home, &key, LOCK_TIMEOUT)?;
        stats.lock_wait_ms = Some(guard.wait_duration().as_millis() as u64);
        Some(guard)
    };

    let source_type = source_type_for(format);
    if options.replace_existing_source && !options.dry_run {
        let replace_result = if options.replace_across_rooms {
            db.replace_active_source_drawers_across_rooms(&source_file, wing)
        } else {
            db.replace_active_source_drawers(&source_file, wing, Some(resolved_room.as_str()))
        };
        replace_result.map_err(|source| IngestError::ReplaceSource {
            source_file: source_file.clone(),
            source,
        })?;
    }

    // Collect the chunks that still need a drawer; existing ones are counted as skipped.
    let mut pending = Vec::new();
    for (chunk_index, chunk) in chunks.iter().enumerate() {
        let drawer_id = build_bootstrap_evidence_drawer_id(
            wing,
            Some(resolved_room.as_str()),
            chunk,
            &source_type,
        );
        if db
            .drawer_exists(&drawer_id)
            .map_err(|source| IngestError::CheckDrawer {
                drawer_id: drawer_id.clone(),
                source,
            })?
        {
            stats.skipped += 1;
            continue;
        }
        if options.dry_run {
            stats.chunks += 1;
            continue;
        }
        pending.push((chunk_index, chunk, drawer_id));
    }
    if options.dry_run || pending.is_empty() {
        return Ok(stats);
    }

    let chunk_refs = pending
        .iter()
        .map(|(_, chunk, _)| chunk.as_ref())
        .collect::<Vec<_>>();
    let vectors = embedder
        .embed(&chunk_refs)
        .await
        .map_err(|source| IngestError::EmbedChunks {
            path: path.to_path_buf(),
            source,
        })?;

    // `zip` stops at the shorter side, so a short vector list would silently drop the
    // trailing chunks while still reporting success. Validate the count before any
    // write and report the first drawer that has no vector.
    let vectors: Vec<_> = vectors.into_iter().collect();
    if let Some((_, _, drawer_id)) = pending.get(vectors.len()) {
        return Err(IngestError::EmbedderReturnedNoVector {
            drawer_id: drawer_id.clone(),
        });
    }

    for ((chunk_index, chunk, drawer_id), vector) in pending.into_iter().zip(vectors) {
        let drawer = Drawer::new_bootstrap_evidence(BootstrapEvidenceArgs {
            id: drawer_id.clone(),
            content: chunk.to_string(),
            wing: wing.to_string(),
            room: Some(resolved_room.clone()),
            source_file: Some(source_file.clone()),
            source_type: source_type.clone(),
            added_at: current_timestamp(),
            chunk_index: Some(chunk_index as i64),
            importance: 0,
        });
        // Stamp the drawer with the normalizer version that produced its content.
        let drawer = Drawer {
            normalize_version: CURRENT_NORMALIZE_VERSION,
            ..drawer
        };
        db.insert_drawer(&drawer)
            .map_err(|source| IngestError::InsertDrawer {
                drawer_id: drawer.id.clone(),
                source,
            })?;
        db.insert_vector(&drawer_id, &vector)
            .map_err(|source| IngestError::InsertVector {
                drawer_id: drawer.id.clone(),
                source,
            })?;
        stats.chunks += 1;
    }
    Ok(stats)
}
pub async fn ingest_dir<E: Embedder + ?Sized>(
db: &Database,
embedder: &E,
dir: &Path,
wing: &str,
room: Option<&str>,
) -> Result<IngestStats> {
ingest_dir_with_options(
db,
embedder,
dir,
wing,
IngestOptions {
room,
source_root: Some(dir),
dry_run: false,
source_file_override: None,
replace_existing_source: false,
replace_across_rooms: false,
no_strip_noise: false,
diary_rollup: false,
diary_rollup_day: None,
},
)
.await
}
/// Recursively ingests every regular file under `dir` and returns the summed stats.
///
/// Directories are walked with an explicit stack; directories for which
/// `should_skip_dir` returns `true` are not descended into. Each file is passed to
/// [`ingest_file_with_options`] with the same `options`. The `files`, `chunks` and
/// `skipped` counters are added up; the optional `noise_bytes_stripped` and
/// `lock_wait_ms` values are summed over the files that reported them.
///
/// # Errors
///
/// Returns [`IngestError::ReadDir`] or [`IngestError::ReadDirEntry`] when a directory
/// cannot be listed, and stops at the first error returned for any file.
pub async fn ingest_dir_with_options<E: Embedder + ?Sized>(
    db: &Database,
    embedder: &E,
    dir: &Path,
    wing: &str,
    options: IngestOptions<'_>,
) -> Result<IngestStats> {
    let mut stats = IngestStats::default();
    let mut stack = vec![dir.to_path_buf()];
    while let Some(current) = stack.pop() {
        let entries = std::fs::read_dir(&current).map_err(|source| IngestError::ReadDir {
            path: current.clone(),
            source,
        })?;
        for entry in entries {
            let entry = entry.map_err(|source| IngestError::ReadDirEntry {
                path: current.clone(),
                source,
            })?;
            let path = entry.path();
            if path.is_dir() {
                if should_skip_dir(&path) {
                    continue;
                }
                stack.push(path);
                continue;
            }
            if path.is_file() {
                let file_stats =
                    ingest_file_with_options(db, embedder, &path, wing, options).await?;
                stats.files += file_stats.files;
                stats.chunks += file_stats.chunks;
                stats.skipped += file_stats.skipped;
                stats.noise_bytes_stripped =
                    merge_optional_sum(stats.noise_bytes_stripped, file_stats.noise_bytes_stripped);
                // Aggregate lock wait time as well, so a directory ingest reports the
                // total time spent waiting instead of always reporting `None`.
                stats.lock_wait_ms =
                    merge_optional_sum(stats.lock_wait_ms, file_stats.lock_wait_ms);
            }
        }
    }
    Ok(stats)
}
/// Adds two optional counters, treating `None` as "no value reported".
///
/// Returns `None` only when both inputs are `None`; a single present value is
/// returned unchanged. When both are present the sum saturates at `u64::MAX`
/// instead of overflowing.
fn merge_optional_sum(left: Option<u64>, right: Option<u64>) -> Option<u64> {
    match (left, right) {
        // Saturate: an overflowing `+` would panic in debug builds and wrap in release.
        (Some(left), Some(right)) => Some(left.saturating_add(right)),
        (Some(value), None) | (None, Some(value)) => Some(value),
        (None, None) => None,
    }
}
/// Maps a detected input format to the source type recorded on its drawers.
///
/// Plain text is recorded as `SourceType::Project`; every other format is recorded
/// as `SourceType::Conversation`.
fn source_type_for(format: Format) -> SourceType {
    match format {
        Format::PlainText => SourceType::Project,
        Format::ClaudeJsonl
        | Format::ChatGptJson
        | Format::CodexJsonl
        | Format::SlackJson => SourceType::Conversation,
    }
}
/// Reports whether the directory at `path` is excluded from recursive ingest.
///
/// Only the final path component is inspected; it must be valid UTF-8 and equal to
/// `.git`, `target` or `node_modules`. Paths without a final component are never skipped.
fn should_skip_dir(path: &Path) -> bool {
    const SKIPPED_DIR_NAMES: [&str; 3] = [".git", "target", "node_modules"];
    match path.file_name().and_then(|raw_name| raw_name.to_str()) {
        Some(dir_name) => SKIPPED_DIR_NAMES.contains(&dir_name),
        None => false,
    }
}
/// Derives the source-file name recorded for `path`.
///
/// Preference order:
/// 1. `path` relative to `source_root`, when a root is given, is a prefix of `path`,
///    and the remainder is non-empty;
/// 2. the final component of `path`;
/// 3. `path` itself, when it has no final component.
///
/// The result is converted lossily to a string and every backslash is replaced with
/// a forward slash.
fn normalize_source_file(path: &Path, source_root: Option<&Path>) -> String {
    let relative_to_root = match source_root {
        Some(root) => path.strip_prefix(root).ok(),
        None => None,
    };
    let chosen: PathBuf = match relative_to_root {
        Some(relative) if !relative.as_os_str().is_empty() => relative.to_path_buf(),
        _ => match path.file_name() {
            Some(file_name) => PathBuf::from(file_name),
            None => path.to_path_buf(),
        },
    };
    chosen.to_string_lossy().replace('\\', "/")
}