1use super::{events::*, *};
2
3pub fn run_watch_loop(
4 watch_dirs: &[String],
5 db_path: &str,
6 state_db_path: &str,
7 log_path: &str,
8 pid_file: &str,
9) {
10 let _ = std::fs::write(pid_file, std::process::id().to_string());
12
13 let log_file = std::fs::OpenOptions::new()
15 .create(true)
16 .append(true)
17 .open(log_path);
18
19 let mut logger: Box<dyn std::io::Write + Send> = match log_file {
20 Ok(f) => Box::new(f),
21 Err(_) => Box::new(std::io::stderr()),
22 };
23
24 let _ = writeln!(logger, "[innate-daemon] started pid={}", std::process::id());
25
26 let state_db = match rusqlite::Connection::open(state_db_path) {
27 Ok(c) => c,
28 Err(e) => {
29 let _ = writeln!(logger, "[innate-daemon] cannot open state db: {e}");
30 return;
31 }
32 };
33 if state_db.execute_batch(DAEMON_SCHEMA).is_err() {
34 let _ = writeln!(logger, "[innate-daemon] failed to init schema");
35 return;
36 }
37
38 let mut last_evolve_poll = std::time::Instant::now();
40 const EVOLVE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
41 let mut last_backup_poll = std::time::Instant::now();
42 const BACKUP_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30 * 60);
43 loop {
44 for dir in watch_dirs {
45 let dir_path = std::path::Path::new(dir);
46 if !dir_path.exists() {
47 continue;
48 }
49 if let Ok(entries) = std::fs::read_dir(dir_path) {
51 for entry in entries.flatten() {
52 let p = entry.path();
53 if p.extension().and_then(|e| e.to_str()) == Some("log") {
54 process_log_file(&p, &state_db, db_path, &mut *logger);
55 }
56 }
57 }
58 }
59 if last_evolve_poll.elapsed() >= EVOLVE_POLL_INTERVAL {
61 if let Err(error) = call_cli_evolve(db_path, "scheduled") {
62 let _ = writeln!(logger, "[innate-daemon] scheduled evolve failed: {error}");
63 record_daemon_error(
64 &state_db,
65 "<scheduler>",
66 "scheduled_evolve",
67 &error.to_string(),
68 );
69 }
70 last_evolve_poll = std::time::Instant::now();
71 }
72 if last_backup_poll.elapsed() >= BACKUP_POLL_INTERVAL {
73 if let Err(error) = call_cli_backup(db_path) {
74 let _ = writeln!(logger, "[innate-daemon] auto-backup failed: {error}");
75 record_daemon_error(&state_db, "<scheduler>", "auto_backup", &error.to_string());
76 }
77 last_backup_poll = std::time::Instant::now();
78 }
79 std::thread::sleep(std::time::Duration::from_millis(500));
80 }
81}
82
83pub(in crate::daemon) fn process_log_file(
84 path: &Path,
85 state_db: &rusqlite::Connection,
86 db_path: &str,
87 log: &mut dyn std::io::Write,
88) {
89 let path_str = path.to_string_lossy();
90 let meta = match std::fs::metadata(path) {
91 Ok(m) => m,
92 Err(_) => return,
93 };
94
95 #[cfg(target_os = "linux")]
97 let inode = {
98 use std::os::linux::fs::MetadataExt;
99 meta.st_ino().to_string()
100 };
101 #[cfg(not(target_os = "linux"))]
102 let inode = String::new();
103
104 let (saved_offset, saved_inode): (i64, Option<String>) = state_db.query_row(
105 "SELECT last_processed_offset, last_processed_inode FROM watch_state WHERE watch_path=?",
106 rusqlite::params![path_str.as_ref()],
107 |r| Ok((r.get(0)?, r.get(1)?)),
108 ).unwrap_or((0, None));
109
110 let file_size = meta.len() as i64;
112 let start_offset = if saved_inode.as_deref() != Some(&inode) || file_size < saved_offset {
113 0
114 } else {
115 saved_offset
116 };
117
118 if start_offset >= file_size {
119 return;
120 }
121
122 use std::io::{BufRead, Seek};
123 let mut f = match std::fs::File::open(path) {
124 Ok(f) => f,
125 Err(_) => return,
126 };
127 if f.seek(std::io::SeekFrom::Start(start_offset as u64))
128 .is_err()
129 {
130 return;
131 }
132
133 let mut reader = std::io::BufReader::new(&mut f);
134 let mut new_offset = start_offset;
135 let mut line_buf = String::new();
136
137 loop {
138 let line_start_offset = new_offset;
139 line_buf.clear();
140 let bytes_read = match reader.read_line(&mut line_buf) {
141 Ok(n) => n,
142 Err(_) => break,
143 };
144 if bytes_read == 0 {
145 break; }
147 if !line_buf.ends_with('\n') {
150 new_offset = line_start_offset;
151 break;
152 }
153 new_offset += bytes_read as i64;
154 let line = line_buf.trim_end_matches('\n').trim_end_matches('\r');
155
156 let Some(event) = parse_log_event(line) else {
157 continue;
158 };
159 let event_type = event.kind;
160
161 let event_id = event
165 .event_id
166 .clone()
167 .unwrap_or_else(|| event_id_for_line(path_str.as_ref(), &inode, new_offset, line));
168
169 let already: i64 = state_db
171 .query_row(
172 "SELECT count(*) FROM processed_events WHERE event_id=?",
173 rusqlite::params![event_id],
174 |r| r.get(0),
175 )
176 .unwrap_or(0);
177 if already > 0 {
178 continue;
179 }
180
181 if event_type == "start" {
183 let query = event.query.as_deref().unwrap_or(line);
184 match call_cli_recall(db_path, query) {
185 Ok(tid) => {
186 let ts = crate::utils::utc_now_iso();
187 let _ = state_db.execute(
188 "INSERT OR REPLACE INTO trace_context(watch_path, trace_id, updated_at) VALUES (?,?,?)",
189 rusqlite::params![path_str.as_ref(), &tid, &ts],
190 );
191 let _ = state_db.execute(
192 "INSERT OR IGNORE INTO processed_events(event_id, watch_path, trace_id, event_type, ts) VALUES (?,?,?,?,?)",
193 rusqlite::params![event_id, path_str.as_ref(), &tid, event_type, &ts],
194 );
195 let _ = writeln!(log, "{ts} [daemon] start trace={tid} query={query:?}");
196 }
197 Err(e) => {
198 let _ = writeln!(
199 log,
200 "{} [daemon] start recall failed: {e}",
201 crate::utils::utc_now_iso()
202 );
203 record_daemon_error(state_db, path_str.as_ref(), "recall", &e.to_string());
204 }
205 }
206 continue;
207 }
208
209 let context_trace_id: Option<String> = state_db
211 .query_row(
212 "SELECT trace_id FROM trace_context WHERE watch_path=?",
213 rusqlite::params![path_str.as_ref()],
214 |r| r.get(0),
215 )
216 .ok();
217 let trace_id = event.trace_id.clone().or(context_trace_id);
218
219 if event_type == "end" {
220 let result = call_cli_evolve(db_path, "manual");
221 let ts = crate::utils::utc_now_iso();
222 match result {
223 Ok(()) => {
224 let _ = call_cli_evolve(db_path, "scheduled");
225 let _ = state_db.execute(
226 "DELETE FROM trace_context WHERE watch_path=?",
227 rusqlite::params![path_str.as_ref()],
228 );
229 let _ = state_db.execute(
230 "INSERT OR IGNORE INTO processed_events
231 (event_id, watch_path, trace_id, event_type, ts)
232 VALUES (?,?,?,?,?)",
233 rusqlite::params![event_id, path_str.as_ref(), trace_id, event_type, ts],
234 );
235 let _ = writeln!(log, "{ts} [daemon] end evolve ok");
236 }
237 Err(e) => {
238 let _ = writeln!(log, "{ts} [daemon] end evolve failed: {e}");
239 record_daemon_error(state_db, path_str.as_ref(), "evolve", &e.to_string());
240 }
241 }
242 continue;
243 }
244
245 if matches!(event_type, "ok" | "fail" | "feedback") {
246 let Some(tid) = &trace_id else {
247 record_daemon_error(
248 state_db,
249 path_str.as_ref(),
250 "record",
251 "event has no trace_id and no active trace context",
252 );
253 continue;
254 };
255 let result = call_cli_record(db_path, tid, &event);
256 let ts = crate::utils::utc_now_iso();
257 match result {
258 Ok(()) => {
259 let _ = state_db.execute(
260 "INSERT OR IGNORE INTO processed_events
261 (event_id, watch_path, trace_id, event_type, ts)
262 VALUES (?,?,?,?,?)",
263 rusqlite::params![event_id, path_str.as_ref(), tid, event_type, ts],
264 );
265 let _ = writeln!(log, "{ts} [daemon] {event_type} record trace={tid} ok");
266 }
267 Err(e) => {
268 let _ = writeln!(
269 log,
270 "{ts} [daemon] {event_type} record trace={tid} failed: {e}"
271 );
272 record_daemon_error(state_db, path_str.as_ref(), "record", &e.to_string());
273 }
274 }
275 }
276 }
277
278 let ts = crate::utils::utc_now_iso();
280 let _ = state_db.execute(
281 "INSERT OR REPLACE INTO watch_state(watch_path, last_processed_offset, last_processed_inode, updated_at)
282 VALUES (?,?,?,?)",
283 rusqlite::params![path_str.as_ref(), new_offset, inode, ts],
284 );
285}