1use crate::types::ActivityState;
7use serde::{Deserialize, Serialize};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
/// Maximum age, in seconds, at which a `WaitingInput` / `Blocked` log entry is
/// still reported by [`check_actionable_state`] (five minutes).
pub const ACTIVITY_INPUT_STALENESS_SECS: u64 = 5 * 60;
12
/// One record of the activity log.
///
/// [`append_activity_entry`] serializes each record as a single JSON line.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActivityLogEntry {
    /// When the entry was recorded. The readers in this file accept either
    /// integer milliseconds since the Unix epoch or an RFC 3339 date-time.
    pub ts: String,
    /// The activity state being reported.
    pub state: ActivityState,
    /// Who produced the entry (the tests in this file use `"terminal"`).
    // NOTE(review): the set of valid `source` values is not defined here — confirm.
    pub source: String,
    /// Presumably a free-form hint about what caused the state — confirm with
    /// the writers. Omitted from the serialized JSON when `None`, and read
    /// back as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger: Option<String>,
}
21
/// Returns where the activity log lives inside a workspace:
/// `<workspace_path>/.ao/activity.jsonl`.
pub fn activity_log_path(workspace_path: &Path) -> PathBuf {
    let mut log_path = workspace_path.to_path_buf();
    log_path.push(".ao");
    log_path.push("activity.jsonl");
    log_path
}
25
26pub fn append_activity_entry(
27 workspace_path: &Path,
28 entry: &ActivityLogEntry,
29) -> std::io::Result<()> {
30 let p = activity_log_path(workspace_path);
31 if let Some(parent) = p.parent() {
32 std::fs::create_dir_all(parent)?;
33 }
34 let line = serde_json::to_string(entry).unwrap_or_default();
35 std::fs::OpenOptions::new()
36 .create(true)
37 .append(true)
38 .open(p)?
39 .write_all(format!("{line}\n").as_bytes())?;
40 Ok(())
41}
42
43pub fn read_last_activity_entry(
44 workspace_path: &Path,
45) -> std::io::Result<Option<(ActivityLogEntry, std::time::SystemTime)>> {
46 use std::io::{BufRead, Seek};
47
48 let p = activity_log_path(workspace_path);
49 let meta = match std::fs::metadata(&p) {
50 Ok(m) => m,
51 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
52 Err(e) => return Err(e),
53 };
54 if meta.len() == 0 {
55 return Ok(None);
56 }
57 let modified = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
58
59 let tail_size: u64 = 4096;
61 let offset = meta.len().saturating_sub(tail_size);
62 let mut f = std::fs::File::open(&p)?;
63 if offset > 0 {
64 f.seek(std::io::SeekFrom::Start(offset))?;
65 }
66 let mut r = std::io::BufReader::new(f);
67 if offset > 0 {
68 let mut _discard = String::new();
70 let _ = r.read_line(&mut _discard);
71 }
72
73 let mut last_ok: Option<ActivityLogEntry> = None;
74 for line in r.lines().map_while(std::result::Result::ok) {
75 if line.trim().is_empty() {
76 continue;
77 }
78 if let Ok(e) = serde_json::from_str::<ActivityLogEntry>(&line) {
79 last_ok = Some(e);
80 }
81 }
82 Ok(last_ok.map(|e| (e, modified)))
83}
84
85pub fn check_actionable_state(
87 entry: Option<&ActivityLogEntry>,
88 now: std::time::SystemTime,
89) -> Option<ActivityState> {
90 let e = entry?;
91 if !matches!(
92 e.state,
93 ActivityState::WaitingInput | ActivityState::Blocked
94 ) {
95 return None;
96 }
97 let ts = chrono_like_parse(&e.ts)?;
98 let age = now.duration_since(ts).ok()?.as_secs();
99 (age <= ACTIVITY_INPUT_STALENESS_SECS).then_some(e.state)
100}
101
102pub fn detect_activity_from_log(workspace_path: &Path) -> Option<ActivityState> {
113 let (entry, _modified) = read_last_activity_entry(workspace_path).ok().flatten()?;
114 if entry.state == ActivityState::Exited {
115 return Some(ActivityState::Exited);
116 }
117 check_actionable_state(Some(&entry), std::time::SystemTime::now())
118}
119
120fn chrono_like_parse(s: &str) -> Option<std::time::SystemTime> {
121 if let Ok(ms) = s.parse::<u128>() {
126 return Some(std::time::UNIX_EPOCH + std::time::Duration::from_millis(ms as u64));
127 }
128 parse_rfc3339(s)
129}
130
/// Parses an RFC 3339 date-time (`YYYY-MM-DDTHH:MM:SS[.frac](Z|±HH:MM|±HHMM)`)
/// into a [`std::time::SystemTime`].
///
/// Accepted variations: the date/time separator may be `T`, `t`, or a space;
/// `Z` may be lower-case; the fraction may have 1 to 9 digits; the offset may
/// omit its colon. A seconds value of `60` is accepted and counted as 60
/// seconds past the minute.
///
/// Returns `None` for anything else, including fields that are not plain
/// ASCII digits, days that do not exist in the given month, a missing
/// offset, and instants before the Unix epoch.
fn parse_rfc3339(s: &str) -> Option<std::time::SystemTime> {
    let b = s.as_bytes();
    // The shortest accepted form is `YYYY-MM-DDTHH:MM:SSZ` (20 bytes).
    if b.len() < 20 {
        return None;
    }
    let separators_ok = b[4] == b'-'
        && b[7] == b'-'
        && matches!(b[10], b'T' | b't' | b' ')
        && b[13] == b':'
        && b[16] == b':';
    if !separators_ok {
        return None;
    }

    let year = i32::try_from(parse_ascii_digits(s.get(0..4)?)?).ok()?;
    let month = parse_ascii_digits(s.get(5..7)?)?;
    let day = parse_ascii_digits(s.get(8..10)?)?;
    let hour = parse_ascii_digits(s.get(11..13)?)?;
    let min = parse_ascii_digits(s.get(14..16)?)?;
    let sec = parse_ascii_digits(s.get(17..19)?)?;

    if !(1..=12).contains(&month) || day == 0 || day > days_in_month(year, month) {
        return None;
    }
    if hour > 23 || min > 59 || sec > 60 {
        return None;
    }

    // Optional fractional seconds, scaled to nanoseconds.
    let mut rest = s.get(19..)?;
    let mut nanos: u32 = 0;
    if let Some(after_dot) = rest.strip_prefix('.') {
        let frac_end = after_dot
            .find(['Z', 'z', '+', '-'])
            .unwrap_or(after_dot.len());
        let (frac, tail) = after_dot.split_at(frac_end);
        if frac.len() > 9 {
            return None;
        }
        // `parse_ascii_digits` also rejects an empty fraction.
        let value = parse_ascii_digits(frac)?;
        // `value < 10^len`, so the scaled result stays below 10^9.
        let missing_digits = 9 - u32::try_from(frac.len()).ok()?;
        nanos = value * 10u32.pow(missing_digits);
        rest = tail;
    }

    // Mandatory UTC designator or numeric offset.
    let offset_secs: i64 = if rest.eq_ignore_ascii_case("z") {
        0
    } else {
        let (sign, hhmm) = if let Some(r) = rest.strip_prefix('+') {
            (1i64, r)
        } else if let Some(r) = rest.strip_prefix('-') {
            (-1i64, r)
        } else {
            return None;
        };
        let (hh, mm) = match hhmm.len() {
            5 if hhmm.as_bytes()[2] == b':' => (hhmm.get(0..2)?, hhmm.get(3..5)?),
            4 => (hhmm.get(0..2)?, hhmm.get(2..4)?),
            _ => return None,
        };
        let hh = i64::from(parse_ascii_digits(hh)?);
        let mm = i64::from(parse_ascii_digits(mm)?);
        if hh > 23 || mm > 59 {
            return None;
        }
        sign * (hh * 3600 + mm * 60)
    };

    let seconds_of_day = i64::from(hour) * 3600 + i64::from(min) * 60 + i64::from(sec);
    let total = days_from_civil(year, month, day)
        .checked_mul(86_400)?
        .checked_add(seconds_of_day)?
        .checked_sub(offset_secs)?;
    // Instants before the Unix epoch are not representable here.
    let total = u64::try_from(total).ok()?;
    std::time::UNIX_EPOCH.checked_add(std::time::Duration::new(total, nanos))
}

