use crate::builder::Builder;
use crate::daemon_sock::{DaemonServer, DaemonStatus, FileChange, FileOp, ServerMessage};
use crate::format::{self, Beacon};
use crate::idle::IdleTracker;
use crate::watcher::Watcher;
use llmosafe::{EscalationPolicy, PressureLevel, ResourceGuard, SafetyDecision};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
const ENTROPY_CRITICAL: u16 = 1000;
const PRE_BUILD_WAIT_SECS: u64 = 5;
const WARN_COOLDOWN_MS: u64 = 300;
struct DaemonCtx<'a> {
builder: &'a mut Builder,
ix_dir: &'a Path,
beacon: &'a mut Beacon,
idle: &'a mut IdleTracker,
guard: &'a ResourceGuard,
daemon_sock: Option<&'a DaemonServer>,
running: &'a Arc<AtomicBool>,
log_prefix: &'a str,
}
#[allow(clippy::too_many_lines)]
pub fn run(root: &Path) -> crate::error::Result<()> {
run_many(&[root.to_path_buf()])
}
pub fn run_many(roots: &[PathBuf]) -> crate::error::Result<()> {
if roots.is_empty() {
return Err(crate::error::Error::Config(
"at least one root directory is required".into(),
));
}
let mut seen = std::collections::HashSet::new();
let unique: Vec<PathBuf> = roots
.iter()
.filter_map(|r| {
let canonical = r.canonicalize().unwrap_or_else(|_| r.clone());
if seen.insert(canonical.clone()) {
Some(canonical)
} else {
None
}
})
.collect();
if unique.is_empty() {
return Err(crate::error::Error::Config(
"no valid root directories provided".into(),
));
}
SHUTDOWN.store(false, Ordering::SeqCst);
install_signal_handlers();
let guard = ResourceGuard::auto(0.6);
let instance_id = format::instance_id_now();
let mut handles = Vec::new();
for root in &unique {
let root = root.clone();
let name = root_name(&root);
let name2 = name.clone();
let guard = guard.clone();
let handle = std::thread::Builder::new()
.name(format!("ixd-{name}"))
.spawn(move || match run_single_root(&root, &guard, instance_id) {
Ok(()) => {}
Err(e) => {
eprintln!("ixd [{name}]: fatal start-up error: {e}");
}
})
.map_err(|e| {
crate::error::Error::Config(format!(
"cannot spawn watcher thread for {name2}: {e}",
))
})?;
handles.push(handle);
}
for handle in handles {
let _ = handle.join();
}
Ok(())
}
fn run_single_root(root: &Path, guard: &ResourceGuard, instance_id: u64) -> crate::error::Result<()> {
let root = root.canonicalize().map_err(crate::error::Error::Io)?;
let name = root_name(&root);
println!("ixd [{name}]: watching ...");
let running = Arc::new(AtomicBool::new(true));
let ix_dir_early = root.join(".ix");
let beacon_path = ix_dir_early.join("beacon.json");
check_concurrent_instance(&ix_dir_early, &beacon_path, &root, instance_id)?;
let mut builder = match Builder::new(&root) {
Ok(b) => b.with_resource_guard(guard.clone()),
Err(e) => {
eprintln!("ixd [{name}]: cannot create index: {e}");
return Err(e);
}
};
wait_for_memory(guard, &name);
if let Err(e) = builder.build() {
eprintln!("ixd [{name}]: initial build failed: {e} — will watch for changes anyway");
} else {
println!(
"ixd [{name}]: initial build complete ({} files, {} trigrams)",
builder.files_len(),
builder.trigrams_len()
);
}
let mut watcher = Watcher::new(&root);
let rx = watcher.start()?;
let ix_dir = root.join(".ix");
if !ix_dir.exists() {
fs::create_dir_all(&ix_dir)?;
}
let mut beacon = Beacon::with_instance_id(&root, instance_id);
beacon.write_to(&ix_dir)?;
let mut idle = IdleTracker::new();
let mut daemon_sock = match DaemonServer::new(&root) {
Ok(s) => {
println!("ixd [{name}]: socket at {}", s.path().display());
beacon.socket_path = Some(s.path().to_path_buf());
let _ = beacon.write_to(&ix_dir);
Some(s)
}
Err(e) => {
eprintln!("ixd [{name}]: warning: could not create daemon socket: {e}");
None
}
};
if let Some(ref mut s) = daemon_sock
&& let Err(e) = s.start()
{
eprintln!("ixd [{name}]: failed to start socket server: {e}");
}
run_main_loop(
&mut builder,
&rx,
&ix_dir,
&mut beacon,
&mut idle,
guard,
&running,
daemon_sock.as_ref(),
&name,
);
eprintln!("ixd [{name}]: shutting down...");
watcher.stop();
let _ = fs::remove_file(ix_dir.join("beacon.json"));
Ok(())
}
fn check_concurrent_instance(
ix_dir: &Path,
beacon_path: &Path,
root: &Path,
instance_id: u64,
) -> crate::error::Result<()> {
if !beacon_path.exists() {
return Ok(());
}
let Ok(existing) = Beacon::read_from(ix_dir) else {
return Ok(());
};
let pid = nix::unistd::Pid::from_raw(existing.pid);
if nix::sys::signal::kill(pid, None).is_ok() {
let our_pid =
i32::try_from(std::process::id()).unwrap_or(-1);
if existing.pid == our_pid && existing.instance_id == instance_id {
return Err(crate::error::Error::Config(format!(
"another thread in this process is already watching {}. \
Remove duplicate `{}/beacon.json` to force.",
root.display(),
ix_dir.display()
)));
}
if existing.pid == our_pid && existing.instance_id != instance_id {
eprintln!(
"ixd: removing stale beacon from PID {} (instance {} → {})",
existing.pid, existing.instance_id, instance_id
);
let _ = std::fs::remove_file(beacon_path);
return Ok(());
}
return Err(crate::error::Error::Config(format!(
"another instance is already watching {} (PID {}). \
Stop it first or remove `{}/beacon.json`.",
root.display(),
existing.pid,
ix_dir.display()
)));
}
eprintln!("ixd: removing stale beacon from PID {}", existing.pid);
let _ = std::fs::remove_file(beacon_path);
Ok(())
}
fn install_signal_handlers() {
use nix::sys::signal::{SaFlags, SigAction, SigHandler, SigSet, Signal, sigaction};
let action = SigAction::new(
SigHandler::Handler(handle_signal),
SaFlags::empty(),
SigSet::empty(),
);
unsafe {
sigaction(Signal::SIGTERM, &action).expect("failed to install SIGTERM handler");
sigaction(Signal::SIGINT, &action).expect("failed to install SIGINT handler");
}
}
fn wait_for_memory(guard: &ResourceGuard, log_prefix: &str) {
let pre_build_timeout = Duration::from_secs(30);
let pre_build_start = std::time::Instant::now();
while pre_build_start.elapsed() < pre_build_timeout {
match guard.check_blocking() {
Ok(_) => break,
Err(e) => {
eprintln!("ixd [{log_prefix}]: memory pressure before initial build: {e:?} — waiting...");
std::thread::sleep(Duration::from_secs(PRE_BUILD_WAIT_SECS));
}
}
}
}
#[allow(clippy::too_many_arguments)]
fn run_main_loop(
builder: &mut Builder,
rx: &crossbeam_channel::Receiver<Vec<PathBuf>>,
ix_dir: &Path,
beacon: &mut Beacon,
idle: &mut IdleTracker,
guard: &ResourceGuard,
running: &Arc<AtomicBool>,
daemon_sock: Option<&DaemonServer>,
log_prefix: &str,
) {
loop {
if SHUTDOWN.load(Ordering::SeqCst) {
running.store(false, Ordering::SeqCst);
if let Some(sock) = daemon_sock {
sock.shutdown_notify("signal", 1000);
}
break;
}
match rx.recv_timeout(Duration::from_millis(500)) {
Ok(changed_files) => {
let mut ctx = DaemonCtx {
builder,
ix_dir,
beacon,
idle,
guard,
daemon_sock,
running,
log_prefix,
};
handle_changes(&mut ctx, &changed_files);
}
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
}
}
}
fn handle_changes(ctx: &mut DaemonCtx, changed_files: &[PathBuf]) {
let (entropy, safety_decision) = evaluate_safety(ctx.guard, ctx.log_prefix);
match &safety_decision {
SafetyDecision::Halt(err, cooldown) => {
let prefix = ctx.log_prefix;
eprintln!(
"ixd [{prefix}]: critical safety decision (Halt: {err:?}) — pausing operations",
);
ctx.beacon.status = "safety halt".to_string();
let _ = ctx.beacon.write_to(ctx.ix_dir);
std::thread::sleep(Duration::from_millis(u64::from(*cooldown)));
return;
}
SafetyDecision::Exit(err) => {
let prefix = ctx.log_prefix;
eprintln!(
"ixd [{prefix}]: SAFETY EXIT (unrecoverable: {err:?}) — terminating",
);
ctx.beacon.status = "safety exit".to_string();
ctx.running.store(false, Ordering::SeqCst);
return;
}
SafetyDecision::Escalate {
entropy: esc_entropy,
reason,
cooldown_ms,
} => {
let prefix = ctx.log_prefix;
eprintln!(
"ixd [{prefix}]: safety escalation (entropy: {esc_entropy}, reason: {reason:?}) — throttling",
);
let deferred_status = DaemonStatus::Deferred {
entropy: *esc_entropy,
};
ctx.beacon.status = deferred_status.to_string();
let _ = ctx.beacon.write_to(ctx.ix_dir);
if let Some(sock) = ctx.daemon_sock {
sock.set_status(&deferred_status, ctx.builder.files_len());
}
std::thread::sleep(Duration::from_millis(u64::from(*cooldown_ms)));
return;
}
SafetyDecision::Warn(reason) => {
if safety_decision.severity() >= 2 {
let prefix = ctx.log_prefix;
eprintln!(
"ixd [{prefix}]: safety warning (severity {}): {reason}",
safety_decision.severity()
);
ctx.beacon.status = format!("warned: {reason}");
let _ = ctx.beacon.write_to(ctx.ix_dir);
std::thread::sleep(Duration::from_millis(WARN_COOLDOWN_MS));
}
}
SafetyDecision::Proceed => {}
}
let prefix = ctx.log_prefix;
println!(
"ixd [{prefix}]: {} files changed, updating index... (Entropy: {entropy}, Decision: {safety_decision:?})",
changed_files.len(),
);
let daemon_status = DaemonStatus::Indexing { entropy };
ctx.beacon.status = daemon_status.to_string();
ctx.beacon.last_event_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let _ = ctx.beacon.write_to(ctx.ix_dir);
broadcast_status(
ctx,
&beacon_status_msg(&daemon_status, ctx.builder.files_len()),
);
ctx.idle.record_change();
if let Err(e) = ctx.builder.update(changed_files) {
let prefix = ctx.log_prefix;
eprintln!("ixd [{prefix}]: update failed: {e} — retrying on next change");
} else {
let prefix = ctx.log_prefix;
tracing::debug!("ixd [{prefix}]: index updated - caches will self-invalidate on next query");
}
broadcast_file_changes(ctx, changed_files);
let idle_status = DaemonStatus::Idle;
ctx.beacon.status = idle_status.to_string();
let _ = ctx.beacon.write_to(ctx.ix_dir);
if let Some(sock) = ctx.daemon_sock {
sock.set_status(&idle_status, ctx.builder.files_len());
}
}
fn evaluate_safety(guard: &ResourceGuard, log_prefix: &str) -> (u16, SafetyDecision) {
match guard.check_blocking() {
Ok(synapse) => {
let raw_entropy = synapse.raw_entropy();
let surprise = synapse.raw_surprise();
let has_bias = synapse.has_bias();
let pressure = PressureLevel::from(guard.pressure());
let policy = EscalationPolicy::default();
let decision = policy.decide_with_pressure(raw_entropy, surprise, has_bias, pressure);
(raw_entropy, decision)
}
Err(e) => {
eprintln!(
"ixd [{log_prefix}]: resource check error: {e:?} — proceeding with elevated caution",
);
(
ENTROPY_CRITICAL,
SafetyDecision::Escalate {
entropy: ENTROPY_CRITICAL,
reason: llmosafe::llmosafe_integration::EscalationReason::ResourcePressure,
cooldown_ms: u32::from(ENTROPY_CRITICAL),
},
)
}
}
}
fn beacon_status_msg(status: &DaemonStatus, files: usize) -> ServerMessage {
ServerMessage::Status {
pid: std::process::id(),
status: status.to_string(),
files,
daemon_status: Some(status.clone()),
}
}
fn broadcast_status(ctx: &DaemonCtx, msg: &ServerMessage) {
if let Some(sock) = ctx.daemon_sock {
sock.broadcast(msg);
}
}
fn broadcast_file_changes(ctx: &DaemonCtx, changed_files: &[PathBuf]) {
let Some(sock) = ctx.daemon_sock else { return };
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let changes: Vec<FileChange> = changed_files
.iter()
.map(|p| FileChange {
path: p.clone(),
mtime: now,
op: FileOp::Modify,
})
.collect();
sock.notify_changes(changes, ctx.builder.files_len());
}
fn root_name(root: &Path) -> String {
root.file_name()
.and_then(|n| n.to_str())
.map_or_else(|| root.display().to_string(), String::from)
}
static SHUTDOWN: AtomicBool = AtomicBool::new(false);
extern "C" fn handle_signal(_: libc::c_int) {
SHUTDOWN.store(true, Ordering::SeqCst);
}