use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::Result;
use notify_debouncer_full::new_debouncer;
use notify_debouncer_full::notify::RecursiveMode;
use crate::config::Config;
use crate::confirm::SearchOptions;
use crate::index::{self, Index};
use crate::pagination::{self, PaginationStore};
use crate::paths;
use crate::proto::{self, Request};
use crate::transport::{self, Stream};
const WATCH_HEARTBEAT: Duration = Duration::from_millis(250);
const IDLE_CHECK_MAX: Duration = Duration::from_secs(15);
struct Shared {
index: RwLock<Index>,
ready: AtomicBool,
root: PathBuf,
snapshot: PathBuf,
indexed: AtomicUsize,
total: AtomicUsize,
index_bytes: AtomicU64,
seq: Mutex<u64>,
seq_cv: Condvar,
persist_threshold: Duration,
persist: AtomicBool,
idle_timeout: Option<Duration>,
last_active: Mutex<Instant>,
in_flight: AtomicUsize,
pagination: Mutex<PaginationStore>,
}
struct ActiveRequest<'a>(&'a Shared);
impl<'a> ActiveRequest<'a> {
fn new(shared: &'a Shared) -> Self {
shared.in_flight.fetch_add(1, Ordering::SeqCst);
shared.touch();
ActiveRequest(shared)
}
}
impl Drop for ActiveRequest<'_> {
fn drop(&mut self) {
self.0.touch();
self.0.in_flight.fetch_sub(1, Ordering::SeqCst);
}
}
impl Shared {
fn read_index(&self) -> std::sync::RwLockReadGuard<'_, Index> {
self.index.read().unwrap_or_else(|e| e.into_inner())
}
fn write_index(&self) -> std::sync::RwLockWriteGuard<'_, Index> {
self.index.write().unwrap_or_else(|e| e.into_inner())
}
fn bump(&self) {
self.index_bytes
.store(self.read_index().memory_bytes(), Ordering::Relaxed);
*self.seq.lock().unwrap_or_else(|e| e.into_inner()) += 1;
self.seq_cv.notify_all();
}
fn pagination(&self) -> std::sync::MutexGuard<'_, PaginationStore> {
self.pagination.lock().unwrap_or_else(|e| e.into_inner())
}
fn touch(&self) {
*self.last_active.lock().unwrap_or_else(|e| e.into_inner()) = Instant::now();
}
fn mark_ready(&self) {
self.touch();
self.ready.store(true, Ordering::SeqCst);
}
fn maybe_save(&self, idx: &Index) {
if self.persist.load(Ordering::SeqCst) {
let _ = idx.save(&self.snapshot);
}
}
fn wait_change(&self, last: u64) -> u64 {
let g = self.seq.lock().unwrap_or_else(|e| e.into_inner());
let (g, _) = self
.seq_cv
.wait_timeout_while(g, WATCH_HEARTBEAT, |s| *s == last)
.unwrap_or_else(|e| e.into_inner());
*g
}
}
fn session_seed() -> u64 {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
nanos ^ (nanos >> 32)
}
pub fn run(root: PathBuf) -> Result<()> {
let dir = paths::state_dir(&root);
std::fs::create_dir_all(&dir)?;
let listener = match transport::bind(&root)? {
Some(l) => l,
None => return Ok(()), };
let cfg = Config::get();
let shared = Arc::new(Shared {
index: RwLock::new(Index::default()),
ready: AtomicBool::new(false),
root: root.clone(),
snapshot: paths::snapshot_path(&root),
indexed: AtomicUsize::new(0),
total: AtomicUsize::new(0),
index_bytes: AtomicU64::new(0),
seq: Mutex::new(0),
seq_cv: Condvar::new(),
persist_threshold: cfg.persist_threshold(),
persist: AtomicBool::new(true),
idle_timeout: cfg.idle_timeout(),
last_active: Mutex::new(Instant::now()),
in_flight: AtomicUsize::new(0),
pagination: Mutex::new(PaginationStore::new(
session_seed(),
pagination::DEFAULT_TTL,
)),
});
spawn_indexer(shared.clone());
spawn_watcher(shared.clone());
spawn_idle_reaper(shared.clone());
loop {
let conn = match transport::accept(&listener) {
Ok(conn) => conn,
Err(_) => continue,
};
let shared = shared.clone();
std::thread::spawn(move || {
let _active = ActiveRequest::new(&shared);
let _ = handle(conn, &shared);
});
}
}
fn spawn_indexer(shared: Arc<Shared>) {
std::thread::spawn(move || {
if let Ok(idx) = Index::load(&shared.snapshot) {
*shared.write_index() = idx;
shared.mark_ready();
shared.bump();
let mut idx = shared.write_index();
idx.reconcile(&shared.root);
shared.maybe_save(&idx);
drop(idx);
shared.bump();
} else {
let started = Instant::now();
let paths = index::walk_files(&shared.root);
shared.total.store(paths.len(), Ordering::Relaxed);
shared.bump();
let built = Index::from_paths(&paths, &shared.indexed);
shared.persist.store(
started.elapsed() >= shared.persist_threshold,
Ordering::SeqCst,
);
shared.maybe_save(&built);
*shared.write_index() = built;
shared.mark_ready();
shared.bump();
}
});
}
fn spawn_watcher(shared: Arc<Shared>) {
std::thread::spawn(move || {
let (tx, rx) = std::sync::mpsc::channel();
let mut debouncer = match new_debouncer(Duration::from_millis(300), None, move |res| {
let _ = tx.send(res);
}) {
Ok(d) => d,
Err(_) => return,
};
if debouncer
.watch(&shared.root, RecursiveMode::Recursive)
.is_err()
{
return;
}
for res in rx {
if res.is_err() || !shared.ready.load(Ordering::SeqCst) {
continue;
}
let mut idx = shared.write_index();
if idx.reconcile(&shared.root) > 0 {
shared.maybe_save(&idx);
drop(idx);
shared.bump();
}
}
});
}
fn spawn_idle_reaper(shared: Arc<Shared>) {
let Some(timeout) = shared.idle_timeout else {
return;
};
let tick = timeout.min(IDLE_CHECK_MAX);
std::thread::spawn(move || {
loop {
std::thread::sleep(tick);
if !shared.ready.load(Ordering::SeqCst) || shared.in_flight.load(Ordering::SeqCst) > 0 {
continue;
}
let idle = shared
.last_active
.lock()
.unwrap_or_else(|e| e.into_inner())
.elapsed();
if idle >= timeout {
transport::cleanup(&shared.root);
std::process::exit(0);
}
}
});
}
fn handle(mut conn: Stream, shared: &Shared) -> Result<()> {
let req = proto::read_request(&mut conn)?;
match req {
Request::Search { opts, pattern } => {
let _ = content_search(shared, &pattern, opts, &mut conn);
}
Request::Find {
needle,
after,
limit,
} => {
let out = find(shared, &needle, after.as_deref(), limit as usize);
let _ = proto::write_data(&mut conn, &out);
}
Request::Status => {
let _ = proto::write_data(&mut conn, &status(shared));
}
Request::Watch => return watch(shared, &mut conn),
Request::Shutdown => {
let _ = proto::write_data(&mut conn, b"ok\n");
let _ = proto::end_stream(&mut conn);
transport::cleanup(&shared.root);
std::process::exit(0);
}
Request::CursorStore { blob } => {
let token = shared.pagination().store(blob, Instant::now());
let _ = proto::write_data(&mut conn, token.as_bytes());
}
Request::CursorTake { token } => {
let blob = shared.pagination().take(&token, Instant::now());
let _ = proto::write_data(&mut conn, &blob.unwrap_or_default());
}
}
let _ = proto::end_stream(&mut conn);
Ok(())
}
fn watch(shared: &Shared, conn: &mut Stream) -> Result<()> {
let mut last = 0;
loop {
if proto::write_data(conn, &status(shared)).is_err() {
return Ok(()); }
last = shared.wait_change(last);
}
}
fn content_search(
shared: &Shared,
pattern: &str,
opts: SearchOptions,
conn: &mut Stream,
) -> Result<()> {
if shared.ready.load(Ordering::SeqCst) {
let paths = crate::candidate_paths(&shared.read_index(), pattern, opts);
let effective = crate::effective_pattern(pattern, opts);
let refs: Vec<&Path> = paths.iter().map(PathBuf::as_path).collect();
crate::confirm::search_streaming(&effective, &refs, &shared.root, opts, |chunk| {
proto::write_data(&mut *conn, chunk)
})
} else {
let conn = std::sync::Mutex::new(conn);
crate::stream_full_scan(&shared.root, pattern, opts, |chunk| {
if let Ok(mut c) = conn.lock() {
let _ = proto::write_data(&mut **c, chunk);
}
})
}
}
fn find(shared: &Shared, needle: &str, after: Option<&str>, limit: usize) -> Vec<u8> {
let (lines, total, start): (Vec<String>, usize, usize) = if shared.ready.load(Ordering::SeqCst)
{
let idx = shared.read_index();
let (hits, total, start) = idx.find(needle, after, limit);
let lines = hits
.iter()
.map(|p| p.to_string_lossy().into_owned())
.collect();
(lines, total, start)
} else {
let mut all: Vec<String> = index::walk_files(&shared.root)
.iter()
.filter(|p| p.to_string_lossy().contains(needle))
.map(|p| p.to_string_lossy().into_owned())
.collect();
all.sort_unstable();
let total = all.len();
let start = after.map_or(0, |a| all.partition_point(|p| p.as_str() <= a));
let lines = all.into_iter().skip(start).take(limit).collect();
(lines, total, start)
};
let next_after = (start + lines.len() < total)
.then(|| lines.last().map(String::as_str))
.flatten();
let mut out = proto::format_find_header(total, start, lines.len(), next_after);
for l in &lines {
out.push_str(l);
out.push('\n');
}
out.into_bytes()
}
fn status(shared: &Shared) -> Vec<u8> {
let idx = shared.read_index();
let state = if shared.ready.load(Ordering::SeqCst) {
"ready".to_string()
} else {
let done = shared.indexed.load(Ordering::Relaxed) as u64;
let total = shared.total.load(Ordering::Relaxed) as u64;
if total > 0 {
format!(
"building {} / {} files",
crate::status::human_count(done),
crate::status::human_count(total)
)
} else {
"building (scanning tree...)".to_string()
}
};
let ram_only = shared.ready.load(Ordering::SeqCst) && !shared.persist.load(Ordering::SeqCst);
crate::status::Status {
root: &shared.root,
snapshot: &shared.snapshot,
running: true,
ram_only,
state: Some(state),
files: Some(idx.file_count()),
trigrams: Some(idx.trigram_count()),
memory_bytes: Some(shared.index_bytes.load(Ordering::Relaxed)),
}
.render()
.into_bytes()
}