Skip to main content

zeph_core/subagent/
transcript.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::fs::{self, File, OpenOptions};
5use std::io::{self, BufRead, BufReader, Write as _};
6use std::path::{Path, PathBuf};
7
8use serde::{Deserialize, Serialize};
9use zeph_llm::provider::Message;
10
11use super::error::SubAgentError;
12use super::state::SubAgentState;
13
/// A single entry in a JSONL transcript file.
///
/// Each entry is serialized as one JSON object per line. Field order here is the
/// key order in the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptEntry {
    /// Sequence number supplied by the caller of `TranscriptWriter::append`.
    pub seq: u32,
    /// ISO 8601 timestamp (UTC).
    pub timestamp: String,
    /// The recorded message; this is the only field `TranscriptReader::load` returns.
    pub message: Message,
}
22
/// Sidecar metadata for a transcript, written as `<agent_id>.meta.json`.
///
/// Optional fields are left out of the serialized JSON entirely when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptMeta {
    /// Agent identifier. NOTE(review): presumably the same ID used for the sidecar
    /// file name — confirm against callers of `write_meta`.
    pub agent_id: String,
    pub agent_name: String,
    /// NOTE(review): presumably the name of the definition the agent was created
    /// from — confirm.
    pub def_name: String,
    pub status: SubAgentState,
    /// Start time as a string; the tests in this file use ISO 8601 UTC values.
    pub started_at: String,
    /// Finish time, if any. Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finished_at: Option<String>,
    /// ID of the original agent session this was resumed from.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resumed_from: Option<String>,
    pub turns_used: u32,
}
38
39/// Appends `TranscriptEntry` lines to a JSONL file.
40///
41/// The file handle is kept open for the writer's lifetime to avoid
42/// race conditions from repeated open/close cycles.
43pub struct TranscriptWriter {
44    file: File,
45}
46
47impl TranscriptWriter {
48    /// Create (or open) a JSONL transcript file in append mode.
49    ///
50    /// Creates parent directories if they do not already exist.
51    ///
52    /// # Errors
53    ///
54    /// Returns `io::Error` if the directory cannot be created or the file cannot be opened.
55    pub fn new(path: &Path) -> io::Result<Self> {
56        if let Some(parent) = path.parent() {
57            fs::create_dir_all(parent)?;
58        }
59        let file = open_private(path)?;
60        Ok(Self { file })
61    }
62
63    /// Append a single message as a JSON line and flush immediately.
64    ///
65    /// # Errors
66    ///
67    /// Returns `io::Error` on serialization or write failure.
68    pub fn append(&mut self, seq: u32, message: &Message) -> io::Result<()> {
69        let entry = TranscriptEntry {
70            seq,
71            timestamp: utc_now(),
72            message: message.clone(),
73        };
74        let line = serde_json::to_string(&entry)
75            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
76        self.file.write_all(line.as_bytes())?;
77        self.file.write_all(b"\n")?;
78        self.file.flush()
79    }
80
81    /// Write the meta sidecar file for an agent.
82    ///
83    /// # Errors
84    ///
85    /// Returns `io::Error` on serialization or write failure.
86    pub fn write_meta(dir: &Path, agent_id: &str, meta: &TranscriptMeta) -> io::Result<()> {
87        let path = dir.join(format!("{agent_id}.meta.json"));
88        let content = serde_json::to_string_pretty(meta)
89            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
90        write_private(&path, content.as_bytes())
91    }
92}
93
94/// Reads and reconstructs message history from JSONL transcript files.
95pub struct TranscriptReader;
96
97impl TranscriptReader {
98    /// Load all messages from a JSONL transcript file.
99    ///
100    /// Malformed lines are skipped with a warning. An empty or missing file
101    /// returns an empty `Vec`. If the file does not exist at all but a matching
102    /// `.meta.json` sidecar exists, returns `SubAgentError::Transcript` with a
103    /// clear message so the caller knows the data is gone rather than silently
104    /// degrading to a fresh start.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`SubAgentError::Transcript`] on unrecoverable I/O failures, or
109    /// when the transcript file is missing but meta exists (data-loss guard).
110    pub fn load(path: &Path) -> Result<Vec<Message>, SubAgentError> {
111        let file = match File::open(path) {
112            Ok(f) => f,
113            Err(e) if e.kind() == io::ErrorKind::NotFound => {
114                // Check if a meta sidecar exists — if so, data has been lost.
115                // Build meta path from the file stem (e.g. "abc" from "abc.jsonl")
116                // so it is consistent with write_meta which uses format!("{agent_id}.meta.json").
117                let meta_path =
118                    if let (Some(parent), Some(stem)) = (path.parent(), path.file_stem()) {
119                        parent.join(format!("{}.meta.json", stem.to_string_lossy()))
120                    } else {
121                        path.with_extension("meta.json")
122                    };
123                if meta_path.exists() {
124                    return Err(SubAgentError::Transcript(format!(
125                        "transcript file '{}' is missing but meta sidecar exists — \
126                         transcript data may have been deleted",
127                        path.display()
128                    )));
129                }
130                return Ok(vec![]);
131            }
132            Err(e) => {
133                return Err(SubAgentError::Transcript(format!(
134                    "failed to open transcript '{}': {e}",
135                    path.display()
136                )));
137            }
138        };
139
140        let reader = BufReader::new(file);
141        let mut messages = Vec::new();
142        for (line_no, line_result) in reader.lines().enumerate() {
143            let line = match line_result {
144                Ok(l) => l,
145                Err(e) => {
146                    tracing::warn!(
147                        path = %path.display(),
148                        line = line_no + 1,
149                        error = %e,
150                        "failed to read transcript line — skipping"
151                    );
152                    continue;
153                }
154            };
155            let trimmed = line.trim();
156            if trimmed.is_empty() {
157                continue;
158            }
159            match serde_json::from_str::<TranscriptEntry>(trimmed) {
160                Ok(entry) => messages.push(entry.message),
161                Err(e) => {
162                    tracing::warn!(
163                        path = %path.display(),
164                        line = line_no + 1,
165                        error = %e,
166                        "malformed transcript entry — skipping"
167                    );
168                }
169            }
170        }
171        Ok(messages)
172    }
173
174    /// Load the meta sidecar for an agent.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`SubAgentError::NotFound`] if the file does not exist,
179    /// [`SubAgentError::Transcript`] on parse failure.
180    pub fn load_meta(dir: &Path, agent_id: &str) -> Result<TranscriptMeta, SubAgentError> {
181        let path = dir.join(format!("{agent_id}.meta.json"));
182        let content = fs::read_to_string(&path).map_err(|e| {
183            if e.kind() == io::ErrorKind::NotFound {
184                SubAgentError::NotFound(agent_id.to_owned())
185            } else {
186                SubAgentError::Transcript(format!("failed to read meta '{}': {e}", path.display()))
187            }
188        })?;
189        serde_json::from_str(&content).map_err(|e| {
190            SubAgentError::Transcript(format!("failed to parse meta '{}': {e}", path.display()))
191        })
192    }
193
194    /// Find the full agent ID by scanning `dir` for `.meta.json` files whose names
195    /// start with `prefix`.
196    ///
197    /// # Errors
198    ///
199    /// Returns [`SubAgentError::NotFound`] if no match is found,
200    /// [`SubAgentError::AmbiguousId`] if multiple matches are found,
201    /// [`SubAgentError::Transcript`] on I/O failure.
202    pub fn find_by_prefix(dir: &Path, prefix: &str) -> Result<String, SubAgentError> {
203        let entries = fs::read_dir(dir).map_err(|e| {
204            SubAgentError::Transcript(format!(
205                "failed to read transcript dir '{}': {e}",
206                dir.display()
207            ))
208        })?;
209
210        let mut matches: Vec<String> = Vec::new();
211        for entry in entries {
212            let entry = entry
213                .map_err(|e| SubAgentError::Transcript(format!("failed to read dir entry: {e}")))?;
214            let name = entry.file_name();
215            let name_str = name.to_string_lossy();
216            if let Some(agent_id) = name_str.strip_suffix(".meta.json")
217                && agent_id.starts_with(prefix)
218            {
219                matches.push(agent_id.to_owned());
220            }
221        }
222
223        match matches.len() {
224            0 => Err(SubAgentError::NotFound(prefix.to_owned())),
225            1 => Ok(matches.remove(0)),
226            n => Err(SubAgentError::AmbiguousId(prefix.to_owned(), n)),
227        }
228    }
229}
230
/// Trim the transcript directory down to at most `max_files` `.jsonl` files.
///
/// Candidates are ranked by modification time and the oldest surplus files are
/// removed; a file whose modification time cannot be read is ranked as if it were
/// modified at the Unix epoch. For each removed transcript, the companion
/// `.meta.json` sidecar is removed too on a best-effort basis. A `max_files` of
/// `0` disables sweeping entirely. Returns the number of `.jsonl` files deleted.
///
/// # Errors
///
/// Returns `io::Error` if the directory cannot be read or a file cannot be deleted.
pub fn sweep_old_transcripts(dir: &Path, max_files: usize) -> io::Result<usize> {
    // Zero means "no limit", not "delete everything".
    if max_files == 0 {
        return Ok(0);
    }

    let mut candidates: Vec<(PathBuf, std::time::SystemTime)> = Vec::new();
    for dir_entry in fs::read_dir(dir)? {
        let dir_entry = dir_entry?;
        let file_path = dir_entry.path();
        if !file_path.extension().is_some_and(|ext| ext == "jsonl") {
            continue;
        }
        let modified = match dir_entry.metadata().and_then(|meta| meta.modified()) {
            Ok(time) => time,
            // Unknown age: treat as the oldest possible so it is swept first.
            Err(_) => std::time::SystemTime::UNIX_EPOCH,
        };
        candidates.push((file_path, modified));
    }

    let surplus = candidates.len().saturating_sub(max_files);
    if surplus == 0 {
        return Ok(0);
    }

    // Oldest first; the sort is stable so ties keep directory order.
    candidates.sort_by(|left, right| left.1.cmp(&right.1));

    let mut removed = 0;
    for (transcript, _) in candidates.drain(..surplus) {
        // The sidecar goes first and its failure is ignored: losing a sidecar is
        // harmless, whereas the transcript removal below must report errors.
        let sidecar = transcript.with_extension("meta.json");
        if sidecar.exists() {
            let _ = fs::remove_file(&sidecar);
        }
        fs::remove_file(&transcript)?;
        removed += 1;
    }
    Ok(removed)
}
277
/// Open `path` for appending, creating it if necessary.
///
/// On Unix a newly created file is requested with owner-only permissions (0o600);
/// on other platforms the plain `OpenOptions` defaults apply.
fn open_private(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);

    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        // The mode only takes effect when the call actually creates the file.
        options.mode(0o600);
    }

    options.open(path)
}
296
/// Write `contents` to `path` atomically with owner-only permissions (0o600 on Unix).
///
/// The data is first written to a uniquely named temporary file next to `path`
/// and then renamed over it, so a reader sees either the previous file or the
/// complete new one — never a truncated mix. Because the replacement is always a
/// freshly created file, the owner-only mode also applies when `path` already
/// existed with wider permissions. On non-Unix platforms the same staging is used
/// without the permission bits.
///
/// # Errors
///
/// Returns `io::Error` if `path` has no file name, or if the temporary file cannot
/// be created, written, or renamed into place. On failure the temporary file is
/// removed on a best-effort basis and `path` is left untouched.
fn write_private(path: &Path, contents: &[u8]) -> io::Result<()> {
    // Keeps temp names distinct when several writers in this process overlap.
    static NEXT_TMP_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let file_name = path
        .file_name()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "path has no file name"))?;
    let unique = NEXT_TMP_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    // Stage in the same directory so the rename never crosses a filesystem boundary.
    let mut tmp_name = file_name.to_os_string();
    tmp_name.push(format!(".{}.{unique}.tmp", std::process::id()));
    let tmp_path = path.with_file_name(tmp_name);

    // The closure scope closes the temp file before it is renamed.
    let staged = (|| -> io::Result<()> {
        let mut options = OpenOptions::new();
        options.create(true).write(true).truncate(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt as _;
            options.mode(0o600);
        }
        let mut file = options.open(&tmp_path)?;
        file.write_all(contents)?;
        file.flush()
    })();

    let outcome = staged.and_then(|()| fs::rename(&tmp_path, path));
    if outcome.is_err() {
        // Best effort: do not leave a stray temp file behind.
        let _ = fs::remove_file(&tmp_path);
    }
    outcome
}
318
/// Returns the current UTC time as an ISO 8601 string.
///
/// Public entry point that simply forwards to the module-private `utc_now`,
/// which formats the time as `YYYY-MM-DDTHH:MM:SSZ` (second precision).
#[must_use]
pub fn utc_now_pub() -> String {
    utc_now()
}
324
325fn utc_now() -> String {
326    // Use SystemTime for a zero-dependency ISO 8601 timestamp.
327    // Format: 2026-03-05T00:18:16Z
328    let secs = std::time::SystemTime::now()
329        .duration_since(std::time::UNIX_EPOCH)
330        .unwrap_or_default()
331        .as_secs();
332    let (y, mo, d, h, mi, s) = epoch_to_parts(secs);
333    format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
334}
335
/// Split Unix epoch seconds into (year, month, day, hour, minute, second) in UTC.
///
/// The time of day is peeled off with successive division; the remaining day count
/// is converted to a proleptic Gregorian date using the era-based days-to-civil
/// arithmetic (400-year eras, with the year internally starting on 1 March so the
/// leap day falls at the end). All intermediate math is done in `u64`; the results
/// are narrowed to `u32` only on return, which is lossless for every component
/// except a year beyond `u32::MAX`, far outside any real timestamp.
fn epoch_to_parts(epoch: u64) -> (u32, u32, u32, u32, u32, u32) {
    // Days in one 400-year Gregorian cycle.
    const DAYS_PER_ERA: u64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT_DAYS: u64 = 719_468;

    let total_minutes = epoch / 60;
    let total_hours = total_minutes / 60;
    let (second, minute, hour) = (epoch % 60, total_minutes % 60, total_hours % 24);

    let shifted_days = total_hours / 24 + EPOCH_SHIFT_DAYS;
    let era = shifted_days / DAYS_PER_ERA;
    let day_of_era = shifted_days % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March: 0 = March … 11 = February.
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };
    // January and February belong to the following calendar year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    // Narrowing is intentional; see the function docs for the value ranges.
    #[allow(clippy::cast_possible_truncation)]
    let narrow = |value: u64| value as u32;
    (
        narrow(year),
        narrow(month),
        narrow(day),
        narrow(hour),
        narrow(minute),
        narrow(second),
    )
}
372
#[cfg(test)]
mod tests {
    use std::time::{Duration, UNIX_EPOCH};

