use super::{events::*, *};
pub fn run_watch_loop(
watch_dirs: &[String],
db_path: &str,
state_db_path: &str,
log_path: &str,
pid_file: &str,
) {
let _ = std::fs::write(pid_file, std::process::id().to_string());
let log_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_path);
let mut logger: Box<dyn std::io::Write + Send> = match log_file {
Ok(f) => Box::new(f),
Err(_) => Box::new(std::io::stderr()),
};
let _ = writeln!(logger, "[innate-daemon] started pid={}", std::process::id());
let state_db = match rusqlite::Connection::open(state_db_path) {
Ok(c) => c,
Err(e) => {
let _ = writeln!(logger, "[innate-daemon] cannot open state db: {e}");
return;
}
};
if state_db.execute_batch(DAEMON_SCHEMA).is_err() {
let _ = writeln!(logger, "[innate-daemon] failed to init schema");
return;
}
let mut last_evolve_poll = std::time::Instant::now();
const EVOLVE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
let mut last_backup_poll = std::time::Instant::now();
const BACKUP_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30 * 60);
loop {
for dir in watch_dirs {
let dir_path = std::path::Path::new(dir);
if !dir_path.exists() {
continue;
}
if let Ok(entries) = std::fs::read_dir(dir_path) {
for entry in entries.flatten() {
let p = entry.path();
if p.extension().and_then(|e| e.to_str()) == Some("log") {
process_log_file(&p, &state_db, db_path, &mut *logger);
}
}
}
}
if last_evolve_poll.elapsed() >= EVOLVE_POLL_INTERVAL {
if let Err(error) = call_cli_evolve(db_path, "scheduled") {
let _ = writeln!(logger, "[innate-daemon] scheduled evolve failed: {error}");
record_daemon_error(
&state_db,
"<scheduler>",
"scheduled_evolve",
&error.to_string(),
);
}
last_evolve_poll = std::time::Instant::now();
}
if last_backup_poll.elapsed() >= BACKUP_POLL_INTERVAL {
if let Err(error) = call_cli_backup(db_path) {
let _ = writeln!(logger, "[innate-daemon] auto-backup failed: {error}");
record_daemon_error(&state_db, "<scheduler>", "auto_backup", &error.to_string());
}
last_backup_poll = std::time::Instant::now();
}
std::thread::sleep(std::time::Duration::from_millis(500));
}
}
pub(in crate::daemon) fn process_log_file(
path: &Path,
state_db: &rusqlite::Connection,
db_path: &str,
log: &mut dyn std::io::Write,
) {
let path_str = path.to_string_lossy();
let meta = match std::fs::metadata(path) {
Ok(m) => m,
Err(_) => return,
};
#[cfg(target_os = "linux")]
let inode = {
use std::os::linux::fs::MetadataExt;
meta.st_ino().to_string()
};
#[cfg(not(target_os = "linux"))]
let inode = String::new();
let (saved_offset, saved_inode): (i64, Option<String>) = state_db.query_row(
"SELECT last_processed_offset, last_processed_inode FROM watch_state WHERE watch_path=?",
rusqlite::params![path_str.as_ref()],
|r| Ok((r.get(0)?, r.get(1)?)),
).unwrap_or((0, None));
let file_size = meta.len() as i64;
let start_offset = if saved_inode.as_deref() != Some(&inode) || file_size < saved_offset {
0
} else {
saved_offset
};
if start_offset >= file_size {
return;
}
use std::io::{BufRead, Seek};
let mut f = match std::fs::File::open(path) {
Ok(f) => f,
Err(_) => return,
};
if f.seek(std::io::SeekFrom::Start(start_offset as u64))
.is_err()
{
return;
}
let mut reader = std::io::BufReader::new(&mut f);
let mut new_offset = start_offset;
let mut line_buf = String::new();
loop {
let line_start_offset = new_offset;
line_buf.clear();
let bytes_read = match reader.read_line(&mut line_buf) {
Ok(n) => n,
Err(_) => break,
};
if bytes_read == 0 {
break; }
if !line_buf.ends_with('\n') {
new_offset = line_start_offset;
break;
}
new_offset += bytes_read as i64;
let line = line_buf.trim_end_matches('\n').trim_end_matches('\r');
let Some(event) = parse_log_event(line) else {
continue;
};
let event_type = event.kind;
let event_id = event
.event_id
.clone()
.unwrap_or_else(|| event_id_for_line(path_str.as_ref(), &inode, new_offset, line));
let already: i64 = state_db
.query_row(
"SELECT count(*) FROM processed_events WHERE event_id=?",
rusqlite::params![event_id],
|r| r.get(0),
)
.unwrap_or(0);
if already > 0 {
continue;
}
if event_type == "start" {
let query = event.query.as_deref().unwrap_or(line);
match call_cli_recall(db_path, query) {
Ok(tid) => {
let ts = crate::utils::utc_now_iso();
let _ = state_db.execute(
"INSERT OR REPLACE INTO trace_context(watch_path, trace_id, updated_at) VALUES (?,?,?)",
rusqlite::params![path_str.as_ref(), &tid, &ts],
);
let _ = state_db.execute(
"INSERT OR IGNORE INTO processed_events(event_id, watch_path, trace_id, event_type, ts) VALUES (?,?,?,?,?)",
rusqlite::params![event_id, path_str.as_ref(), &tid, event_type, &ts],
);
let _ = writeln!(log, "{ts} [daemon] start trace={tid} query={query:?}");
}
Err(e) => {
let _ = writeln!(
log,
"{} [daemon] start recall failed: {e}",
crate::utils::utc_now_iso()
);
record_daemon_error(state_db, path_str.as_ref(), "recall", &e.to_string());
}
}
continue;
}
let context_trace_id: Option<String> = state_db
.query_row(
"SELECT trace_id FROM trace_context WHERE watch_path=?",
rusqlite::params![path_str.as_ref()],
|r| r.get(0),
)
.ok();
let trace_id = event.trace_id.clone().or(context_trace_id);
if event_type == "end" {
let result = call_cli_evolve(db_path, "manual");
let ts = crate::utils::utc_now_iso();
match result {
Ok(()) => {
let _ = call_cli_evolve(db_path, "scheduled");
let _ = state_db.execute(
"DELETE FROM trace_context WHERE watch_path=?",
rusqlite::params![path_str.as_ref()],
);
let _ = state_db.execute(
"INSERT OR IGNORE INTO processed_events
(event_id, watch_path, trace_id, event_type, ts)
VALUES (?,?,?,?,?)",
rusqlite::params![event_id, path_str.as_ref(), trace_id, event_type, ts],
);
let _ = writeln!(log, "{ts} [daemon] end evolve ok");
}
Err(e) => {
let _ = writeln!(log, "{ts} [daemon] end evolve failed: {e}");
record_daemon_error(state_db, path_str.as_ref(), "evolve", &e.to_string());
}
}
continue;
}
if matches!(event_type, "ok" | "fail" | "feedback") {
let Some(tid) = &trace_id else {
record_daemon_error(
state_db,
path_str.as_ref(),
"record",
"event has no trace_id and no active trace context",
);
continue;
};
let result = call_cli_record(db_path, tid, &event);
let ts = crate::utils::utc_now_iso();
match result {
Ok(()) => {
let _ = state_db.execute(
"INSERT OR IGNORE INTO processed_events
(event_id, watch_path, trace_id, event_type, ts)
VALUES (?,?,?,?,?)",
rusqlite::params![event_id, path_str.as_ref(), tid, event_type, ts],
);
let _ = writeln!(log, "{ts} [daemon] {event_type} record trace={tid} ok");
}
Err(e) => {
let _ = writeln!(
log,
"{ts} [daemon] {event_type} record trace={tid} failed: {e}"
);
record_daemon_error(state_db, path_str.as_ref(), "record", &e.to_string());
}
}
}
}
let ts = crate::utils::utc_now_iso();
let _ = state_db.execute(
"INSERT OR REPLACE INTO watch_state(watch_path, last_processed_offset, last_processed_inode, updated_at)
VALUES (?,?,?,?)",
rusqlite::params![path_str.as_ref(), new_offset, inode, ts],
);
}