use anyhow::{anyhow, Context, Result};
use chrono::Local;
use notify::RecursiveMode;
use notify_debouncer_mini::{new_debouncer, DebouncedEvent, DebouncedEventKind};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::{Duration, Instant};
use crate::paths;
use crate::worker::advice::{AdviceWorker, BurstReady, ProjectBurst};
use crate::worker::config::WorkerConfig;
use crate::worker::db::{Db, Event};
use crate::worker::supabase::SupabaseClient;
/// Snapshot of the worker daemon's state as inferred from the PID file
/// plus a liveness probe of the recorded process.
pub struct DaemonStatus {
    /// True when the PID file names a currently-running process.
    pub running: bool,
    /// PID read from the PID file, if one existed.
    pub pid: Option<u32>,
    /// True when a PID file exists but its process is no longer alive.
    pub stale_pid_file: bool,
}
/// Reads the worker PID file, returning `Ok(None)` when no file exists.
///
/// # Errors
/// Fails if the PID file path cannot be resolved, the file cannot be read,
/// or its contents do not parse as a `u32`.
pub fn read_pid() -> Result<Option<u32>> {
    let pid_path = paths::worker_pid_file()?;
    if !pid_path.exists() {
        return Ok(None);
    }
    let contents = fs::read_to_string(&pid_path)?;
    let pid = contents
        .trim()
        .parse::<u32>()
        .context("invalid PID file content")?;
    Ok(Some(pid))
}
pub fn is_alive(pid: u32) -> bool {
#[cfg(unix)]
{
Command::new("kill")
.args(["-0", &pid.to_string()])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}
#[cfg(windows)]
{
Command::new("tasklist")
.args(["/FI", &format!("PID eq {}", pid)])
.output()
.map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string()))
.unwrap_or(false)
}
}
/// Reports the daemon's current state: running, stopped with a stale PID
/// file, or not started at all (no PID file).
pub fn status() -> Result<DaemonStatus> {
    if let Some(pid) = read_pid()? {
        // A recorded PID that no longer maps to a live process means the
        // PID file is stale.
        let alive = is_alive(pid);
        Ok(DaemonStatus {
            running: alive,
            pid: Some(pid),
            stale_pid_file: !alive,
        })
    } else {
        Ok(DaemonStatus {
            running: false,
            pid: None,
            stale_pid_file: false,
        })
    }
}
/// Re-executes the current binary as a background worker (`worker __run`),
/// redirecting stdout/stderr to the worker log file, and records its PID
/// in the PID file. Returns the child's PID.
///
/// # Errors
/// Fails if the worker directory/log cannot be created or opened, the
/// current executable cannot be located, or the child fails to spawn.
pub fn spawn_detached() -> Result<u32> {
    fs::create_dir_all(paths::worker_dir()?)?;
    let log_path = paths::worker_log_file()?;
    // Both stdout and stderr append to the same log file.
    let log_out = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_path)
        .with_context(|| format!("open log {}", log_path.display()))?;
    let log_err = log_out.try_clone()?;
    let exe = std::env::current_exe().context("locate current exe")?;
    let child = Command::new(exe)
        .args(["worker", "__run"])
        .stdin(Stdio::null())
        .stdout(Stdio::from(log_out))
        .stderr(Stdio::from(log_err))
        .spawn()
        .context("spawn detached worker")?;
    let pid = child.id();
    fs::write(paths::worker_pid_file()?, pid.to_string())?;
    Ok(pid)
}
/// Stops the running worker: sends SIGTERM (Unix) or `taskkill /F`
/// (Windows) to the PID recorded in the PID file, then removes the file.
///
/// # Errors
/// Fails when no PID file exists, or when the recorded process is already
/// dead (in which case the stale PID file is removed first).
pub fn stop() -> Result<()> {
    let pid_path = paths::worker_pid_file()?;
    let pid = read_pid()?.ok_or_else(|| anyhow!("Worker is not running (no PID file)"))?;
    if !is_alive(pid) {
        // Clean up the stale file, but still report the situation as an error.
        let _ = fs::remove_file(&pid_path);
        return Err(anyhow!(
            "Stale PID file removed (process {} not running)",
            pid
        ));
    }
    #[cfg(unix)]
    {
        Command::new("kill")
            .args(["-TERM", &pid.to_string()])
            .status()
            .context("send SIGTERM")?;
    }
    #[cfg(windows)]
    {
        Command::new("taskkill")
            .args(["/PID", &pid.to_string(), "/F"])
            .status()
            .context("taskkill")?;
    }
    // Removal failures are non-fatal: the process is already gone.
    let _ = fs::remove_file(&pid_path);
    Ok(())
}
/// Main daemon loop: watch `cfg.monitor_dir` for file changes, persist them
/// to the local DB, batch changed paths per project for the advice worker,
/// and periodically sync the local event log to Supabase.
///
/// Spawns three helper threads — advice, jobs, verify — each wrapped in
/// `catch_unwind` so a panic is logged instead of silently killing the
/// thread. Returns only when the watcher channel disconnects.
pub fn run_loop() -> Result<()> {
    let cfg = WorkerConfig::load()?;
    let db = Db::open(&cfg.db_path)?;
    log_line(&format!(
        "[start] monitoring {} (debounce {}ms, sync every {}s, advice_enabled={})",
        cfg.monitor_dir.display(),
        cfg.debounce_ms,
        cfg.sync_interval_secs,
        cfg.advice_enabled
    ));
    // Advice thread: consumes per-project bursts once a project goes idle.
    let (advice_tx, advice_rx) = channel::<BurstReady>();
    let advice_cfg = cfg.clone();
    let advice_handle = std::thread::Builder::new()
        .name("devist-advice".into())
        .spawn(move || {
            // AssertUnwindSafe: on panic we only log and let the thread end,
            // so no potentially-broken state is observed afterwards.
            let result =
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match AdviceWorker::new(
                    advice_cfg,
                ) {
                    Ok(w) => w.run(advice_rx),
                    Err(e) => eprintln!("[advice-init-err] {}", e),
                }));
            if result.is_err() {
                eprintln!("[advice] thread panicked");
            }
        })
        .context("spawn advice thread")?;
    // Jobs thread: handle intentionally unused — it runs for the process
    // lifetime and is never joined.
    let jobs_cfg = cfg.clone();
    let _jobs_handle = std::thread::Builder::new()
        .name("devist-jobs".into())
        .spawn(move || {
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                if let Err(e) = crate::worker::jobs::run(jobs_cfg) {
                    eprintln!("[jobs-worker-err] {}", e);
                }
            }));
            if result.is_err() {
                eprintln!("[jobs-worker] thread panicked");
            }
        })
        .context("spawn jobs thread")?;
    // Verify thread: likewise detached for the process lifetime.
    let verify_cfg = cfg.clone();
    let _verify_handle = std::thread::Builder::new()
        .name("devist-verify".into())
        .spawn(move || {
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                if let Err(e) = crate::worker::verify::run(verify_cfg) {
                    eprintln!("[verify-err] {}", e);
                }
            }));
            if result.is_err() {
                eprintln!("[verify] thread panicked");
            }
        })
        .context("spawn verify thread")?;
    // Debounced recursive watcher over the monitored directory.
    let (tx, rx) = channel();
    let mut debouncer =
        new_debouncer(Duration::from_millis(cfg.debounce_ms), tx).context("init debouncer")?;
    debouncer
        .watcher()
        .watch(&cfg.monitor_dir, RecursiveMode::Recursive)
        .with_context(|| format!("watch {}", cfg.monitor_dir.display()))?;
    let mut last_sync = Instant::now();
    // Start 60s in the past so the first heartbeat fires immediately.
    let mut last_heartbeat = Instant::now() - Duration::from_secs(60);
    let sync_interval = Duration::from_secs(cfg.sync_interval_secs);
    let idle = Duration::from_secs(cfg.advice_idle_seconds);
    // Per-project accumulation of changed paths since the last idle flush.
    let mut bursts: HashMap<String, ProjectBurst> = HashMap::new();
    let heartbeat_client = make_heartbeat_client(&cfg);
    loop {
        // Heartbeat at most every 10s; failures are deliberately ignored
        // (Supabase may be unreachable or unconfigured).
        if last_heartbeat.elapsed() >= Duration::from_secs(10) {
            if let Some(c) = heartbeat_client.as_ref() {
                let _ = c.heartbeat("main");
            }
            last_heartbeat = Instant::now();
        }
        // 5s receive timeout keeps heartbeat / burst-flush / sync running
        // even when no filesystem events arrive.
        match rx.recv_timeout(Duration::from_secs(5)) {
            Ok(Ok(events)) => {
                for ev in events {
                    if is_ignored(&ev.path) {
                        continue;
                    }
                    if let Some(record) = build_event(&cfg.monitor_dir, &ev) {
                        let project = record.project.clone();
                        let path = record.path.clone();
                        // DB failures are logged but do not stop the loop.
                        if let Err(e) = db.insert(&record) {
                            log_line(&format!("[db-err] {}", e));
                        }
                        if cfg.advice_enabled {
                            let entry = bursts.entry(project).or_default();
                            if let Some(p) = path {
                                entry.record(p);
                            }
                        }
                    }
                }
            }
            // Watcher-side errors are reported but non-fatal.
            Ok(Err(errs)) => {
                log_line(&format!("[watch-err] {:?}", errs));
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
                log_line("[exit] watcher channel disconnected");
                break;
            }
        }
        // Flush bursts for projects that have non-empty path sets and have
        // been idle long enough; send errors (advice thread gone) ignored.
        if cfg.advice_enabled {
            let ready: Vec<String> = bursts
                .iter()
                .filter(|(_, b)| !b.paths.is_empty() && b.is_idle(idle))
                .map(|(k, _)| k.clone())
                .collect();
            for project in ready {
                if let Some(burst) = bursts.get_mut(&project) {
                    let paths = burst.drain();
                    let _ = advice_tx.send(BurstReady { project, paths });
                }
            }
        }
        // Periodic push of locally stored events to Supabase.
        if last_sync.elapsed() >= sync_interval {
            if let Err(e) = sync_supabase(&db, &cfg) {
                log_line(&format!("[sync-err] {}", e));
            }
            last_sync = Instant::now();
        }
    }
    // Closing the sender ends the advice thread's receive loop; then join.
    drop(advice_tx);
    let _ = advice_handle.join();
    Ok(())
}
/// Converts a debounced filesystem event into a DB `Event` record.
/// Returns `None` when no project can be derived from the path.
fn build_event(monitor_dir: &Path, ev: &DebouncedEvent) -> Option<Event> {
    let project = detect_project(monitor_dir, &ev.path)?;
    // Only the "continuous" kind gets a distinct tag; every other kind
    // (including future non-exhaustive variants) maps to "file_changed".
    let event_type = if matches!(ev.kind, DebouncedEventKind::AnyContinuous) {
        "file_changed_continuous"
    } else {
        "file_changed"
    };
    // Store paths relative to the monitor root; fall back to the absolute
    // path if the prefix strip unexpectedly fails.
    let rel = match ev.path.strip_prefix(monitor_dir) {
        Ok(p) => p.to_string_lossy().to_string(),
        Err(_) => ev.path.to_string_lossy().to_string(),
    };
    Some(Event {
        id: None,
        project,
        event_type: event_type.to_string(),
        path: Some(rel),
        payload: "{}".to_string(),
        severity: "info".to_string(),
        created_at: Local::now().to_rfc3339(),
        synced_at: None,
        acked_at: None,
    })
}
/// Derives the project name from a file path: the first path component
/// under `monitor_dir`. Returns `None` when `file_path` is not inside
/// `monitor_dir` (or is `monitor_dir` itself).
fn detect_project(monitor_dir: &Path, file_path: &Path) -> Option<String> {
    file_path
        .strip_prefix(monitor_dir)
        .ok()?
        .components()
        .next()
        .map(|c| c.as_os_str().to_string_lossy().into_owned())
}
/// Builds a Supabase client for heartbeats, or `None` when Supabase is not
/// configured (missing/empty URL or key) or client construction fails.
/// Uses "unknown" when no non-empty client_id is configured.
fn make_heartbeat_client(cfg: &WorkerConfig) -> Option<SupabaseClient> {
    let url = cfg.supabase_url.as_deref().filter(|u| !u.is_empty())?;
    let key = cfg.supabase_key.as_deref().filter(|k| !k.is_empty())?;
    let client_id = match cfg.client_id.as_deref() {
        Some(id) if !id.is_empty() => id,
        _ => "unknown",
    };
    SupabaseClient::new(url, key, client_id).ok()
}
/// Pushes unsynced local events to Supabase and marks them synced.
///
/// File-change noise (see `should_push_to_supabase`) is kept local-only but
/// still marked synced so it is not reconsidered. When Supabase is not
/// configured, events stay queued and the function returns `Ok(())`.
///
/// # Errors
/// Fails on DB errors, a missing client_id, or a failed push.
fn sync_supabase(db: &Db, cfg: &WorkerConfig) -> Result<()> {
    let pending = db.unsynced(500)?;
    if pending.is_empty() {
        return Ok(());
    }
    // Without credentials, leave everything queued for a later sync.
    let creds = match (&cfg.supabase_url, &cfg.supabase_key) {
        (Some(u), Some(k)) if !u.is_empty() && !k.is_empty() => Some((u.as_str(), k.as_str())),
        _ => None,
    };
    let (url, key) = match creds {
        Some(pair) => pair,
        None => {
            log_line(&format!(
                "[sync] {} events pending (Supabase not configured — skipping)",
                pending.len()
            ));
            return Ok(());
        }
    };
    let client_id = match cfg.client_id.as_deref() {
        Some(id) if !id.is_empty() => id,
        _ => return Err(anyhow!("client_id not set in config")),
    };
    let mut pushable: Vec<Event> = Vec::new();
    for ev in &pending {
        if should_push_to_supabase(ev) {
            pushable.push(ev.clone());
        }
    }
    let skipped = pending.len() - pushable.len();
    if !pushable.is_empty() {
        SupabaseClient::new(url, key, client_id)?.push_events(&pushable)?;
    }
    // Mark every pending event synced — including local-only ones — so the
    // same batch is never fetched again.
    let synced_ids: Vec<i64> = pending.iter().filter_map(|e| e.id).collect();
    db.mark_synced(&synced_ids, &Local::now().to_rfc3339())?;
    log_line(&format!(
        "[sync] pushed {} events to Supabase ({} local-only)",
        pushable.len(),
        skipped
    ));
    Ok(())
}
/// Decides whether an event is worth pushing to Supabase: raw file-change
/// noise stays local; everything else is uploaded.
fn should_push_to_supabase(ev: &Event) -> bool {
    match ev.event_type.as_str() {
        "file_changed" | "file_changed_continuous" => false,
        _ => true,
    }
}
/// Writes a timestamped line to stdout (the detached worker redirects
/// stdout to its log file, so this doubles as file logging).
fn log_line(msg: &str) {
    let stamp = Local::now().format("%Y-%m-%d %H:%M:%S");
    println!("{} {}", stamp, msg);
}
// Directory names whose contents never generate events: dependency caches,
// build artifacts, VCS metadata, virtualenvs, and editor/tooling state.
const IGNORED_DIRS: &[&str] = &[
    "node_modules",
    "target",
    "dist",
    "build",
    ".next",
    ".turbo",
    ".cache",
    ".venv",
    "venv",
    "__pycache__",
    ".pytest_cache",
    ".git",
    ".idea",
    ".vscode",
    ".expo",
    ".dart_tool",
];
// Individual file names to ignore regardless of where they appear.
const IGNORED_FILES: &[&str] = &[".DS_Store"];
/// Returns true when a path should be excluded from event recording:
/// secret-bearing paths, anything inside (or equal to) an ignored
/// directory, or an ignored file name.
fn is_ignored(path: &Path) -> bool {
    // Never record paths flagged as potentially containing secrets.
    if crate::worker::secrets::is_secret_path(path) {
        return true;
    }
    let s = path.to_string_lossy();
    for d in IGNORED_DIRS {
        // Match the directory as an interior component ("/d/" or "\d\") or
        // as the final component, on both Unix and Windows separators.
        // (Previously only "/{d}" was checked for the final-component case,
        // so a Windows event for the directory itself slipped through.)
        if s.contains(&format!("/{}/", d))
            || s.contains(&format!("\\{}\\", d))
            || s.ends_with(&format!("/{}", d))
            || s.ends_with(&format!("\\{}", d))
        {
            return true;
        }
    }
    if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
        if IGNORED_FILES.contains(&name) {
            return true;
        }
    }
    false
}