    use zeph_llm::provider::{Message, MessageMetadata, Role};

    use super::*;

    /// Build a minimal message with the given role and text content.
    fn test_message(role: Role, content: &str) -> Message {
        Message {
            role,
            content: content.to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        }
    }

    /// Build a completed-agent meta record for `agent_id`.
    fn test_meta(agent_id: &str) -> TranscriptMeta {
        TranscriptMeta {
            agent_id: agent_id.to_owned(),
            agent_name: "bot".to_owned(),
            def_name: "bot".to_owned(),
            status: SubAgentState::Completed,
            started_at: "2026-01-01T00:00:00Z".to_owned(),
            finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
            resumed_from: None,
            turns_used: 2,
        }
    }

    #[test]
    fn writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.jsonl");

        let msg1 = test_message(Role::User, "hello");
        let msg2 = test_message(Role::Assistant, "world");

        let mut writer = TranscriptWriter::new(&path).unwrap();
        writer.append(0, &msg1).unwrap();
        writer.append(1, &msg2).unwrap();
        drop(writer);

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "hello");
        assert_eq!(messages[1].content, "world");
    }

    #[test]
    fn load_missing_file_no_meta_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("ghost.jsonl");
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_missing_file_with_meta_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("ghost.meta.json");
        std::fs::write(&meta_path, "{}").unwrap();
        let jsonl_path = dir.path().join("ghost.jsonl");
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn load_skips_malformed_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mixed.jsonl");

        let good = test_message(Role::User, "good");
        let entry = TranscriptEntry {
            seq: 0,
            timestamp: "2026-01-01T00:00:00Z".to_owned(),
            message: good.clone(),
        };
        let good_line = serde_json::to_string(&entry).unwrap();
        let content = format!("{good_line}\nnot valid json\n{good_line}\n");
        std::fs::write(&path, &content).unwrap();

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn meta_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abc-123");
        TranscriptWriter::write_meta(dir.path(), "abc-123", &meta).unwrap();
        let loaded = TranscriptReader::load_meta(dir.path(), "abc-123").unwrap();
        assert_eq!(loaded.agent_id, "abc-123");
        assert_eq!(loaded.turns_used, 2);
    }

    #[test]
    fn meta_not_found_returns_not_found_error() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "ghost").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_exact() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abcdef01-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "abcdef01-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id =
            TranscriptReader::find_by_prefix(dir.path(), "abcdef01-0000-0000-0000-000000000000")
                .unwrap();
        assert_eq!(id, "abcdef01-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_short_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("deadbeef-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "deadbeef-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id = TranscriptReader::find_by_prefix(dir.path(), "deadbeef").unwrap();
        assert_eq!(id, "deadbeef-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_not_found() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "xxxxxxxx").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_ambiguous() {
        let dir = tempfile::tempdir().unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0001-x", &test_meta("aabb0001-x")).unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0002-y", &test_meta("aabb0002-y")).unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "aabb").unwrap_err();
        assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
    }

    #[test]
    fn sweep_old_transcripts_removes_oldest() {
        let dir = tempfile::tempdir().unwrap();
        let base = UNIX_EPOCH + Duration::from_secs(1_000_000_000);

        // Pin each file's mtime explicitly, in reverse creation order (file04 is
        // the oldest, file00 the newest), so the result cannot pass by accident of
        // creation or directory order.
        for i in 0..5u32 {
            let path = dir.path().join(format!("file{i:02}.jsonl"));
            let file = std::fs::File::create(&path).unwrap();
            file.set_modified(base + Duration::from_secs(u64::from(4 - i) * 60))
                .unwrap();
        }

        let deleted = sweep_old_transcripts(dir.path(), 3).unwrap();
        assert_eq!(deleted, 2);

        let mut remaining: Vec<String> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .filter(|name| name.ends_with(".jsonl"))
            .collect();
        remaining.sort();
        assert_eq!(remaining, ["file00.jsonl", "file01.jsonl", "file02.jsonl"]);
    }

    #[test]
    fn sweep_with_zero_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 0).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn sweep_below_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 50).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn utc_now_format() {
        let ts = utc_now();
        // Shape check against 2026-03-05T00:18:16Z: fixed width and separators.
        assert_eq!(ts.len(), 20);
        assert!(ts.ends_with('Z'));
        let bytes = ts.as_bytes();
        assert_eq!(bytes[4], b'-');
        assert_eq!(bytes[7], b'-');
        assert_eq!(bytes[10], b'T');
        assert_eq!(bytes[13], b':');
        assert_eq!(bytes[16], b':');
    }

    #[test]
    fn epoch_to_parts_known_dates() {
        assert_eq!(epoch_to_parts(0), (1970, 1, 1, 0, 0, 0));
        // Leap day in a year divisible by 400.
        assert_eq!(epoch_to_parts(951_782_400), (2000, 2, 29, 0, 0, 0));
        assert_eq!(epoch_to_parts(1_772_669_896), (2026, 3, 5, 0, 18, 16));
        // 2100 is not a leap year: 28 February is followed by 1 March.
        assert_eq!(epoch_to_parts(4_107_542_400), (2100, 3, 1, 0, 0, 0));
    }

    #[test]
    fn load_empty_file_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.jsonl");
        std::fs::write(&path, b"").unwrap();
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_meta_invalid_json_returns_transcript_error() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("bad.meta.json"), b"not json at all {{{{").unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "bad").unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn sweep_removes_companion_meta() {
        let dir = tempfile::tempdir().unwrap();
        // Create 4 JSONL files each with a companion meta sidecar.
        for i in 0..4u32 {
            let stem = format!("file{i:02}");
            std::fs::write(dir.path().join(format!("{stem}.jsonl")), b"").unwrap();
            std::fs::write(dir.path().join(format!("{stem}.meta.json")), b"{}").unwrap();
        }
        let deleted = sweep_old_transcripts(dir.path(), 2).unwrap();
        assert_eq!(deleted, 2);
        // Companion metas for the two deleted files should also be gone.
        let meta_count = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|e| e.path().to_string_lossy().ends_with(".meta.json"))
            .count();
        assert_eq!(
            meta_count, 2,
            "orphaned meta sidecars should have been removed"
        );
    }

    #[test]
    fn data_loss_guard_uses_stem_based_meta_path() {
        // `load` derives the sidecar path from the transcript's file stem, so for
        // "<agent_id>.jsonl" it looks for "<agent_id>.meta.json" — the same name
        // write_meta produces via format!("{agent_id}.meta.json").
        let dir = tempfile::tempdir().unwrap();
        let agent_id = "deadbeef-0000-0000-0000-000000000000";
        // Write meta sidecar but not the JSONL file.
        std::fs::write(dir.path().join(format!("{agent_id}.meta.json")), b"{}").unwrap();
        let jsonl_path = dir.path().join(format!("{agent_id}.jsonl"));
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(ref m) if m.contains("missing")));
    }
}