1use std::fs::{self, File, OpenOptions};
16use std::io::{self, BufRead, BufReader, Write as _};
17use std::path::{Path, PathBuf};
18
19use serde::{Deserialize, Serialize};
20use zeph_llm::provider::Message;
21
22use super::error::SubAgentError;
23use super::state::SubAgentState;
24
/// A single record of a transcript file.
///
/// [`TranscriptWriter::append`] serializes one entry as one JSON object per
/// line, and [`TranscriptReader::load`] parses each non-empty line back into
/// this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptEntry {
    /// Sequence number supplied by the caller of [`TranscriptWriter::append`].
    pub seq: u32,
    /// Time the entry was written, formatted as `YYYY-MM-DDTHH:MM:SSZ`.
    pub timestamp: String,
    /// The message that was recorded.
    pub message: Message,
}
39
/// Metadata stored next to a transcript as `<agent_id>.meta.json`.
///
/// Written by [`TranscriptWriter::write_meta`] and read back by
/// [`TranscriptReader::load_meta`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptMeta {
    /// Identifier of the agent; also used as the stem of the sidecar file name.
    pub agent_id: String,
    /// Name of the agent (presumably a human-readable label — confirm with callers).
    pub agent_name: String,
    /// Name of the definition the agent was created from (assumed — confirm with callers).
    pub def_name: String,
    /// State of the agent recorded in this sidecar.
    pub status: SubAgentState,
    /// Start timestamp as a string (presumably the `utc_now` format — confirm with callers).
    pub started_at: String,
    /// Finish timestamp, if any; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finished_at: Option<String>,
    /// Presumably the id of the agent this one was resumed from — confirm with
    /// callers; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resumed_from: Option<String>,
    /// Number of turns used (counting rules are defined by the caller).
    pub turns_used: u32,
}
65
/// Appends [`TranscriptEntry`] records to a transcript file, one JSON object
/// per line.
pub struct TranscriptWriter {
    /// Handle opened in append mode by `open_private`.
    file: File,
}
83
84impl TranscriptWriter {
85 pub fn new(path: &Path) -> io::Result<Self> {
93 if let Some(parent) = path.parent() {
94 fs::create_dir_all(parent)?;
95 }
96 let file = open_private(path)?;
97 Ok(Self { file })
98 }
99
100 pub fn append(&mut self, seq: u32, message: &Message) -> io::Result<()> {
106 let entry = TranscriptEntry {
107 seq,
108 timestamp: utc_now(),
109 message: message.clone(),
110 };
111 let line = serde_json::to_string(&entry)
112 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
113 self.file.write_all(line.as_bytes())?;
114 self.file.write_all(b"\n")?;
115 self.file.flush()
116 }
117
118 pub fn write_meta(dir: &Path, agent_id: &str, meta: &TranscriptMeta) -> io::Result<()> {
124 let path = dir.join(format!("{agent_id}.meta.json"));
125 let content = serde_json::to_string_pretty(meta)
126 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
127 write_private(&path, content.as_bytes())
128 }
129}
130
/// Namespace for read-side transcript operations: loading messages, loading
/// the meta sidecar, and resolving an agent id from a prefix.
///
/// Holds no state; all functions are associated functions.
pub struct TranscriptReader;
138
139impl TranscriptReader {
140 pub fn load(path: &Path) -> Result<Vec<Message>, SubAgentError> {
153 let file = match File::open(path) {
154 Ok(f) => f,
155 Err(e) if e.kind() == io::ErrorKind::NotFound => {
156 let meta_path =
160 if let (Some(parent), Some(stem)) = (path.parent(), path.file_stem()) {
161 parent.join(format!("{}.meta.json", stem.to_string_lossy()))
162 } else {
163 path.with_extension("meta.json")
164 };
165 if meta_path.exists() {
166 return Err(SubAgentError::Transcript(format!(
167 "transcript file '{}' is missing but meta sidecar exists — \
168 transcript data may have been deleted",
169 path.display()
170 )));
171 }
172 return Ok(vec![]);
173 }
174 Err(e) => {
175 return Err(SubAgentError::Transcript(format!(
176 "failed to open transcript '{}': {e}",
177 path.display()
178 )));
179 }
180 };
181
182 let reader = BufReader::new(file);
183 let mut messages = Vec::new();
184 for (line_no, line_result) in reader.lines().enumerate() {
185 let line = match line_result {
186 Ok(l) => l,
187 Err(e) => {
188 tracing::warn!(
189 path = %path.display(),
190 line = line_no + 1,
191 error = %e,
192 "failed to read transcript line — skipping"
193 );
194 continue;
195 }
196 };
197 let trimmed = line.trim();
198 if trimmed.is_empty() {
199 continue;
200 }
201 match serde_json::from_str::<TranscriptEntry>(trimmed) {
202 Ok(entry) => messages.push(entry.message),
203 Err(e) => {
204 tracing::warn!(
205 path = %path.display(),
206 line = line_no + 1,
207 error = %e,
208 "malformed transcript entry — skipping"
209 );
210 }
211 }
212 }
213 Ok(messages)
214 }
215
216 pub fn load_meta(dir: &Path, agent_id: &str) -> Result<TranscriptMeta, SubAgentError> {
223 let path = dir.join(format!("{agent_id}.meta.json"));
224 let content = fs::read_to_string(&path).map_err(|e| {
225 if e.kind() == io::ErrorKind::NotFound {
226 SubAgentError::NotFound(agent_id.to_owned())
227 } else {
228 SubAgentError::Transcript(format!("failed to read meta '{}': {e}", path.display()))
229 }
230 })?;
231 serde_json::from_str(&content).map_err(|e| {
232 SubAgentError::Transcript(format!("failed to parse meta '{}': {e}", path.display()))
233 })
234 }
235
236 pub fn find_by_prefix(dir: &Path, prefix: &str) -> Result<String, SubAgentError> {
245 let entries = fs::read_dir(dir).map_err(|e| {
246 SubAgentError::Transcript(format!(
247 "failed to read transcript dir '{}': {e}",
248 dir.display()
249 ))
250 })?;
251
252 let mut matches: Vec<String> = Vec::new();
253 for entry in entries {
254 let entry = entry
255 .map_err(|e| SubAgentError::Transcript(format!("failed to read dir entry: {e}")))?;
256 let name = entry.file_name();
257 let name_str = name.to_string_lossy();
258 if let Some(agent_id) = name_str.strip_suffix(".meta.json")
259 && agent_id.starts_with(prefix)
260 {
261 matches.push(agent_id.to_owned());
262 }
263 }
264
265 match matches.len() {
266 0 => Err(SubAgentError::NotFound(prefix.to_owned())),
267 1 => Ok(matches.remove(0)),
268 n => Err(SubAgentError::AmbiguousId(prefix.to_owned(), n)),
269 }
270 }
271}
272
/// Deletes the oldest `.jsonl` transcripts in `dir` so that at most
/// `max_files` remain, returning how many transcripts were removed.
///
/// - `max_files == 0` disables sweeping: nothing is touched and `0` is returned.
/// - If `dir` does not exist it is created and `0` is returned.
/// - Age is taken from the file's modification time; a file whose mtime cannot
///   be read is treated as having the Unix epoch as its mtime.
/// - For each removed transcript, a companion `<stem>.meta.json` is removed
///   first on a best-effort basis (failures are ignored).
///
/// # Errors
///
/// Returns the underlying I/O error if the directory cannot be created or
/// listed, or if removing a transcript file fails.
pub fn sweep_old_transcripts(dir: &Path, max_files: usize) -> io::Result<usize> {
    if max_files == 0 {
        return Ok(0);
    }
    if !dir.exists() {
        fs::create_dir_all(dir)?;
        return Ok(0);
    }

    // Gather every transcript together with its modification time.
    let mut transcripts: Vec<(PathBuf, std::time::SystemTime)> = Vec::new();
    for item in fs::read_dir(dir)? {
        let item = item?;
        let candidate = item.path();
        let is_transcript = candidate.extension().is_some_and(|ext| ext == "jsonl");
        if !is_transcript {
            continue;
        }
        let modified = match item.metadata().and_then(|info| info.modified()) {
            Ok(stamp) => stamp,
            Err(_) => std::time::SystemTime::UNIX_EPOCH,
        };
        transcripts.push((candidate, modified));
    }

    let surplus = transcripts.len().saturating_sub(max_files);
    if surplus == 0 {
        return Ok(0);
    }

    // Oldest first, so the front of the list is what gets evicted.
    transcripts.sort_by(|left, right| left.1.cmp(&right.1));

    for (transcript, _) in &transcripts[..surplus] {
        let sidecar = transcript.with_extension("meta.json");
        if sidecar.exists() {
            let _ = fs::remove_file(&sidecar);
        }
        fs::remove_file(transcript)?;
    }
    Ok(surplus)
}
325
/// Opens `path` in append mode, creating the file if it does not exist.
///
/// On Unix the creation mode is `0o600`, so a newly created file is readable
/// and writable by the owner only. Other platforms use their default
/// permissions.
fn open_private(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);

    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        options.mode(0o600);
    }

    options.open(path)
}
344
/// Writes `contents` to `path`, creating the file if needed and replacing any
/// previous content.
///
/// On Unix the creation mode is `0o600`, so a newly created file is readable
/// and writable by the owner only. Other platforms use their default
/// permissions.
fn write_private(path: &Path, contents: &[u8]) -> io::Result<()> {
    let mut options = OpenOptions::new();
    options.create(true).write(true).truncate(true);

    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        options.mode(0o600);
    }

    let mut target = options.open(path)?;
    target.write_all(contents)?;
    target.flush()
}
366
/// Public wrapper around the private `utc_now` helper.
///
/// Returns the current UTC time formatted as `YYYY-MM-DDTHH:MM:SSZ`.
#[must_use]
pub fn utc_now_pub() -> String {
    utc_now()
}
386
387fn utc_now() -> String {
388 let secs = std::time::SystemTime::now()
391 .duration_since(std::time::UNIX_EPOCH)
392 .unwrap_or_default()
393 .as_secs();
394 let (y, mo, d, h, mi, s) = epoch_to_parts(secs);
395 format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
396}
397
/// Splits a Unix timestamp (seconds since 1970-01-01T00:00:00Z) into
/// `(year, month, day, hour, minute, second)` in the proleptic Gregorian
/// calendar, UTC.
///
/// The date part shifts the day count so that each 400-year era starts on
/// 1 March, which places the leap day at the end of the shifted year.
// The calendar fields are computed in `u64`; narrowing them to `u32` is the
// existing contract of this function.
#[allow(clippy::cast_possible_truncation)]
fn epoch_to_parts(epoch: u64) -> (u32, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: u64 = 86_400;
    const DAYS_PER_ERA: u64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT_DAYS: u64 = 719_468;

    // Time of day.
    let days = epoch / SECS_PER_DAY;
    let secs_of_day = epoch % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let min = secs_of_day % 3_600 / 60;
    let sec = secs_of_day % 60;

    // Calendar date.
    let shifted = days + EPOCH_SHIFT_DAYS;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    // January and February belong to the following civil year.
    let (month, year_carry) = if march_based_month < 10 {
        (march_based_month + 3, 0)
    } else {
        (march_based_month - 9, 1)
    };
    let year = year_of_era + era * 400 + year_carry;

    (
        year as u32,
        month as u32,
        day as u32,
        hour as u32,
        min as u32,
        sec as u32,
    )
}
434
// Unit tests for the transcript writer, reader, sweep and timestamp helpers.
// Every test works inside its own `tempfile::tempdir()`.
#[cfg(test)]
mod tests {
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    use super::*;

