Skip to main content

innate_core/daemon/
watch.rs

1use super::{events::*, *};
2
3pub fn run_watch_loop(
4    watch_dirs: &[String],
5    db_path: &str,
6    state_db_path: &str,
7    log_path: &str,
8    pid_file: &str,
9) {
10    // Write our own pid.
11    let _ = std::fs::write(pid_file, std::process::id().to_string());
12
13    // Open log file (append).
14    let log_file = std::fs::OpenOptions::new()
15        .create(true)
16        .append(true)
17        .open(log_path);
18
19    let mut logger: Box<dyn std::io::Write + Send> = match log_file {
20        Ok(f) => Box::new(f),
21        Err(_) => Box::new(std::io::stderr()),
22    };
23
24    let _ = writeln!(logger, "[innate-daemon] started pid={}", std::process::id());
25
26    let state_db = match rusqlite::Connection::open(state_db_path) {
27        Ok(c) => c,
28        Err(e) => {
29            let _ = writeln!(logger, "[innate-daemon] cannot open state db: {e}");
30            return;
31        }
32    };
33    if state_db.execute_batch(DAEMON_SCHEMA).is_err() {
34        let _ = writeln!(logger, "[innate-daemon] failed to init schema");
35        return;
36    }
37
38    // Main poll loop: 500 ms tick.
39    let mut last_evolve_poll = std::time::Instant::now();
40    const EVOLVE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
41    let mut last_backup_poll = std::time::Instant::now();
42    const BACKUP_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30 * 60);
43    loop {
44        for dir in watch_dirs {
45            let dir_path = std::path::Path::new(dir);
46            if !dir_path.exists() {
47                continue;
48            }
49            // Find .log files in directory.
50            if let Ok(entries) = std::fs::read_dir(dir_path) {
51                for entry in entries.flatten() {
52                    let p = entry.path();
53                    if p.extension().and_then(|e| e.to_str()) == Some("log") {
54                        process_log_file(&p, &state_db, db_path, &mut *logger);
55                    }
56                }
57            }
58        }
59        // Periodically consume pending evolve_requests so knowledge grows even without session_end.
60        if last_evolve_poll.elapsed() >= EVOLVE_POLL_INTERVAL {
61            if let Err(error) = call_cli_evolve(db_path, "scheduled") {
62                let _ = writeln!(logger, "[innate-daemon] scheduled evolve failed: {error}");
63                record_daemon_error(
64                    &state_db,
65                    "<scheduler>",
66                    "scheduled_evolve",
67                    &error.to_string(),
68                );
69            }
70            last_evolve_poll = std::time::Instant::now();
71        }
72        if last_backup_poll.elapsed() >= BACKUP_POLL_INTERVAL {
73            if let Err(error) = call_cli_backup(db_path) {
74                let _ = writeln!(logger, "[innate-daemon] auto-backup failed: {error}");
75                record_daemon_error(&state_db, "<scheduler>", "auto_backup", &error.to_string());
76            }
77            last_backup_poll = std::time::Instant::now();
78        }
79        std::thread::sleep(std::time::Duration::from_millis(500));
80    }
81}
82
83pub(in crate::daemon) fn process_log_file(
84    path: &Path,
85    state_db: &rusqlite::Connection,
86    db_path: &str,
87    log: &mut dyn std::io::Write,
88) {
89    let path_str = path.to_string_lossy();
90    let meta = match std::fs::metadata(path) {
91        Ok(m) => m,
92        Err(_) => return,
93    };
94
95    // inode detection for rotation.
96    #[cfg(target_os = "linux")]
97    let inode = {
98        use std::os::linux::fs::MetadataExt;
99        meta.st_ino().to_string()
100    };
101    #[cfg(not(target_os = "linux"))]
102    let inode = String::new();
103
104    let (saved_offset, saved_inode): (i64, Option<String>) = state_db.query_row(
105        "SELECT last_processed_offset, last_processed_inode FROM watch_state WHERE watch_path=?",
106        rusqlite::params![path_str.as_ref()],
107        |r| Ok((r.get(0)?, r.get(1)?)),
108    ).unwrap_or((0, None));
109
110    // Reset on file rotation (inode change or file got shorter).
111    let file_size = meta.len() as i64;
112    let start_offset = if saved_inode.as_deref() != Some(&inode) || file_size < saved_offset {
113        0
114    } else {
115        saved_offset
116    };
117
118    if start_offset >= file_size {
119        return;
120    }
121
122    use std::io::{BufRead, Seek};
123    let mut f = match std::fs::File::open(path) {
124        Ok(f) => f,
125        Err(_) => return,
126    };
127    if f.seek(std::io::SeekFrom::Start(start_offset as u64))
128        .is_err()
129    {
130        return;
131    }
132
133    let mut reader = std::io::BufReader::new(&mut f);
134    let mut new_offset = start_offset;
135    let mut line_buf = String::new();
136
137    loop {
138        let line_start_offset = new_offset;
139        line_buf.clear();
140        let bytes_read = match reader.read_line(&mut line_buf) {
141            Ok(n) => n,
142            Err(_) => break,
143        };
144        if bytes_read == 0 {
145            break; // EOF
146        }
147        // Partial line at EOF (no trailing newline): leave offset before this line
148        // so it is re-read once the writer completes it.
149        if !line_buf.ends_with('\n') {
150            new_offset = line_start_offset;
151            break;
152        }
153        new_offset += bytes_read as i64;
154        let line = line_buf.trim_end_matches('\n').trim_end_matches('\r');
155
156        let Some(event) = parse_log_event(line) else {
157            continue;
158        };
159        let event_type = event.kind;
160
161        // Compute event_id for idempotency.
162        // Include inode so that a rotated file at the same path with the same
163        // offset + content is not mistakenly treated as a duplicate event.
164        let event_id = event
165            .event_id
166            .clone()
167            .unwrap_or_else(|| event_id_for_line(path_str.as_ref(), &inode, new_offset, line));
168
169        // Skip if already processed.
170        let already: i64 = state_db
171            .query_row(
172                "SELECT count(*) FROM processed_events WHERE event_id=?",
173                rusqlite::params![event_id],
174                |r| r.get(0),
175            )
176            .unwrap_or(0);
177        if already > 0 {
178            continue;
179        }
180
181        // Handle "start": recall to open a trace and store it.
182        if event_type == "start" {
183            let query = event.query.as_deref().unwrap_or(line);
184            match call_cli_recall(db_path, query) {
185                Ok(tid) => {
186                    let ts = crate::utils::utc_now_iso();
187                    let _ = state_db.execute(
188                        "INSERT OR REPLACE INTO trace_context(watch_path, trace_id, updated_at) VALUES (?,?,?)",
189                        rusqlite::params![path_str.as_ref(), &tid, &ts],
190                    );
191                    let _ = state_db.execute(
192                        "INSERT OR IGNORE INTO processed_events(event_id, watch_path, trace_id, event_type, ts) VALUES (?,?,?,?,?)",
193                        rusqlite::params![event_id, path_str.as_ref(), &tid, event_type, &ts],
194                    );
195                    let _ = writeln!(log, "{ts} [daemon] start trace={tid} query={query:?}");
196                }
197                Err(e) => {
198                    let _ = writeln!(
199                        log,
200                        "{} [daemon] start recall failed: {e}",
201                        crate::utils::utc_now_iso()
202                    );
203                    record_daemon_error(state_db, path_str.as_ref(), "recall", &e.to_string());
204                }
205            }
206            continue;
207        }
208
209        // Look up trace for this watch path (ok/fail/end events).
210        let context_trace_id: Option<String> = state_db
211            .query_row(
212                "SELECT trace_id FROM trace_context WHERE watch_path=?",
213                rusqlite::params![path_str.as_ref()],
214                |r| r.get(0),
215            )
216            .ok();
217        let trace_id = event.trace_id.clone().or(context_trace_id);
218
219        if event_type == "end" {
220            let result = call_cli_evolve(db_path, "manual");
221            let ts = crate::utils::utc_now_iso();
222            match result {
223                Ok(()) => {
224                    let _ = call_cli_evolve(db_path, "scheduled");
225                    let _ = state_db.execute(
226                        "DELETE FROM trace_context WHERE watch_path=?",
227                        rusqlite::params![path_str.as_ref()],
228                    );
229                    let _ = state_db.execute(
230                        "INSERT OR IGNORE INTO processed_events
231                         (event_id, watch_path, trace_id, event_type, ts)
232                         VALUES (?,?,?,?,?)",
233                        rusqlite::params![event_id, path_str.as_ref(), trace_id, event_type, ts],
234                    );
235                    let _ = writeln!(log, "{ts} [daemon] end evolve ok");
236                }
237                Err(e) => {
238                    let _ = writeln!(log, "{ts} [daemon] end evolve failed: {e}");
239                    record_daemon_error(state_db, path_str.as_ref(), "evolve", &e.to_string());
240                }
241            }
242            continue;
243        }
244
245        if matches!(event_type, "ok" | "fail" | "feedback") {
246            let Some(tid) = &trace_id else {
247                record_daemon_error(
248                    state_db,
249                    path_str.as_ref(),
250                    "record",
251                    "event has no trace_id and no active trace context",
252                );
253                continue;
254            };
255            let result = call_cli_record(db_path, tid, &event);
256            let ts = crate::utils::utc_now_iso();
257            match result {
258                Ok(()) => {
259                    let _ = state_db.execute(
260                        "INSERT OR IGNORE INTO processed_events
261                         (event_id, watch_path, trace_id, event_type, ts)
262                         VALUES (?,?,?,?,?)",
263                        rusqlite::params![event_id, path_str.as_ref(), tid, event_type, ts],
264                    );
265                    let _ = writeln!(log, "{ts} [daemon] {event_type} record trace={tid} ok");
266                }
267                Err(e) => {
268                    let _ = writeln!(
269                        log,
270                        "{ts} [daemon] {event_type} record trace={tid} failed: {e}"
271                    );
272                    record_daemon_error(state_db, path_str.as_ref(), "record", &e.to_string());
273                }
274            }
275        }
276    }
277
278    // Update watch_state.
279    let ts = crate::utils::utc_now_iso();
280    let _ = state_db.execute(
281        "INSERT OR REPLACE INTO watch_state(watch_path, last_processed_offset, last_processed_inode, updated_at)
282         VALUES (?,?,?,?)",
283        rusqlite::params![path_str.as_ref(), new_offset, inode, ts],
284    );
285}