Skip to main content

ao_core/
activity_log.rs

1//! Activity JSONL log (TS `activity-log.ts`-inspired).
2//!
3//! Used as an optional fallback for agents that don't have native session logs.
4//! Format: one JSON object per line at `{workspace}/.ao/activity.jsonl`.
5
6use crate::types::ActivityState;
7use serde::{Deserialize, Serialize};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
/// Maximum age, in seconds, at which a `WaitingInput` / `Blocked` log entry is
/// still reported by `check_actionable_state` (5 minutes).
pub const ACTIVITY_INPUT_STALENESS_SECS: u64 = 5 * 60;
12
/// One line of the activity JSONL log.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActivityLogEntry {
    /// Timestamp of the entry as a string. Readers in this module accept either
    /// a numeric unix-milliseconds value or an RFC3339 date-time.
    pub ts: String,
    /// Activity state recorded by the writer.
    pub state: ActivityState,
    /// Free-form label for where the entry came from.
    // NOTE(review): the tests in this file only ever use "terminal" — confirm
    // the full set of expected values with the writers.
    pub source: String,
    /// Optional free-form detail about what caused the state. Omitted from the
    /// JSON when `None`, and treated as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger: Option<String>,
}
21
/// Location of the activity log for a workspace:
/// `<workspace_path>/.ao/activity.jsonl`.
pub fn activity_log_path(workspace_path: &Path) -> PathBuf {
    let mut log_path = workspace_path.to_path_buf();
    log_path.push(".ao");
    log_path.push("activity.jsonl");
    log_path
}
25
26pub fn append_activity_entry(
27    workspace_path: &Path,
28    entry: &ActivityLogEntry,
29) -> std::io::Result<()> {
30    let p = activity_log_path(workspace_path);
31    if let Some(parent) = p.parent() {
32        std::fs::create_dir_all(parent)?;
33    }
34    let line = serde_json::to_string(entry).unwrap_or_default();
35    std::fs::OpenOptions::new()
36        .create(true)
37        .append(true)
38        .open(p)?
39        .write_all(format!("{line}\n").as_bytes())?;
40    Ok(())
41}
42
43pub fn read_last_activity_entry(
44    workspace_path: &Path,
45) -> std::io::Result<Option<(ActivityLogEntry, std::time::SystemTime)>> {
46    use std::io::{BufRead, Seek};
47
48    let p = activity_log_path(workspace_path);
49    let meta = match std::fs::metadata(&p) {
50        Ok(m) => m,
51        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
52        Err(e) => return Err(e),
53    };
54    if meta.len() == 0 {
55        return Ok(None);
56    }
57    let modified = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
58
59    // Tail-read last 4KB.
60    let tail_size: u64 = 4096;
61    let offset = meta.len().saturating_sub(tail_size);
62    let mut f = std::fs::File::open(&p)?;
63    if offset > 0 {
64        f.seek(std::io::SeekFrom::Start(offset))?;
65    }
66    let mut r = std::io::BufReader::new(f);
67    if offset > 0 {
68        // Drop partial line.
69        let mut _discard = String::new();
70        let _ = r.read_line(&mut _discard);
71    }
72
73    let mut last_ok: Option<ActivityLogEntry> = None;
74    for line in r.lines().map_while(std::result::Result::ok) {
75        if line.trim().is_empty() {
76            continue;
77        }
78        if let Ok(e) = serde_json::from_str::<ActivityLogEntry>(&line) {
79            last_ok = Some(e);
80        }
81    }
82    Ok(last_ok.map(|e| (e, modified)))
83}
84
85/// Returns actionable states only (`waiting_input` / `blocked`) with staleness cap.
86pub fn check_actionable_state(
87    entry: Option<&ActivityLogEntry>,
88    now: std::time::SystemTime,
89) -> Option<ActivityState> {
90    let e = entry?;
91    if !matches!(
92        e.state,
93        ActivityState::WaitingInput | ActivityState::Blocked
94    ) {
95        return None;
96    }
97    let ts = chrono_like_parse(&e.ts)?;
98    let age = now.duration_since(ts).ok()?.as_secs();
99    (age <= ACTIVITY_INPUT_STALENESS_SECS).then_some(e.state)
100}
101
102/// Best-effort activity probe from `{workspace}/.ao/activity.jsonl`.
103///
104/// Surfaces only the states a stale log can still describe honestly:
105/// - `Exited` is terminal; staleness does not downgrade it.
106/// - `WaitingInput` / `Blocked` surface when within the staleness cap.
107///
108/// Everything else (including `Active` / `Ready` / `Idle` entries, stale
109/// actionable entries, and a missing or empty log) returns `None` so the
110/// caller can fall through to its own default — matching the
111/// `Agent::detect_activity` "no detection available" contract.
112pub fn detect_activity_from_log(workspace_path: &Path) -> Option<ActivityState> {
113    let (entry, _modified) = read_last_activity_entry(workspace_path).ok().flatten()?;
114    if entry.state == ActivityState::Exited {
115        return Some(ActivityState::Exited);
116    }
117    check_actionable_state(Some(&entry), std::time::SystemTime::now())
118}
119
120fn chrono_like_parse(s: &str) -> Option<std::time::SystemTime> {
121    // Accept numeric unix-ms strings (ao-rs native writers) and RFC3339 strings
122    // (ao-ts writers using `Date.toISOString()`, and any RFC3339-emitting source).
123    // Kept dep-free on purpose — the accepted RFC3339 subset matches what
124    // `activity.jsonl` producers actually emit.
125    if let Ok(ms) = s.parse::<u128>() {
126        return Some(std::time::UNIX_EPOCH + std::time::Duration::from_millis(ms as u64));
127    }
128    parse_rfc3339(s)
129}
130
131/// Minimal RFC3339 parser: `YYYY-MM-DDTHH:MM:SS[.frac][Z|±HH:MM|±HHMM]`.
132/// Accepts `T`/`t`/space as the date-time separator. Returns `None` for any
133/// malformed input — callers already treat `None` as "no usable timestamp".
134fn parse_rfc3339(s: &str) -> Option<std::time::SystemTime> {
135    let b = s.as_bytes();
136    if b.len() < 20 {
137        return None;
138    }
139    // Fixed-position punctuation check: YYYY-MM-DDTHH:MM:SS
140    if b[4] != b'-' || b[7] != b'-' || b[13] != b':' || b[16] != b':' {
141        return None;
142    }
143    if b[10] != b'T' && b[10] != b't' && b[10] != b' ' {
144        return None;
145    }
146
147    let year: i32 = s.get(0..4)?.parse().ok()?;
148    let month: u32 = s.get(5..7)?.parse().ok()?;
149    let day: u32 = s.get(8..10)?.parse().ok()?;
150    let hour: u32 = s.get(11..13)?.parse().ok()?;
151    let min: u32 = s.get(14..16)?.parse().ok()?;
152    let sec: u32 = s.get(17..19)?.parse().ok()?;
153
154    if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
155        return None;
156    }
157    if hour > 23 || min > 59 || sec > 60 {
158        return None;
159    }
160
161    let mut rest = &s[19..];
162    let mut nanos: u32 = 0;
163    if let Some(after_dot) = rest.strip_prefix('.') {
164        let frac_end = after_dot
165            .find(['Z', 'z', '+', '-'])
166            .unwrap_or(after_dot.len());
167        let frac = &after_dot[..frac_end];
168        if frac.is_empty() || frac.len() > 9 || !frac.chars().all(|c| c.is_ascii_digit()) {
169            return None;
170        }
171        let n: u64 = frac.parse().ok()?;
172        nanos = (n * 10u64.pow(9 - frac.len() as u32)) as u32;
173        rest = &after_dot[frac_end..];
174    }
175
176    let offset_secs: i64 = if rest.eq_ignore_ascii_case("z") {
177        0
178    } else if let Some(rem) = rest.strip_prefix('+').or_else(|| rest.strip_prefix('-')) {
179        let sign: i64 = if rest.starts_with('+') { 1 } else { -1 };
180        let (hh, mm) = match rem.len() {
181            5 if rem.as_bytes()[2] == b':' => (rem.get(0..2)?, rem.get(3..5)?),
182            4 => (rem.get(0..2)?, rem.get(2..4)?),
183            _ => return None,
184        };
185        let hh: i64 = hh.parse().ok()?;
186        let mm: i64 = mm.parse().ok()?;
187        if !(0..=23).contains(&hh) || !(0..=59).contains(&mm) {
188            return None;
189        }
190        sign * (hh * 3600 + mm * 60)
191    } else {
192        return None;
193    };
194
195    let days = days_from_civil(year, month, day);
196    let total = days
197        .checked_mul(86400)?
198        .checked_add(hour as i64 * 3600 + min as i64 * 60 + sec as i64)?
199        .checked_sub(offset_secs)?;
200    if total < 0 {
201        return None;
202    }
203    Some(
204        std::time::UNIX_EPOCH
205            + std::time::Duration::from_secs(total as u64)
206            + std::time::Duration::from_nanos(nanos as u64),
207    )
208}
209
/// Number of days between 1970-01-01 and the proleptic Gregorian date
/// `(y, m, d)`, following Howard Hinnant's civil→days algorithm.
///
/// Negative for dates before the epoch; negative years are handled by using
/// floor division when splitting the year into 400-year eras.
fn days_from_civil(y: i32, m: u32, d: u32) -> i64 {
    const DAYS_PER_ERA: i64 = 146_097;
    const EPOCH_SHIFT: i64 = 719_468; // days from 0000-03-01 to 1970-01-01

    // Treat March as the first month so the leap day falls at the end of the year.
    let shifted_year = i64::from(y) - if m <= 2 { 1 } else { 0 };
    let shifted_month = i64::from(m) + if m > 2 { -3 } else { 9 };

    let era = shifted_year.div_euclid(400);
    let year_of_era = shifted_year.rem_euclid(400); // [0, 399]
    let day_of_year = (153 * shifted_month + 2) / 5 + i64::from(d) - 1; // [0, 365]
    let day_of_era =
        year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year; // [0, 146096]

    era * DAYS_PER_ERA + day_of_era - EPOCH_SHIFT
}
221
// Unit tests for the activity log: staleness filtering, the file-backed
// detector, and timestamp parsing (numeric unix-ms and RFC3339).
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Creates a fresh directory under the system temp dir whose name combines
    /// `label` with the current time in nanoseconds, and returns its path.
    /// The directory is not removed by this helper.
    fn unique_workspace(label: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let p = std::env::temp_dir().join(format!("ao-rs-activity-log-{label}-{nanos}"));
        std::fs::create_dir_all(&p).unwrap();
        p
    }

    /// Current wall-clock time as unix milliseconds, the numeric `ts` format.
    fn now_ms() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    #[test]
    fn actionable_state_respects_staleness() {
        // Fixed `now` so the test does not depend on the wall clock.
        let now = std::time::UNIX_EPOCH + std::time::Duration::from_secs(1_000_000);
        let fresh = ActivityLogEntry {
            ts: (1_000_000u64 * 1000).to_string(),
            state: ActivityState::WaitingInput,
            source: "terminal".into(),
            trigger: Some("prompt".into()),
        };
        assert_eq!(
            check_actionable_state(Some(&fresh), now),
            Some(ActivityState::WaitingInput)
        );

        // One second past the staleness cap.
        let stale = ActivityLogEntry {
            ts: ((1_000_000u64 - (ACTIVITY_INPUT_STALENESS_SECS + 1)) * 1000).to_string(),
            state: ActivityState::Blocked,
            source: "terminal".into(),
            trigger: None,
        };
        assert_eq!(check_actionable_state(Some(&stale), now), None);
    }

    #[test]
    fn detect_from_log_missing_returns_none() {
        // The workspace exists but no log was ever written into it.
        let ws = unique_workspace("missing");
        assert!(detect_activity_from_log(&ws).is_none());
    }

    #[test]
    fn detect_from_log_exited_always_wins() {
        let ws = unique_workspace("exited");
        // Even with a timestamp that's way past the staleness cap,
        // an Exited entry should surface — terminal is one-way.
        let ancient = ActivityLogEntry {
            ts: "0".into(),
            state: ActivityState::Exited,
            source: "terminal".into(),
            trigger: None,
        };
        append_activity_entry(&ws, &ancient).unwrap();
        assert_eq!(
            detect_activity_from_log(&ws),
            Some(ActivityState::Exited),
            "stale Exited entries should still surface",
        );
    }

    #[test]
    fn detect_from_log_fresh_waiting_input_surfaces() {
        let ws = unique_workspace("fresh-waiting");
        let entry = ActivityLogEntry {
            ts: now_ms().to_string(),
            state: ActivityState::WaitingInput,
            source: "terminal".into(),
            trigger: Some("approve?".into()),
        };
        append_activity_entry(&ws, &entry).unwrap();
        assert_eq!(
            detect_activity_from_log(&ws),
            Some(ActivityState::WaitingInput)
        );
    }

    #[test]
    fn detect_from_log_fresh_blocked_surfaces() {
        let ws = unique_workspace("fresh-blocked");
        let entry = ActivityLogEntry {
            ts: now_ms().to_string(),
            state: ActivityState::Blocked,
            source: "terminal".into(),
            trigger: Some("error".into()),
        };
        append_activity_entry(&ws, &entry).unwrap();
        assert_eq!(detect_activity_from_log(&ws), Some(ActivityState::Blocked));
    }

    #[test]
    fn detect_from_log_stale_actionable_falls_through() {
        let ws = unique_workspace("stale-actionable");
        // One minute older than the staleness cap.
        let stale_ms = now_ms().saturating_sub((ACTIVITY_INPUT_STALENESS_SECS + 60) * 1000);
        let entry = ActivityLogEntry {
            ts: stale_ms.to_string(),
            state: ActivityState::WaitingInput,
            source: "terminal".into(),
            trigger: None,
        };
        append_activity_entry(&ws, &entry).unwrap();
        assert!(detect_activity_from_log(&ws).is_none());
    }

    #[test]
    fn detect_from_log_ignores_active_and_ready() {
        let ws = unique_workspace("active-ready");
        // A fresh Active should not surface from the default detector —
        // Active is a noisy signal that belongs to the plugin's own logic.
        // NOTE(review): despite the test name, only `Active` is exercised here.
        let entry = ActivityLogEntry {
            ts: now_ms().to_string(),
            state: ActivityState::Active,
            source: "terminal".into(),
            trigger: None,
        };
        append_activity_entry(&ws, &entry).unwrap();
        assert!(detect_activity_from_log(&ws).is_none());
    }

    #[test]
    fn parse_numeric_ms_still_works() {
        let got = chrono_like_parse("1700000000000").unwrap();
        let want = UNIX_EPOCH + std::time::Duration::from_millis(1_700_000_000_000);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_utc_z() {
        // 2024-01-15T10:30:00Z  =>  1705314600 seconds since epoch
        let got = chrono_like_parse("2024-01-15T10:30:00Z").unwrap();
        let want = UNIX_EPOCH + std::time::Duration::from_secs(1_705_314_600);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_with_milliseconds() {
        // Matches `new Date().toISOString()` shape used by the TS writer.
        let got = chrono_like_parse("2024-01-15T10:30:00.123Z").unwrap();
        let want = UNIX_EPOCH
            + std::time::Duration::from_secs(1_705_314_600)
            + std::time::Duration::from_millis(123);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_with_positive_offset() {
        // 10:30:00+02:00 == 08:30:00Z == 1705307400
        let got = chrono_like_parse("2024-01-15T10:30:00+02:00").unwrap();
        let want = UNIX_EPOCH + std::time::Duration::from_secs(1_705_307_400);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_with_negative_offset_and_micros() {
        // 10:30:00.500000-05:00 == 15:30:00.5Z == 1705332600.5
        let got = chrono_like_parse("2024-01-15T10:30:00.500000-05:00").unwrap();
        let want = UNIX_EPOCH
            + std::time::Duration::from_secs(1_705_332_600)
            + std::time::Duration::from_millis(500);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_lowercase_t_and_z() {
        // RFC3339 permits lowercase `t`/`z`; several emitters rely on it.
        let got = chrono_like_parse("2024-01-15t10:30:00z").unwrap();
        let want = UNIX_EPOCH + std::time::Duration::from_secs(1_705_314_600);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rejects_malformed() {
        assert!(chrono_like_parse("").is_none());
        assert!(chrono_like_parse("not-a-date").is_none());
        // Missing timezone indicator — ambiguous, reject.
        assert!(chrono_like_parse("2024-01-15T10:30:00").is_none());
        // Bad punctuation.
        assert!(chrono_like_parse("2024/01/15T10:30:00Z").is_none());
        // Out-of-range month.
        assert!(chrono_like_parse("2024-13-15T10:30:00Z").is_none());
        // Garbage fractional.
        assert!(chrono_like_parse("2024-01-15T10:30:00.abcZ").is_none());
    }

    #[test]
    fn actionable_state_respects_staleness_rfc3339() {
        // Same staleness logic as `actionable_state_respects_staleness`,
        // but the entry's `ts` is RFC3339 instead of numeric ms.
        let now = UNIX_EPOCH + std::time::Duration::from_secs(1_705_314_600);
        let fresh = ActivityLogEntry {
            ts: "2024-01-15T10:30:00Z".into(),
            state: ActivityState::WaitingInput,
            source: "terminal".into(),
            trigger: Some("prompt".into()),
        };
        assert_eq!(
            check_actionable_state(Some(&fresh), now),
            Some(ActivityState::WaitingInput)
        );

        let stale = ActivityLogEntry {
            // 10 minutes earlier — well past the 5-minute staleness cap.
            ts: "2024-01-15T10:20:00Z".into(),
            state: ActivityState::Blocked,
            source: "terminal".into(),
            trigger: None,
        };
        assert_eq!(check_actionable_state(Some(&stale), now), None);
    }

    #[test]
    fn detect_from_log_uses_last_entry() {
        let ws = unique_workspace("last-entry");
        // First: blocked (actionable). Then: active (noisy).
        // The last line wins — since it's Active, we return None.
        append_activity_entry(
            &ws,
            &ActivityLogEntry {
                ts: now_ms().to_string(),
                state: ActivityState::Blocked,
                source: "terminal".into(),
                trigger: None,
            },
        )
        .unwrap();
        append_activity_entry(
            &ws,
            &ActivityLogEntry {
                ts: now_ms().to_string(),
                state: ActivityState::Active,
                source: "terminal".into(),
                trigger: None,
            },
        )
        .unwrap();
        assert!(detect_activity_from_log(&ws).is_none());
    }
}