Skip to main content

zeph_subagent/
transcript.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::fs::{self, File, OpenOptions};
5use std::io::{self, BufRead, BufReader, Write as _};
6use std::path::{Path, PathBuf};
7
8use serde::{Deserialize, Serialize};
9use zeph_llm::provider::Message;
10
11use super::error::SubAgentError;
12use super::state::SubAgentState;
13
/// A single entry in a JSONL transcript file.
///
/// One entry is serialized per line by [`TranscriptWriter::append`];
/// [`TranscriptReader::load`] parses each line back and keeps only `message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptEntry {
    /// Sequence number supplied by the caller of [`TranscriptWriter::append`].
    pub seq: u32,
    /// ISO 8601 timestamp (UTC).
    pub timestamp: String,
    /// The message recorded by this entry.
    pub message: Message,
}
22
/// Sidecar metadata for a transcript, written as `<agent_id>.meta.json`.
///
/// Serialized as pretty-printed JSON by [`TranscriptWriter::write_meta`] and
/// read back by [`TranscriptReader::load_meta`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptMeta {
    /// Agent identifier.
    // NOTE(review): presumably equal to the `<agent_id>` used for the sidecar
    // file name — `write_meta` takes that ID as a separate argument; confirm.
    pub agent_id: String,
    /// Name of the agent.
    // NOTE(review): exact meaning (instance/display name?) is not visible here — confirm.
    pub agent_name: String,
    /// Name of the definition the agent was created from.
    // NOTE(review): inferred from the field name only — confirm against callers.
    pub def_name: String,
    /// State of the sub-agent recorded in this sidecar.
    pub status: SubAgentState,
    /// Start timestamp, stored as a string.
    // NOTE(review): tests use ISO 8601 UTC (`2026-01-01T00:00:00Z`); format is not enforced here.
    pub started_at: String,
    /// Finish timestamp, if any; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finished_at: Option<String>,
    /// ID of the original agent session this was resumed from.
    ///
    /// The key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resumed_from: Option<String>,
    /// Number of turns used.
    // NOTE(review): what counts as a "turn" is defined outside this file — confirm.
    pub turns_used: u32,
}
38
39/// Appends `TranscriptEntry` lines to a JSONL file.
40///
41/// The file handle is kept open for the writer's lifetime to avoid
42/// race conditions from repeated open/close cycles.
43pub struct TranscriptWriter {
44    file: File,
45}
46
47impl TranscriptWriter {
48    /// Create (or open) a JSONL transcript file in append mode.
49    ///
50    /// Creates parent directories if they do not already exist.
51    ///
52    /// # Errors
53    ///
54    /// Returns `io::Error` if the directory cannot be created or the file cannot be opened.
55    pub fn new(path: &Path) -> io::Result<Self> {
56        if let Some(parent) = path.parent() {
57            fs::create_dir_all(parent)?;
58        }
59        let file = open_private(path)?;
60        Ok(Self { file })
61    }
62
63    /// Append a single message as a JSON line and flush immediately.
64    ///
65    /// # Errors
66    ///
67    /// Returns `io::Error` on serialization or write failure.
68    pub fn append(&mut self, seq: u32, message: &Message) -> io::Result<()> {
69        let entry = TranscriptEntry {
70            seq,
71            timestamp: utc_now(),
72            message: message.clone(),
73        };
74        let line = serde_json::to_string(&entry)
75            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
76        self.file.write_all(line.as_bytes())?;
77        self.file.write_all(b"\n")?;
78        self.file.flush()
79    }
80
81    /// Write the meta sidecar file for an agent.
82    ///
83    /// # Errors
84    ///
85    /// Returns `io::Error` on serialization or write failure.
86    pub fn write_meta(dir: &Path, agent_id: &str, meta: &TranscriptMeta) -> io::Result<()> {
87        let path = dir.join(format!("{agent_id}.meta.json"));
88        let content = serde_json::to_string_pretty(meta)
89            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
90        write_private(&path, content.as_bytes())
91    }
92}
93
94/// Reads and reconstructs message history from JSONL transcript files.
95pub struct TranscriptReader;
96
97impl TranscriptReader {
98    /// Load all messages from a JSONL transcript file.
99    ///
100    /// Malformed lines are skipped with a warning. An empty or missing file
101    /// returns an empty `Vec`. If the file does not exist at all but a matching
102    /// `.meta.json` sidecar exists, returns `SubAgentError::Transcript` with a
103    /// clear message so the caller knows the data is gone rather than silently
104    /// degrading to a fresh start.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`SubAgentError::Transcript`] on unrecoverable I/O failures, or
109    /// when the transcript file is missing but meta exists (data-loss guard).
110    pub fn load(path: &Path) -> Result<Vec<Message>, SubAgentError> {
111        let file = match File::open(path) {
112            Ok(f) => f,
113            Err(e) if e.kind() == io::ErrorKind::NotFound => {
114                // Check if a meta sidecar exists — if so, data has been lost.
115                // Build meta path from the file stem (e.g. "abc" from "abc.jsonl")
116                // so it is consistent with write_meta which uses format!("{agent_id}.meta.json").
117                let meta_path =
118                    if let (Some(parent), Some(stem)) = (path.parent(), path.file_stem()) {
119                        parent.join(format!("{}.meta.json", stem.to_string_lossy()))
120                    } else {
121                        path.with_extension("meta.json")
122                    };
123                if meta_path.exists() {
124                    return Err(SubAgentError::Transcript(format!(
125                        "transcript file '{}' is missing but meta sidecar exists — \
126                         transcript data may have been deleted",
127                        path.display()
128                    )));
129                }
130                return Ok(vec![]);
131            }
132            Err(e) => {
133                return Err(SubAgentError::Transcript(format!(
134                    "failed to open transcript '{}': {e}",
135                    path.display()
136                )));
137            }
138        };
139
140        let reader = BufReader::new(file);
141        let mut messages = Vec::new();
142        for (line_no, line_result) in reader.lines().enumerate() {
143            let line = match line_result {
144                Ok(l) => l,
145                Err(e) => {
146                    tracing::warn!(
147                        path = %path.display(),
148                        line = line_no + 1,
149                        error = %e,
150                        "failed to read transcript line — skipping"
151                    );
152                    continue;
153                }
154            };
155            let trimmed = line.trim();
156            if trimmed.is_empty() {
157                continue;
158            }
159            match serde_json::from_str::<TranscriptEntry>(trimmed) {
160                Ok(entry) => messages.push(entry.message),
161                Err(e) => {
162                    tracing::warn!(
163                        path = %path.display(),
164                        line = line_no + 1,
165                        error = %e,
166                        "malformed transcript entry — skipping"
167                    );
168                }
169            }
170        }
171        Ok(messages)
172    }
173
174    /// Load the meta sidecar for an agent.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`SubAgentError::NotFound`] if the file does not exist,
179    /// [`SubAgentError::Transcript`] on parse failure.
180    pub fn load_meta(dir: &Path, agent_id: &str) -> Result<TranscriptMeta, SubAgentError> {
181        let path = dir.join(format!("{agent_id}.meta.json"));
182        let content = fs::read_to_string(&path).map_err(|e| {
183            if e.kind() == io::ErrorKind::NotFound {
184                SubAgentError::NotFound(agent_id.to_owned())
185            } else {
186                SubAgentError::Transcript(format!("failed to read meta '{}': {e}", path.display()))
187            }
188        })?;
189        serde_json::from_str(&content).map_err(|e| {
190            SubAgentError::Transcript(format!("failed to parse meta '{}': {e}", path.display()))
191        })
192    }
193
194    /// Find the full agent ID by scanning `dir` for `.meta.json` files whose names
195    /// start with `prefix`.
196    ///
197    /// # Errors
198    ///
199    /// Returns [`SubAgentError::NotFound`] if no match is found,
200    /// [`SubAgentError::AmbiguousId`] if multiple matches are found,
201    /// [`SubAgentError::Transcript`] on I/O failure.
202    pub fn find_by_prefix(dir: &Path, prefix: &str) -> Result<String, SubAgentError> {
203        let entries = fs::read_dir(dir).map_err(|e| {
204            SubAgentError::Transcript(format!(
205                "failed to read transcript dir '{}': {e}",
206                dir.display()
207            ))
208        })?;
209
210        let mut matches: Vec<String> = Vec::new();
211        for entry in entries {
212            let entry = entry
213                .map_err(|e| SubAgentError::Transcript(format!("failed to read dir entry: {e}")))?;
214            let name = entry.file_name();
215            let name_str = name.to_string_lossy();
216            if let Some(agent_id) = name_str.strip_suffix(".meta.json")
217                && agent_id.starts_with(prefix)
218            {
219                matches.push(agent_id.to_owned());
220            }
221        }
222
223        match matches.len() {
224            0 => Err(SubAgentError::NotFound(prefix.to_owned())),
225            1 => Ok(matches.remove(0)),
226            n => Err(SubAgentError::AmbiguousId(prefix.to_owned(), n)),
227        }
228    }
229}
230
/// Delete the oldest `.jsonl` files in `dir` when the count exceeds `max_files`.
///
/// Files are sorted by modification time (oldest first); a file whose metadata
/// or mtime cannot be read is treated as oldest. Directories are never counted
/// or removed, even if their name ends in `.jsonl`. For every transcript that is
/// removed, the companion `<stem>.meta.json` sidecar is removed as well on a
/// best-effort basis (failures there are ignored).
///
/// `max_files == 0` disables sweeping entirely. If `dir` does not exist yet it
/// is created and nothing is deleted.
///
/// Returns the number of transcript files deleted.
///
/// # Errors
///
/// Returns `io::Error` if the directory cannot be created or read, or a
/// transcript file cannot be deleted.
pub fn sweep_old_transcripts(dir: &Path, max_files: usize) -> io::Result<usize> {
    if max_files == 0 {
        return Ok(0);
    }

    // Create the directory if it does not exist yet (first run).
    if !dir.exists() {
        fs::create_dir_all(dir)?;
        return Ok(0);
    }

    let mut jsonl_files: Vec<(PathBuf, std::time::SystemTime)> = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
            continue;
        }
        let mtime = match entry.metadata() {
            // `remove_file` cannot delete a directory; counting one would make the
            // whole sweep fail as soon as it became the oldest candidate.
            Ok(meta) if meta.is_dir() => continue,
            Ok(meta) => meta
                .modified()
                .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
            Err(_) => std::time::SystemTime::UNIX_EPOCH,
        };
        jsonl_files.push((path, mtime));
    }

    if jsonl_files.len() <= max_files {
        return Ok(0);
    }

    // Sort oldest first.
    jsonl_files.sort_by_key(|(_, mtime)| *mtime);

    let excess = jsonl_files.len() - max_files;
    let mut deleted = 0;
    for (path, _) in jsonl_files.into_iter().take(excess) {
        // Drop the companion sidecar first so a failed transcript removal never
        // leaves a sidecar pointing at a missing transcript. A missing sidecar
        // is not an error, hence the ignored result.
        let _ = fs::remove_file(path.with_extension("meta.json"));
        fs::remove_file(&path)?;
        deleted += 1;
    }
    Ok(deleted)
}
283
/// Open `path` for appending, creating it if necessary.
///
/// On Unix a newly created file is requested with mode `0o600` (owner read/write
/// only). On other platforms the plain `OpenOptions` defaults apply.
fn open_private(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);

    // The permission bits are the only platform-specific part of the request.
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        options.mode(0o600);
    }

    options.open(path)
}
302
/// Write `contents` to `path` atomically with owner-only permissions (0o600 on Unix).
///
/// The data is first written to a sibling temporary file
/// (`<file name>.tmp.<pid>` in the same directory) and then renamed over `path`,
/// so readers observe either the previous contents or the complete new contents,
/// never a truncated file. Because the destination is replaced by a freshly
/// created file, the owner-only mode also applies when `path` already existed
/// with broader permissions. On non-Unix platforms no explicit mode is set.
///
/// The temporary file is removed on a best-effort basis if any step fails.
/// The data is not fsynced before the rename.
///
/// # Errors
///
/// Returns `io::Error` if `path` has no file name component, or if creating,
/// writing, or renaming the temporary file fails.
fn write_private(path: &Path, contents: &[u8]) -> io::Result<()> {
    let mut tmp_name = path
        .file_name()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "path has no file name"))?
        .to_os_string();
    // Keep the staging file next to the destination so the rename never has to
    // cross a filesystem boundary; the pid keeps concurrent processes apart.
    tmp_name.push(format!(".tmp.{}", std::process::id()));
    let tmp_path = path.with_file_name(tmp_name);

    let outcome = (|| -> io::Result<()> {
        let mut options = OpenOptions::new();
        options.create(true).write(true).truncate(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt as _;
            options.mode(0o600);
        }
        let mut file = options.open(&tmp_path)?;
        file.write_all(contents)?;
        file.flush()?;
        // Close the handle before renaming; some platforms refuse to rename open files.
        drop(file);
        fs::rename(&tmp_path, path)
    })();

    if outcome.is_err() {
        let _ = fs::remove_file(&tmp_path);
    }
    outcome
}
324
/// Returns the current UTC time as an ISO 8601 string.
///
/// Public wrapper around the module-private `utc_now`; the result has the form
/// `YYYY-MM-DDTHH:MM:SSZ`.
#[must_use]
pub fn utc_now_pub() -> String {
    utc_now()
}
330
331fn utc_now() -> String {
332    // Use SystemTime for a zero-dependency ISO 8601 timestamp.
333    // Format: 2026-03-05T00:18:16Z
334    let secs = std::time::SystemTime::now()
335        .duration_since(std::time::UNIX_EPOCH)
336        .unwrap_or_default()
337        .as_secs();
338    let (y, mo, d, h, mi, s) = epoch_to_parts(secs);
339    format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
340}
341
/// Split Unix epoch seconds into `(year, month, day, hour, minute, second)` in UTC.
///
/// The date part is computed with integer arithmetic on the proleptic Gregorian
/// calendar: the day count is re-based onto a March-first year inside a 400-year
/// era so that the leap day falls at the end of the computational year.
///
/// All intermediate arithmetic is done in `u64`. Month, day, hour, minute and
/// second always fit in `u32`; the year is narrowed with `as` and would only be
/// truncated for epochs far beyond any realistic timestamp.
fn epoch_to_parts(epoch: u64) -> (u32, u32, u32, u32, u32, u32) {
    // Peel off the time-of-day fields, smallest unit first.
    let (total_minutes, second) = (epoch / 60, epoch % 60);
    let (total_hours, minute) = (total_minutes / 60, total_minutes % 60);
    let (day_count, hour) = (total_hours / 24, total_hours % 24);

    // Re-base the day count, then locate it inside its 400-year era (146 097 days).
    let shifted = day_count + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // `month_index` counts from March (0) through February (11).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index >= 10 {
        month_index - 9
    } else {
        month_index + 3
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    // Narrowing is lossless for every field except an astronomically large year.
    #[allow(clippy::cast_possible_truncation)]
    (
        year as u32,
        month as u32,
        day as u32,
        hour as u32,
        minute as u32,
        second as u32,
    )
}
378
#[cfg(test)]
mod tests {
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    use super::*;

