use anyhow::{Context, Result};
use rusqlite::params;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::chunk::{Chunk, ChunkOptions, chunk_text};
use crate::ignore::IgnoreRules;
use crate::jsonl::extract_jsonl;
use crate::store::Store;
/// Content kind that routes text through the JSONL extractor instead of the
/// generic chunker (see `extract_chunks`).
const JSONL_KIND: &str = "application/jsonl";
/// Files larger than this (50 MiB) are skipped during file/directory ingest.
const MAX_INGEST_BYTES: u64 = 50 * 1024 * 1024;
/// Outcome of one ingest pass: sources written, sources skipped, and how
/// many paths the ignore rules filtered out.
#[derive(Debug, Serialize)]
pub struct IngestReport {
    /// Sources newly written (or rewritten) into the store.
    pub ingested: Vec<IngestedSource>,
    /// Sources that were not ingested, each with a reason.
    pub skipped: Vec<SkippedSource>,
    /// Count of paths excluded by ignore rules; omitted from JSON when zero.
    // NOTE: `#[serde(default)]` was removed here — it only affects
    // Deserialize, which this struct does not derive, so it was dead.
    #[serde(skip_serializing_if = "is_zero")]
    pub ignored: usize,
}
/// Serde helper: lets `skip_serializing_if` drop zero-valued counters.
fn is_zero(n: &usize) -> bool {
    matches!(*n, 0)
}
/// Options controlling path-based ingest.
#[derive(Debug, Default, Clone)]
pub struct IngestOptions {
// When true, ignore-rule files are not consulted and nothing is filtered.
pub no_ignore: bool,
}
/// Metadata recorded for a source that was (re)ingested into the store.
#[derive(Debug, Serialize)]
pub struct IngestedSource {
// Stable id: first 16 bytes of SHA-256(uri), hex-encoded (see source_id_for).
pub source_id: String,
pub uri: String,
// Filesystem path when the source came from disk; None for stdin/fifo input.
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
// Content kind, e.g. "text/plain", "text/markdown", "application/jsonl".
pub kind: String,
pub bytes: u64,
pub content_sha256: String,
// Number of chunks written for this source.
pub chunks: usize,
}
/// A source that was not ingested, and why.
#[derive(Debug, Serialize)]
pub struct SkippedSource {
pub uri: String,
// Machine-readable reason code ("unchanged", "error", "too_large");
// omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub skipped_reason: Option<String>,
// Human-readable explanation of the skip.
pub reason: String,
}
/// Ingest a file or directory using default options (ignore rules enabled).
pub fn ingest_path(store: &mut Store, path: &Path) -> Result<IngestReport> {
ingest_path_with(store, path, &IngestOptions::default())
}
pub const DEFAULT_FOLLOW_INTERVAL_SECS: u64 = 5;
/// Options for follow mode (repeated ingest passes over the same path).
#[derive(Debug, Clone)]
pub struct FollowOptions {
// Delay between passes; must be non-zero (enforced by follow_path_with).
pub interval: Duration,
// Stop after this many passes; None runs until the process is stopped.
pub max_iterations: Option<usize>,
}
impl FollowOptions {
pub fn new(interval: Duration) -> Self {
Self {
interval,
max_iterations: None,
}
}
}
/// Repeatedly ingests `path`, invoking `on_pass` after every pass with the
/// zero-based pass number and that pass's report.
///
/// Runs until `follow_opts.max_iterations` passes complete (forever when
/// `None`), sleeping `follow_opts.interval` between passes.
///
/// # Errors
/// Fails if the interval is zero, or if any ingest pass fails.
pub fn follow_path_with(
    store: &mut Store,
    path: &Path,
    opts: &IngestOptions,
    follow_opts: &FollowOptions,
    mut on_pass: impl FnMut(usize, &IngestReport),
) -> Result<()> {
    anyhow::ensure!(
        !follow_opts.interval.is_zero(),
        "follow interval must be greater than zero"
    );
    let mut pass: usize = 0;
    loop {
        let report = ingest_path_with(store, path, opts)?;
        on_pass(pass, &report);
        pass += 1;
        // Stop once the configured number of passes has completed; otherwise
        // wait out the interval before the next pass.
        match follow_opts.max_iterations {
            Some(limit) if pass >= limit => return Ok(()),
            _ => std::thread::sleep(follow_opts.interval),
        }
    }
}
/// Ingest a file, directory tree, or FIFO at `path` into the store.
///
/// Directories are walked recursively, honoring ignore rules unless
/// `opts.no_ignore` is set. Files over `MAX_INGEST_BYTES` are skipped with a
/// warning on stderr. Per-file failures are recorded in `report.skipped`
/// rather than aborting the whole pass.
pub fn ingest_path_with(
store: &mut Store,
path: &Path,
opts: &IngestOptions,
) -> Result<IngestReport> {
// FIFOs are drained via the append-mode stdin path, never walked.
if path_is_fifo(path) {
return ingest_fifo(store, path);
}
let rules = if opts.no_ignore {
IgnoreRules::disabled()
} else {
IgnoreRules::load(path)?
};
let (targets, ignored) = collect_targets(path, &rules)?;
let mut report = IngestReport {
ingested: Vec::new(),
skipped: Vec::new(),
ignored,
};
for file in targets {
let uri = path_to_uri(&file);
// Size guard is best-effort: if stat fails here, fall through and let
// ingest_file surface the real error.
if let Ok(meta) = fs::metadata(&file) {
if meta.is_file() && meta.len() > MAX_INGEST_BYTES {
let reason = format!(
"file is {} bytes; exceeds {}-byte limit",
meta.len(),
MAX_INGEST_BYTES
);
eprintln!("warning: skipping {}: {}", file.display(), reason);
report.skipped.push(SkippedSource {
uri,
skipped_reason: Some("too_large".into()),
reason,
});
continue;
}
}
match ingest_file(store, &file) {
Ok(Some(src)) => report.ingested.push(src),
// None means the stored content hash matched; nothing to rewrite.
Ok(None) => report.skipped.push(skipped_source(
uri,
"unchanged",
"unchanged since last ingest".into(),
)),
Err(err) => report
.skipped
.push(skipped_source(uri, "error", format!("{err:#}"))),
}
}
Ok(report)
}
/// Resolve `path` into the list of ingestable files plus a count of entries
/// filtered out by ignore rules.
///
/// A single file must have a supported extension (error otherwise); a
/// directory is walked in file-name order with ignored subtrees pruned.
fn collect_targets(path: &Path, rules: &IgnoreRules) -> Result<(Vec<PathBuf>, usize)> {
let meta = fs::metadata(path).with_context(|| format!("stat {}", path.display()))?;
if meta.is_file() {
if !is_supported_file(path) {
anyhow::bail!("unsupported file extension: {}", path.display());
}
if rules.is_ignored(path, false) {
return Ok((Vec::new(), 1));
}
return Ok((vec![path.to_path_buf()], 0));
}
if meta.is_dir() {
let mut out = Vec::new();
let mut ignored = 0usize;
// filter_entry prunes whole subtrees: returning false for a directory
// skips everything beneath it. The closure mutably borrows `ignored`
// for the duration of the walk to count pruned entries.
let walker = walkdir::WalkDir::new(path)
.sort_by_file_name()
.into_iter()
.filter_entry(|entry| {
let is_dir = entry.file_type().is_dir();
if rules.is_ignored(entry.path(), is_dir) {
ignored += 1;
false
} else {
true
}
});
for entry in walker {
// Any walk error (e.g. permission denied) aborts the collection.
let entry = entry.with_context(|| format!("walk {}", path.display()))?;
let p = entry.path();
if p.is_file() && is_supported_file(p) {
out.push(p.to_path_buf());
}
}
return Ok((out, ignored));
}
anyhow::bail!("not a file or directory: {}", path.display())
}
/// Whether `p` has a file extension we know how to ingest.
///
/// Matching is case-insensitive, so `README.MD` and `readme.md` are treated
/// alike (previously only lowercase extensions matched). Paths without an
/// extension are not supported.
pub(crate) fn is_supported_file(p: &Path) -> bool {
    p.extension()
        .and_then(|s| s.to_str())
        .map_or(false, |ext| {
            matches!(
                ext.to_ascii_lowercase().as_str(),
                "txt" | "md"
                    | "markdown"
                    | "jsonl"
                    | "ndjson"
                    | "rs"
                    | "py"
                    | "c"
                    | "h"
                    | "cpp"
                    | "hpp"
                    | "go"
                    | "js"
                    | "ts"
                    | "sh"
                    | "lua"
                    | "java"
                    | "kt"
                    | "swift"
                    | "toml"
                    | "yaml"
                    | "yml"
                    | "json"
                    | "cfg"
                    | "ini"
                    | "conf"
                    | "zig"
            )
        })
}
/// Map a file extension to the content kind stored alongside the source.
///
/// Matching is case-insensitive (e.g. `LOG.JSONL` is classified as JSONL;
/// previously only lowercase extensions matched). Anything unrecognized,
/// including extensionless paths, falls back to plain text.
fn kind_for(p: &Path) -> &'static str {
    match p
        .extension()
        .and_then(|s| s.to_str())
        .map(str::to_ascii_lowercase)
        .as_deref()
    {
        Some("md" | "markdown") => "text/markdown",
        Some("jsonl" | "ndjson") => JSONL_KIND,
        _ => "text/plain",
    }
}
/// Split raw text into chunks, dispatching on the declared content kind.
fn extract_chunks(kind: &str, text: &str) -> Vec<Chunk> {
    match kind {
        // JSONL records carry their own chunk boundaries and metadata.
        JSONL_KIND => extract_jsonl(text),
        _ => chunk_text(text, ChunkOptions::default()),
    }
}
/// Build a `file://` URI for a path, canonicalized when possible.
fn path_to_uri(p: &Path) -> String {
    // Fall back to the path as given when canonicalization fails
    // (e.g. the file has disappeared between collection and read).
    let resolved = match fs::canonicalize(p) {
        Ok(abs) => abs,
        Err(_) => p.to_path_buf(),
    };
    format!("file://{}", resolved.display())
}
/// True when `p` is a named pipe (FIFO). Unix-only implementation.
///
/// Uses symlink_metadata so the entry itself is inspected without following
/// symlinks; any stat error is treated as "not a fifo".
#[cfg(unix)]
fn path_is_fifo(p: &Path) -> bool {
use std::os::unix::fs::FileTypeExt;
fs::symlink_metadata(p)
.map(|m| m.file_type().is_fifo())
.unwrap_or(false)
}
/// Non-Unix fallback: FIFO detection is not supported, always false.
#[cfg(not(unix))]
fn path_is_fifo(_p: &Path) -> bool {
false
}
/// Build a `fifo://` URI for a pipe path, canonicalized when possible.
fn fifo_uri(p: &Path) -> String {
    // Keep the caller-supplied path when canonicalization fails.
    let resolved = match fs::canonicalize(p) {
        Ok(abs) => abs,
        Err(_) => p.to_path_buf(),
    };
    format!("fifo://{}", resolved.display())
}
/// Drain a FIFO and ingest its contents as an append-mode source.
///
/// NOTE(review): `fs::read` on a FIFO presumably blocks until writers close
/// the pipe — confirm this is the intended behavior for follow mode. Append
/// mode gives each drain a fresh `fifo://...#suffix` URI, so repeated reads
/// accumulate rather than replace each other.
pub fn ingest_fifo(store: &mut Store, path: &Path) -> Result<IngestReport> {
let bytes = fs::read(path).with_context(|| format!("read fifo {}", path.display()))?;
let kind = kind_for(path).to_string();
let base_uri = fifo_uri(path);
ingest_stdin_with(
store,
&base_uri,
Some(&kind),
&bytes,
&StdinIngestOptions { append: true },
)
}
/// Derive a stable source id: the first 16 bytes of SHA-256(uri), hex-encoded.
fn source_id_for(uri: &str) -> String {
    let digest = Sha256::digest(uri.as_bytes());
    hex::encode(&digest[..16])
}
/// Derive a chunk id: first 16 bytes (hex) of SHA-256 over the concatenation
/// of the source id, the ordinal as a big-endian u64, and the chunk text.
fn chunk_id_for(source_id: &str, ordinal: usize, text: &str) -> String {
    // Hashing one concatenated buffer is byte-for-byte equivalent to
    // feeding the three parts to the hasher sequentially.
    let mut material = Vec::with_capacity(source_id.len() + 8 + text.len());
    material.extend_from_slice(source_id.as_bytes());
    material.extend_from_slice(&(ordinal as u64).to_be_bytes());
    material.extend_from_slice(text.as_bytes());
    hex::encode(&Sha256::digest(&material)[..16])
}
/// Hex-encoded SHA-256 digest of `bytes`.
pub(crate) fn sha256_hex(bytes: &[u8]) -> String {
    hex::encode(Sha256::digest(bytes))
}
/// Current time as whole seconds since the Unix epoch; 0 if the system
/// clock reports a time before 1970.
fn now_unix() -> i64 {
    match UNIX_EPOCH.elapsed() {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
/// Ingest a single on-disk file; returns `Ok(None)` when its content is
/// unchanged since the last ingest (same SHA-256 stored for the URI).
fn ingest_file(store: &mut Store, file: &Path) -> Result<Option<IngestedSource>> {
let uri = path_to_uri(file);
let bytes = fs::read(file).with_context(|| format!("read {}", file.display()))?;
// Best-effort mtime: None if stat, modified-time, or epoch conversion fails.
let mtime_unix = fs::metadata(file)
.ok()
.and_then(|m| m.modified().ok())
.and_then(|t| t.duration_since(UNIX_EPOCH).ok())
.map(|d| d.as_secs() as i64);
let kind = kind_for(file).to_string();
let path_display = file.to_string_lossy().into_owned();
persist(
store,
&uri,
Some(path_display),
&kind,
&bytes,
mtime_unix,
// Error messages are prefixed with the file path for context.
|err_ctx| format!("{} {err_ctx}", file.display()),
)
}
/// Options for stdin/FIFO ingest.
#[derive(Debug, Default, Clone, Copy)]
pub struct StdinIngestOptions {
// When true, each ingest appends a unique "#suffix" fragment to the URI so
// successive reads accumulate as new sources instead of replacing one.
pub append: bool,
}
/// Ingest bytes read from stdin under `uri`, replacing any prior content for
/// that URI (non-append mode). `kind` defaults to "text/plain" when `None`.
pub fn ingest_stdin(
store: &mut Store,
uri: &str,
kind: Option<&str>,
bytes: &[u8],
) -> Result<IngestReport> {
ingest_stdin_with(store, uri, kind, bytes, &StdinIngestOptions::default())
}
/// Ingest raw bytes (stdin or FIFO) under `uri`.
///
/// In append mode a unique `#<nanos>-<seq>` fragment is added to the URI so
/// every call creates a new source; otherwise the URI's existing source is
/// replaced, or skipped as "unchanged" when the content hash matches.
/// Persistence failures are reported via `report.skipped`, not `Err`.
///
/// # Errors
/// Fails only when `uri` is empty/whitespace.
pub fn ingest_stdin_with(
store: &mut Store,
uri: &str,
kind: Option<&str>,
bytes: &[u8],
opts: &StdinIngestOptions,
) -> Result<IngestReport> {
if uri.trim().is_empty() {
anyhow::bail!("stdin ingest requires a non-empty --uri");
}
let kind = kind.unwrap_or("text/plain").to_string();
let effective_uri = if opts.append {
format!("{uri}#{}", unique_suffix())
} else {
uri.to_string()
};
let mut report = IngestReport {
ingested: Vec::new(),
skipped: Vec::new(),
// Ignore rules never apply to stdin input.
ignored: 0,
};
match persist(store, &effective_uri, None, &kind, bytes, None, |err_ctx| {
format!("stdin ({effective_uri}) {err_ctx}")
}) {
Ok(Some(src)) => report.ingested.push(src),
Ok(None) => report.skipped.push(skipped_source(
effective_uri,
"unchanged",
"unchanged since last ingest".into(),
)),
Err(err) => report
.skipped
.push(skipped_source(effective_uri, "error", format!("{err:#}"))),
}
Ok(report)
}
/// Produce a process-unique URI fragment: "<nanos-since-epoch>-<counter>".
///
/// The atomic counter disambiguates calls that land on the same nanosecond.
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(d) => d.as_nanos(),
        Err(_) => 0,
    };
    format!("{nanos}-{seq}")
}
/// Convenience constructor pairing a machine-readable skip code with prose.
fn skipped_source(uri: impl Into<String>, skipped_reason: &str, reason: String) -> SkippedSource {
    let code = skipped_reason.to_owned();
    SkippedSource {
        reason,
        skipped_reason: Some(code),
        uri: uri.into(),
    }
}
/// Persist one source and its extracted chunks into SQLite.
///
/// Returns `Ok(None)` without touching the store when the stored content
/// hash for this URI already matches `bytes` (dedup). Otherwise the source
/// row is upserted and all of its chunks are rewritten atomically in a
/// single transaction.
///
/// # Errors
/// Fails when `bytes` is not valid UTF-8 (message built via `err_ctx`) or on
/// any SQLite error.
fn persist(
store: &mut Store,
uri: &str,
path: Option<String>,
kind: &str,
bytes: &[u8],
mtime_unix: Option<i64>,
err_ctx: impl Fn(&str) -> String,
) -> Result<Option<IngestedSource>> {
let content_sha = sha256_hex(bytes);
let text = std::str::from_utf8(bytes)
.with_context(|| err_ctx("is not valid UTF-8"))?
.to_string();
let source_id = source_id_for(uri);
// Fetch the previously stored hash, if any. `.ok()` collapses both
// "no row" and genuine query errors into None ("not ingested yet").
let already: Option<String> = store
.conn()
.query_row(
"SELECT content_sha256 FROM sources WHERE id = ?1",
params![source_id],
|row| row.get(0),
)
.ok();
if already.as_deref() == Some(content_sha.as_str()) {
return Ok(None);
}
let chunks = extract_chunks(kind, &text);
let ingested_at = now_unix();
// One transaction covers chunk deletion, the source upsert, and all chunk
// inserts, so readers never observe a half-written source.
let tx = store.conn_mut().transaction()?;
tx.execute(
"DELETE FROM chunks WHERE source_id = ?1",
params![source_id],
)?;
tx.execute(
"INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
ON CONFLICT(id) DO UPDATE SET
uri = excluded.uri,
path = excluded.path,
kind = excluded.kind,
bytes = excluded.bytes,
content_sha256 = excluded.content_sha256,
mtime_unix = excluded.mtime_unix,
ingested_at = excluded.ingested_at",
params![
source_id,
uri,
path,
kind,
bytes.len() as i64,
content_sha,
mtime_unix,
ingested_at,
],
)?;
// Inner scope so the prepared statement (which borrows `tx`) is dropped
// before the commit below.
{
let mut stmt = tx.prepare(
"INSERT INTO chunks
(id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, created_at,
role, session_id, turn_id, tool_name, timestamp_unix)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
)?;
for chunk in &chunks {
let chunk_id = chunk_id_for(&source_id, chunk.ordinal, &chunk.text);
let chunk_sha = sha256_hex(chunk.text.as_bytes());
stmt.execute(params![
chunk_id,
source_id,
chunk.ordinal as i64,
chunk.byte_start as i64,
chunk.byte_end as i64,
chunk.text.chars().count() as i64,
chunk.text,
chunk_sha,
ingested_at,
chunk.role,
chunk.session_id,
chunk.turn_id,
chunk.tool_name,
chunk.timestamp_unix,
])?;
}
}
tx.commit()?;
Ok(Some(IngestedSource {
source_id,
uri: uri.to_string(),
path,
kind: kind.to_string(),
bytes: bytes.len() as u64,
content_sha256: content_sha,
chunks: chunks.len(),
}))
}
/// Print a one-line header for a follow pass, plus per-source detail when
/// the pass actually ingested or skipped anything.
pub fn print_follow_pass(iteration: usize, report: &IngestReport) {
    println!(
        "[follow pass={} ingested={} skipped={} ignored={}]",
        iteration,
        report.ingested.len(),
        report.skipped.len(),
        report.ignored,
    );
    // Quiet passes (nothing ingested, nothing skipped) print only the header.
    if !(report.ingested.is_empty() && report.skipped.is_empty()) {
        print_text(report);
    }
}
/// Print a human-readable report: one line per ingested/skipped source,
/// followed by a summary line (which mentions `ignored` only when non-zero).
pub fn print_text(report: &IngestReport) {
    for src in &report.ingested {
        println!(
            "ingested source={} chunks={} bytes={} kind={} uri={}",
            src.source_id, src.chunks, src.bytes, src.kind, src.uri
        );
    }
    for skip in &report.skipped {
        println!("skipped uri={} reason={}", skip.uri, skip.reason);
    }
    let ingested_count = report.ingested.len();
    let skipped_count = report.skipped.len();
    if report.ignored == 0 {
        println!(
            "summary ingested={} skipped={}",
            ingested_count, skipped_count
        );
    } else {
        println!(
            "summary ingested={} skipped={} ignored={}",
            ingested_count, skipped_count, report.ignored
        );
    }
}
/// Print the report to stdout as pretty-printed JSON.
pub fn print_json(report: &IngestReport) -> Result<()> {
println!("{}", serde_json::to_string_pretty(report)?);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// JSONL ingest must carry per-record metadata (session, turn, tool,
// timestamp) through to the chunks table columns.
#[test]
fn persists_jsonl_metadata_into_chunks_table() {
let dir = tempdir().unwrap();
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let bytes = br#"{"session_id":"sess-42","turn_id":"turn-3","tool_name":"search","timestamp":1700000002,"content":"tool output"}
"#;
let report = ingest_stdin(
&mut store,
"stdin://sess-42",
Some("application/jsonl"),
bytes,
)
.unwrap();
assert_eq!(report.ingested.len(), 1);
let row = store
.conn()
.query_row(
"SELECT text, role, session_id, turn_id, tool_name, timestamp_unix FROM chunks",
[],
|row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, Option<i64>>(5)?,
))
},
)
.unwrap();
// Tool chunks are rendered with a "[tool:<name>]" prefix; no role was set.
assert_eq!(row.0, "[tool:search] tool output");
assert_eq!(row.1.as_deref(), None);
assert_eq!(row.2.as_deref(), Some("sess-42"));
assert_eq!(row.3.as_deref(), Some("turn-3"));
assert_eq!(row.4.as_deref(), Some("search"));
assert_eq!(row.5, Some(1_700_000_002));
}
// JSON serialization should expose both the machine-readable code and the
// human-readable reason for skipped sources.
#[test]
fn skipped_sources_include_structured_reason_codes_in_json() {
let report = IngestReport {
ingested: Vec::new(),
skipped: vec![SkippedSource {
uri: "file:///tmp/huge.log".into(),
skipped_reason: Some("too_large".into()),
reason: "file is 52428801 bytes; exceeds 52428800-byte limit".into(),
}],
ignored: 0,
};
let json = serde_json::to_value(&report).unwrap();
let skipped = json["skipped"].as_array().unwrap();
assert_eq!(skipped.len(), 1);
assert_eq!(skipped[0]["skipped_reason"], "too_large");
assert_eq!(
skipped[0]["reason"],
"file is 52428801 bytes; exceeds 52428800-byte limit"
);
}
}