    /// Builds a message with the given role and content, no parts, and default metadata.
    fn test_message(role: Role, content: &str) -> Message {
        Message {
            role,
            content: content.to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        }
    }

    /// Builds a completed-agent meta record with fixed timestamps for `agent_id`.
    fn test_meta(agent_id: &str) -> TranscriptMeta {
        TranscriptMeta {
            agent_id: agent_id.to_owned(),
            agent_name: "bot".to_owned(),
            def_name: "bot".to_owned(),
            status: SubAgentState::Completed,
            started_at: "2026-01-01T00:00:00Z".to_owned(),
            finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
            resumed_from: None,
            turns_used: 2,
        }
    }

    /// Messages appended by the writer come back from the reader in order.
    #[test]
    fn writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.jsonl");

        let msg1 = test_message(Role::User, "hello");
        let msg2 = test_message(Role::Assistant, "world");

        let mut writer = TranscriptWriter::new(&path).unwrap();
        writer.append(0, &msg1).unwrap();
        writer.append(1, &msg2).unwrap();
        drop(writer);

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "hello");
        assert_eq!(messages[1].content, "world");
    }

    /// A missing transcript with no sidecar is treated as an empty transcript.
    #[test]
    fn load_missing_file_no_meta_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("ghost.jsonl");
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    /// A missing transcript whose sidecar exists is reported as an error.
    #[test]
    fn load_missing_file_with_meta_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("ghost.meta.json");
        std::fs::write(&meta_path, "{}").unwrap();
        let jsonl_path = dir.path().join("ghost.jsonl");
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    /// A non-JSON line between two valid entries is skipped, not fatal.
    #[test]
    fn load_skips_malformed_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mixed.jsonl");

        let good = test_message(Role::User, "good");
        let entry = TranscriptEntry {
            seq: 0,
            timestamp: "2026-01-01T00:00:00Z".to_owned(),
            message: good.clone(),
        };
        let good_line = serde_json::to_string(&entry).unwrap();
        let content = format!("{good_line}\nnot valid json\n{good_line}\n");
        std::fs::write(&path, &content).unwrap();

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
    }

    /// Meta written with `write_meta` is read back by `load_meta`.
    #[test]
    fn meta_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abc-123");
        TranscriptWriter::write_meta(dir.path(), "abc-123", &meta).unwrap();
        let loaded = TranscriptReader::load_meta(dir.path(), "abc-123").unwrap();
        assert_eq!(loaded.agent_id, "abc-123");
        assert_eq!(loaded.turns_used, 2);
    }

    /// Loading meta for an unknown agent yields `NotFound`.
    #[test]
    fn meta_not_found_returns_not_found_error() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "ghost").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    /// A full agent id used as the prefix resolves to itself.
    #[test]
    fn find_by_prefix_exact() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abcdef01-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "abcdef01-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id =
            TranscriptReader::find_by_prefix(dir.path(), "abcdef01-0000-0000-0000-000000000000")
                .unwrap();
        assert_eq!(id, "abcdef01-0000-0000-0000-000000000000");
    }

    /// A short unique prefix resolves to the full agent id.
    #[test]
    fn find_by_prefix_short_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("deadbeef-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "deadbeef-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id = TranscriptReader::find_by_prefix(dir.path(), "deadbeef").unwrap();
        assert_eq!(id, "deadbeef-0000-0000-0000-000000000000");
    }

    /// A prefix matching no sidecar yields `NotFound`.
    #[test]
    fn find_by_prefix_not_found() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "xxxxxxxx").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    /// A prefix matching two sidecars yields `AmbiguousId` with the match count.
    #[test]
    fn find_by_prefix_ambiguous() {
        let dir = tempfile::tempdir().unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0001-x", &test_meta("aabb0001-x")).unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0002-y", &test_meta("aabb0002-y")).unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "aabb").unwrap_err();
        assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
    }

    /// Sweeping five transcripts down to three deletes exactly two.
    #[test]
    fn sweep_old_transcripts_removes_oldest() {
        let dir = tempfile::tempdir().unwrap();

        for i in 0..5u32 {
            let path = dir.path().join(format!("file{i:02}.jsonl"));
            std::fs::write(&path, b"").unwrap();
        }

        // Only the counts are asserted, so equal mtimes between the files do
        // not affect the outcome.
        let deleted = sweep_old_transcripts(dir.path(), 3).unwrap();
        assert_eq!(deleted, 2);

        let remaining: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("jsonl"))
            .collect();
        assert_eq!(remaining.len(), 3);
    }

    /// `max_files == 0` disables the sweep entirely.
    #[test]
    fn sweep_with_zero_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 0).unwrap();
        assert_eq!(deleted, 0);
    }

    /// Nothing is deleted while the file count is within the limit.
    #[test]
    fn sweep_below_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 50).unwrap();
        assert_eq!(deleted, 0);
    }

    /// The timestamp has the fixed 20-character `YYYY-MM-DDTHH:MM:SSZ` shape.
    #[test]
    fn utc_now_format() {
        let ts = utc_now();
        assert_eq!(ts.len(), 20);
        assert!(ts.ends_with('Z'));
        assert!(ts.contains('T'));
    }

    /// An existing but empty transcript loads as zero messages.
    #[test]
    fn load_empty_file_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.jsonl");
        std::fs::write(&path, b"").unwrap();
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    /// A sidecar that is not valid JSON is reported as a `Transcript` error.
    #[test]
    fn load_meta_invalid_json_returns_transcript_error() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("bad.meta.json"), b"not json at all {{{{").unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "bad").unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    /// Sweeping a transcript also removes its `.meta.json` companion.
    #[test]
    fn sweep_removes_companion_meta() {
        let dir = tempfile::tempdir().unwrap();
        for i in 0..4u32 {
            // Each transcript gets a sidecar with the same stem.
            let stem = format!("file{i:02}");
            std::fs::write(dir.path().join(format!("{stem}.jsonl")), b"").unwrap();
            std::fs::write(dir.path().join(format!("{stem}.meta.json")), b"{}").unwrap();
        }
        let deleted = sweep_old_transcripts(dir.path(), 2).unwrap();
        assert_eq!(deleted, 2);
        // Two transcripts survive, so exactly two sidecars must survive too.
        let meta_count = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().to_string_lossy().ends_with(".meta.json"))
            .count();
        assert_eq!(
            meta_count, 2,
            "orphaned meta sidecars should have been removed"
        );
    }

    /// The missing-transcript guard finds the sidecar via `<stem>.meta.json`
    /// for a UUID-shaped agent id.
    #[test]
    fn data_loss_guard_uses_stem_based_meta_path() {
        let dir = tempfile::tempdir().unwrap();
        // Only the sidecar exists; the transcript itself is never created.
        let agent_id = "deadbeef-0000-0000-0000-000000000000";
        std::fs::write(dir.path().join(format!("{agent_id}.meta.json")), b"{}").unwrap();
        let jsonl_path = dir.path().join(format!("{agent_id}.jsonl"));
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(ref m) if m.contains("missing")));
    }
}