    /// Build a minimal message with the given role and text content.
    fn test_message(role: Role, content: &str) -> Message {
        Message {
            role,
            content: content.to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        }
    }

    /// Build a completed-agent sidecar fixture for `agent_id`.
    fn test_meta(agent_id: &str) -> TranscriptMeta {
        TranscriptMeta {
            agent_id: agent_id.to_owned(),
            agent_name: "bot".to_owned(),
            def_name: "bot".to_owned(),
            status: SubAgentState::Completed,
            started_at: "2026-01-01T00:00:00Z".to_owned(),
            finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
            resumed_from: None,
            turns_used: 2,
        }
    }

    #[test]
    fn writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.jsonl");

        let msg1 = test_message(Role::User, "hello");
        let msg2 = test_message(Role::Assistant, "world");

        let mut writer = TranscriptWriter::new(&path).unwrap();
        writer.append(0, &msg1).unwrap();
        writer.append(1, &msg2).unwrap();
        drop(writer);

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "hello");
        assert_eq!(messages[1].content, "world");
    }

    #[test]
    fn load_missing_file_no_meta_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("ghost.jsonl");
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_missing_file_with_meta_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("ghost.meta.json");
        std::fs::write(&meta_path, "{}").unwrap();
        let jsonl_path = dir.path().join("ghost.jsonl");
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn load_skips_malformed_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mixed.jsonl");

        let good = test_message(Role::User, "good");
        let entry = TranscriptEntry {
            seq: 0,
            timestamp: "2026-01-01T00:00:00Z".to_owned(),
            message: good.clone(),
        };
        let good_line = serde_json::to_string(&entry).unwrap();
        let content = format!("{good_line}\nnot valid json\n{good_line}\n");
        std::fs::write(&path, &content).unwrap();

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn meta_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abc-123");
        TranscriptWriter::write_meta(dir.path(), "abc-123", &meta).unwrap();
        let loaded = TranscriptReader::load_meta(dir.path(), "abc-123").unwrap();
        assert_eq!(loaded.agent_id, "abc-123");
        assert_eq!(loaded.turns_used, 2);
    }

    #[test]
    fn meta_not_found_returns_not_found_error() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "ghost").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_exact() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abcdef01-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "abcdef01-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id =
            TranscriptReader::find_by_prefix(dir.path(), "abcdef01-0000-0000-0000-000000000000")
                .unwrap();
        assert_eq!(id, "abcdef01-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_short_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("deadbeef-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "deadbeef-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id = TranscriptReader::find_by_prefix(dir.path(), "deadbeef").unwrap();
        assert_eq!(id, "deadbeef-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_not_found() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "xxxxxxxx").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_ambiguous() {
        let dir = tempfile::tempdir().unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0001-x", &test_meta("aabb0001-x")).unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0002-y", &test_meta("aabb0002-y")).unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "aabb").unwrap_err();
        assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
    }

    #[test]
    fn sweep_old_transcripts_removes_oldest() {
        let dir = tempfile::tempdir().unwrap();

        // Pin every file's mtime explicitly (file00 oldest … file04 newest) so the
        // expected survivors do not depend on filesystem timestamp granularity.
        let base = std::time::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        for i in 0..5u32 {
            let path = dir.path().join(format!("file{i:02}.jsonl"));
            std::fs::write(&path, b"").unwrap();
            let file = std::fs::OpenOptions::new()
                .write(true)
                .open(&path)
                .unwrap();
            file.set_modified(base + std::time::Duration::from_secs(u64::from(i) * 60))
                .unwrap();
        }

        let deleted = sweep_old_transcripts(dir.path(), 3).unwrap();
        assert_eq!(deleted, 2);

        let mut remaining: Vec<String> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("jsonl"))
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        remaining.sort();
        // The two oldest transcripts are gone; the three newest survive.
        assert_eq!(remaining, ["file02.jsonl", "file03.jsonl", "file04.jsonl"]);
    }

    #[test]
    fn sweep_with_zero_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 0).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn sweep_below_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 50).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn utc_now_format() {
        let ts = utc_now();
        // Basic format check: 2026-03-05T00:18:16Z
        assert_eq!(ts.len(), 20);
        assert!(ts.ends_with('Z'));
        assert!(ts.contains('T'));
    }

    #[test]
    fn load_empty_file_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.jsonl");
        std::fs::write(&path, b"").unwrap();
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_meta_invalid_json_returns_transcript_error() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("bad.meta.json"), b"not json at all {{{{").unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "bad").unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn sweep_removes_companion_meta() {
        let dir = tempfile::tempdir().unwrap();
        // Create 4 JSONL files each with a companion meta sidecar.
        for i in 0..4u32 {
            let stem = format!("file{i:02}");
            std::fs::write(dir.path().join(format!("{stem}.jsonl")), b"").unwrap();
            std::fs::write(dir.path().join(format!("{stem}.meta.json")), b"{}").unwrap();
        }
        let deleted = sweep_old_transcripts(dir.path(), 2).unwrap();
        assert_eq!(deleted, 2);
        // Companion metas for the two deleted files should also be gone.
        let meta_count = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().to_string_lossy().ends_with(".meta.json"))
            .count();
        assert_eq!(
            meta_count, 2,
            "orphaned meta sidecars should have been removed"
        );
    }

    #[test]
    fn data_loss_guard_uses_stem_based_meta_path() {
        // `load` derives the sidecar path from the transcript's file stem
        // ("<stem>.meta.json"), which matches write_meta's
        // format!("{agent_id}.meta.json") when agent_id == stem.
        let dir = tempfile::tempdir().unwrap();
        let agent_id = "deadbeef-0000-0000-0000-000000000000";
        // Write meta sidecar but not the JSONL file.
        std::fs::write(dir.path().join(format!("{agent_id}.meta.json")), b"{}").unwrap();
        let jsonl_path = dir.path().join(format!("{agent_id}.jsonl"));
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(ref m) if m.contains("missing")));
    }
}
615        let jsonl_path = dir.path().join(format!("{agent_id}.jsonl"));
616        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
617        assert!(matches!(err, SubAgentError::Transcript(ref m) if m.contains("missing")));
618    }
619}