/// Parses `field` as an unsigned decimal number made only of ASCII digits.
///
/// `str::parse` alone would also accept a leading `+`, which RFC 3339 fields
/// do not allow. Returns `None` for an empty field or on overflow.
fn parse_ascii_digits(field: &str) -> Option<u32> {
    if field.is_empty() || !field.bytes().all(|byte| byte.is_ascii_digit()) {
        return None;
    }
    field.parse().ok()
}

/// Number of days in `month` (1-12) of `year` in the proleptic Gregorian
/// calendar.
fn days_in_month(year: i32, month: u32) -> u32 {
    let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    match month {
        4 | 6 | 9 | 11 => 30,
        2 if is_leap => 29,
        2 => 28,
        _ => 31,
    }
}

/// Days between 1970-01-01 and the given proleptic-Gregorian date (negative
/// for earlier dates).
///
/// The year is shifted so that it starts in March, which puts the leap day at
/// the end of the shifted year; dates are then counted within 400-year eras
/// of 146 097 days each.
fn days_from_civil(y: i32, m: u32, d: u32) -> i64 {
    let month = i64::from(m);
    // January and February belong to the previous March-based year.
    let year = if m <= 2 { i64::from(y) - 1 } else { i64::from(y) };
    let era = year.div_euclid(400);
    let year_of_era = year.rem_euclid(400);
    // Month index counted from March (March = 0 ... February = 11).
    let shifted_month = if m > 2 { month - 3 } else { month + 9 };
    let day_of_year = (153 * shifted_month + 2) / 5 + i64::from(d) - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    // 719 468 is the day count from 0000-03-01 to 1970-01-01.
    era * 146_097 + day_of_era - 719_468
}
221
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Scratch workspace directory that is deleted again when dropped, so test
    /// runs do not pile up directories under the system temp dir.
    struct TempWorkspace(PathBuf);

    impl TempWorkspace {
        /// Creates a fresh directory whose name combines `label` with the
        /// current time in nanoseconds.
        fn new(label: &str) -> Self {
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let dir = std::env::temp_dir().join(format!("ao-rs-activity-log-{label}-{nanos}"));
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempWorkspace {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not turn a passing test red.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Current wall-clock time as milliseconds since the Unix epoch.
    fn now_ms() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Builds a log entry with the `"terminal"` source used by every test.
    fn entry(ts: impl Into<String>, state: ActivityState, trigger: Option<&str>) -> ActivityLogEntry {
        ActivityLogEntry {
            ts: ts.into(),
            state,
            source: "terminal".into(),
            trigger: trigger.map(str::to_owned),
        }
    }

    #[test]
    fn actionable_state_respects_staleness() {
        let now = UNIX_EPOCH + Duration::from_secs(1_000_000);

        let fresh = entry(
            (1_000_000u64 * 1000).to_string(),
            ActivityState::WaitingInput,
            Some("prompt"),
        );
        assert_eq!(
            check_actionable_state(Some(&fresh), now),
            Some(ActivityState::WaitingInput)
        );

        // One second past the staleness limit.
        let stale = entry(
            ((1_000_000u64 - (ACTIVITY_INPUT_STALENESS_SECS + 1)) * 1000).to_string(),
            ActivityState::Blocked,
            None,
        );
        assert_eq!(check_actionable_state(Some(&stale), now), None);
    }

    #[test]
    fn detect_from_log_missing_returns_none() {
        let ws = TempWorkspace::new("missing");
        assert!(detect_activity_from_log(ws.path()).is_none());
    }

    #[test]
    fn detect_from_log_exited_always_wins() {
        let ws = TempWorkspace::new("exited");
        // Timestamp 0 is as old as an entry can get.
        let ancient = entry("0", ActivityState::Exited, None);
        append_activity_entry(ws.path(), &ancient).unwrap();
        assert_eq!(
            detect_activity_from_log(ws.path()),
            Some(ActivityState::Exited),
            "stale Exited entries should still surface",
        );
    }

    #[test]
    fn detect_from_log_fresh_waiting_input_surfaces() {
        let ws = TempWorkspace::new("fresh-waiting");
        let fresh = entry(
            now_ms().to_string(),
            ActivityState::WaitingInput,
            Some("approve?"),
        );
        append_activity_entry(ws.path(), &fresh).unwrap();
        assert_eq!(
            detect_activity_from_log(ws.path()),
            Some(ActivityState::WaitingInput)
        );
    }

    #[test]
    fn detect_from_log_fresh_blocked_surfaces() {
        let ws = TempWorkspace::new("fresh-blocked");
        let fresh = entry(now_ms().to_string(), ActivityState::Blocked, Some("error"));
        append_activity_entry(ws.path(), &fresh).unwrap();
        assert_eq!(
            detect_activity_from_log(ws.path()),
            Some(ActivityState::Blocked)
        );
    }

    #[test]
    fn detect_from_log_stale_actionable_falls_through() {
        let ws = TempWorkspace::new("stale-actionable");
        let stale_ms = now_ms().saturating_sub((ACTIVITY_INPUT_STALENESS_SECS + 60) * 1000);
        let stale = entry(stale_ms.to_string(), ActivityState::WaitingInput, None);
        append_activity_entry(ws.path(), &stale).unwrap();
        assert!(detect_activity_from_log(ws.path()).is_none());
    }

    #[test]
    fn detect_from_log_ignores_active_and_ready() {
        let ws = TempWorkspace::new("active-ready");
        let active = entry(now_ms().to_string(), ActivityState::Active, None);
        append_activity_entry(ws.path(), &active).unwrap();
        assert!(detect_activity_from_log(ws.path()).is_none());
    }

    #[test]
    fn parse_numeric_ms_still_works() {
        let got = chrono_like_parse("1700000000000").unwrap();
        let want = UNIX_EPOCH + Duration::from_millis(1_700_000_000_000);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_utc_z() {
        let got = chrono_like_parse("2024-01-15T10:30:00Z").unwrap();
        let want = UNIX_EPOCH + Duration::from_secs(1_705_314_600);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_with_milliseconds() {
        let got = chrono_like_parse("2024-01-15T10:30:00.123Z").unwrap();
        let want = UNIX_EPOCH + Duration::from_secs(1_705_314_600) + Duration::from_millis(123);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_with_positive_offset() {
        // 10:30 at +02:00 is 08:30 UTC.
        let got = chrono_like_parse("2024-01-15T10:30:00+02:00").unwrap();
        let want = UNIX_EPOCH + Duration::from_secs(1_705_307_400);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_with_negative_offset_and_micros() {
        // 10:30 at -05:00 is 15:30 UTC.
        let got = chrono_like_parse("2024-01-15T10:30:00.500000-05:00").unwrap();
        let want = UNIX_EPOCH + Duration::from_secs(1_705_332_600) + Duration::from_millis(500);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rfc3339_lowercase_t_and_z() {
        let got = chrono_like_parse("2024-01-15t10:30:00z").unwrap();
        let want = UNIX_EPOCH + Duration::from_secs(1_705_314_600);
        assert_eq!(got, want);
    }

    #[test]
    fn parse_rejects_malformed() {
        assert!(chrono_like_parse("").is_none());
        assert!(chrono_like_parse("not-a-date").is_none());
        // Missing offset.
        assert!(chrono_like_parse("2024-01-15T10:30:00").is_none());
        // Wrong date separators.
        assert!(chrono_like_parse("2024/01/15T10:30:00Z").is_none());
        // Month out of range.
        assert!(chrono_like_parse("2024-13-15T10:30:00Z").is_none());
        // Non-digit fraction.
        assert!(chrono_like_parse("2024-01-15T10:30:00.abcZ").is_none());
    }

    #[test]
    fn actionable_state_respects_staleness_rfc3339() {
        // `now` is exactly 2024-01-15T10:30:00Z.
        let now = UNIX_EPOCH + Duration::from_secs(1_705_314_600);

        let fresh = entry(
            "2024-01-15T10:30:00Z",
            ActivityState::WaitingInput,
            Some("prompt"),
        );
        assert_eq!(
            check_actionable_state(Some(&fresh), now),
            Some(ActivityState::WaitingInput)
        );

        // Ten minutes old: past the five-minute limit.
        let stale = entry("2024-01-15T10:20:00Z", ActivityState::Blocked, None);
        assert_eq!(check_actionable_state(Some(&stale), now), None);
    }

    #[test]
    fn detect_from_log_uses_last_entry() {
        let ws = TempWorkspace::new("last-entry");
        // A fresh Blocked entry followed by Active: the later entry decides.
        append_activity_entry(
            ws.path(),
            &entry(now_ms().to_string(), ActivityState::Blocked, None),
        )
        .unwrap();
        append_activity_entry(
            ws.path(),
            &entry(now_ms().to_string(), ActivityState::Active, None),
        )
        .unwrap();
        assert!(detect_activity_from_log(ws.path()).is_none());
    }
}