1use std::fs::{self, File};
16use std::io::{self, BufRead, BufReader, Write as _};
17use std::path::{Path, PathBuf};
18
19use serde::{Deserialize, Serialize};
20use zeph_llm::provider::Message;
21
22use super::error::SubAgentError;
23use super::state::SubAgentState;
24
/// One line of a sub-agent transcript file (JSON Lines format).
///
/// [`TranscriptWriter::append`] serializes one entry per line and
/// [`TranscriptReader::load`] parses them back, keeping only `message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptEntry {
    /// Sequence number supplied by the caller of [`TranscriptWriter::append`].
    pub seq: u32,
    /// UTC time the entry was appended, formatted as `YYYY-MM-DDTHH:MM:SSZ`.
    pub timestamp: String,
    /// The recorded conversation message.
    pub message: Message,
}
39
/// Sidecar metadata for a sub-agent transcript.
///
/// [`TranscriptWriter::write_meta`] stores it as pretty-printed JSON in
/// `<agent_id>.meta.json`, and [`TranscriptReader::load_meta`] reads it back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptMeta {
    /// Identifier of the sub-agent run. Expected to match the `agent_id` used to
    /// name the sidecar file (not enforced here).
    pub agent_id: String,
    /// Name of the agent (presumably its display name — confirm with callers).
    pub agent_name: String,
    /// Name of the definition the agent was created from (inferred from the
    /// field name — confirm with callers).
    pub def_name: String,
    /// State of the run when the metadata was written.
    pub status: SubAgentState,
    /// Start time as a string (tests use `YYYY-MM-DDTHH:MM:SSZ`).
    pub started_at: String,
    /// Finish time, if any; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finished_at: Option<String>,
    /// ID this run was resumed from, if any; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resumed_from: Option<String>,
    /// Number of turns consumed by the run.
    pub turns_used: u32,
    /// Names of MCP tools associated with the run. `#[serde(default)]` lets
    /// metadata written without this key deserialize with an empty list.
    #[serde(default)]
    pub mcp_tool_names: Vec<String>,
}
71
/// Appends [`TranscriptEntry`] records to a JSONL transcript file.
///
/// Construct with [`TranscriptWriter::new`]; each [`TranscriptWriter::append`]
/// call writes one JSON object followed by a newline.
pub struct TranscriptWriter {
    /// Handle opened by `zeph_common::fs_secure::append_private`.
    file: File,
}
89
90impl TranscriptWriter {
91 pub fn new(path: &Path) -> io::Result<Self> {
99 if let Some(parent) = path.parent() {
100 fs::create_dir_all(parent)?;
101 }
102 let file = zeph_common::fs_secure::append_private(path)?;
103 Ok(Self { file })
104 }
105
106 pub fn append(&mut self, seq: u32, message: &Message) -> io::Result<()> {
112 let entry = TranscriptEntry {
113 seq,
114 timestamp: utc_now(),
115 message: message.clone(),
116 };
117 let line = serde_json::to_string(&entry)
118 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
119 self.file.write_all(line.as_bytes())?;
120 self.file.write_all(b"\n")?;
121 self.file.flush()
122 }
123
124 pub fn write_meta(dir: &Path, agent_id: &str, meta: &TranscriptMeta) -> io::Result<()> {
130 let path = dir.join(format!("{agent_id}.meta.json"));
131 let content = serde_json::to_string_pretty(meta)
132 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
133 zeph_common::fs_secure::write_private(&path, content.as_bytes())
134 }
135}
136
/// Stateless helpers for reading transcripts and their `.meta.json` sidecars.
pub struct TranscriptReader;
144
145impl TranscriptReader {
146 pub fn load(path: &Path) -> Result<Vec<Message>, SubAgentError> {
159 let file = match File::open(path) {
160 Ok(f) => f,
161 Err(e) if e.kind() == io::ErrorKind::NotFound => {
162 let meta_path =
166 if let (Some(parent), Some(stem)) = (path.parent(), path.file_stem()) {
167 parent.join(format!("{}.meta.json", stem.to_string_lossy()))
168 } else {
169 path.with_extension("meta.json")
170 };
171 if meta_path.exists() {
172 return Err(SubAgentError::Transcript(format!(
173 "transcript file '{}' is missing but meta sidecar exists — \
174 transcript data may have been deleted",
175 path.display()
176 )));
177 }
178 return Ok(vec![]);
179 }
180 Err(e) => {
181 return Err(SubAgentError::Transcript(format!(
182 "failed to open transcript '{}': {e}",
183 path.display()
184 )));
185 }
186 };
187
188 let reader = BufReader::new(file);
189 let mut messages = Vec::new();
190 for (line_no, line_result) in reader.lines().enumerate() {
191 let line = match line_result {
192 Ok(l) => l,
193 Err(e) => {
194 tracing::warn!(
195 path = %path.display(),
196 line = line_no + 1,
197 error = %e,
198 "failed to read transcript line — skipping"
199 );
200 continue;
201 }
202 };
203 let trimmed = line.trim();
204 if trimmed.is_empty() {
205 continue;
206 }
207 match serde_json::from_str::<TranscriptEntry>(trimmed) {
208 Ok(entry) => messages.push(entry.message),
209 Err(e) => {
210 tracing::warn!(
211 path = %path.display(),
212 line = line_no + 1,
213 error = %e,
214 "malformed transcript entry — skipping"
215 );
216 }
217 }
218 }
219 Ok(messages)
220 }
221
222 pub fn load_meta(dir: &Path, agent_id: &str) -> Result<TranscriptMeta, SubAgentError> {
229 let path = dir.join(format!("{agent_id}.meta.json"));
230 let content = fs::read_to_string(&path).map_err(|e| {
231 if e.kind() == io::ErrorKind::NotFound {
232 SubAgentError::NotFound(agent_id.to_owned())
233 } else {
234 SubAgentError::Transcript(format!("failed to read meta '{}': {e}", path.display()))
235 }
236 })?;
237 serde_json::from_str(&content).map_err(|e| {
238 SubAgentError::Transcript(format!("failed to parse meta '{}': {e}", path.display()))
239 })
240 }
241
242 pub fn find_by_prefix(dir: &Path, prefix: &str) -> Result<String, SubAgentError> {
251 let entries = fs::read_dir(dir).map_err(|e| {
252 SubAgentError::Transcript(format!(
253 "failed to read transcript dir '{}': {e}",
254 dir.display()
255 ))
256 })?;
257
258 let mut matches: Vec<String> = Vec::new();
259 for entry in entries {
260 let entry = entry
261 .map_err(|e| SubAgentError::Transcript(format!("failed to read dir entry: {e}")))?;
262 let name = entry.file_name();
263 let name_str = name.to_string_lossy();
264 if let Some(agent_id) = name_str.strip_suffix(".meta.json")
265 && agent_id.starts_with(prefix)
266 {
267 matches.push(agent_id.to_owned());
268 }
269 }
270
271 match matches.len() {
272 0 => Err(SubAgentError::NotFound(prefix.to_owned())),
273 1 => Ok(matches.remove(0)),
274 n => Err(SubAgentError::AmbiguousId(prefix.to_owned(), n)),
275 }
276 }
277}
278
/// Deletes the oldest `.jsonl` transcripts in `dir` so that at most `max_files` remain.
///
/// Age is judged by modification time. Files whose mtime cannot be read count
/// as oldest (`UNIX_EPOCH`). Ties are broken by path, so the outcome does not
/// depend on directory iteration order (relevant on filesystems with coarse
/// timestamps).
///
/// For each transcript removed, its `<stem>.meta.json` sidecar is removed first,
/// on a best-effort basis. A transcript that disappears before it can be removed
/// (e.g. deleted concurrently) is skipped and not counted.
///
/// A `max_files` of `0` disables the sweep. If `dir` does not exist it is
/// created and nothing is deleted.
///
/// Returns the number of transcripts this call deleted.
///
/// # Errors
///
/// Returns any I/O error from creating or listing `dir`, or from removing a
/// transcript (other than `NotFound`).
pub fn sweep_old_transcripts(dir: &Path, max_files: usize) -> io::Result<usize> {
    if max_files == 0 {
        return Ok(0);
    }

    if !dir.exists() {
        fs::create_dir_all(dir)?;
        return Ok(0);
    }

    let mut jsonl_files: Vec<(PathBuf, std::time::SystemTime)> = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().and_then(|e| e.to_str()) == Some("jsonl") {
            let mtime = entry
                .metadata()
                .and_then(|m| m.modified())
                .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
            jsonl_files.push((path, mtime));
        }
    }

    if jsonl_files.len() <= max_files {
        return Ok(0);
    }

    // Oldest first; equal mtimes fall back to path order for determinism.
    jsonl_files.sort_unstable_by(|(a_path, a_mtime), (b_path, b_mtime)| {
        a_mtime.cmp(b_mtime).then_with(|| a_path.cmp(b_path))
    });

    let to_delete = jsonl_files.len() - max_files;
    let mut deleted = 0;
    for (path, _) in jsonl_files.into_iter().take(to_delete) {
        // Remove the sidecar BEFORE the transcript. If we stop half-way, a lone
        // transcript is harmless, whereas a lone sidecar makes
        // `TranscriptReader::load` report the transcript as deleted data.
        // Best effort: a missing sidecar is the common case.
        let meta = path.with_extension("meta.json");
        let _ = fs::remove_file(&meta);
        match fs::remove_file(&path) {
            Ok(()) => deleted += 1,
            // Already gone (e.g. a concurrent sweep) — nothing left to do.
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
    }
    Ok(deleted)
}
331
332#[must_use]
348pub fn utc_now_pub() -> String {
349 utc_now()
350}
351
352fn utc_now() -> String {
353 let secs = std::time::SystemTime::now()
356 .duration_since(std::time::UNIX_EPOCH)
357 .unwrap_or_default()
358 .as_secs();
359 let (y, mo, d, h, mi, s) = epoch_to_parts(secs);
360 format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
361}
362
/// Splits seconds since the Unix epoch into UTC calendar parts.
///
/// The result is `(year, month, day, hour, minute, second)`, with `month` and
/// `day` 1-based.
///
/// The date half is Howard Hinnant's `civil_from_days` algorithm (proleptic
/// Gregorian calendar), restricted to non-negative inputs. It needs no floating
/// point and no month-length tables.
fn epoch_to_parts(epoch: u64) -> (u32, u32, u32, u32, u32, u32) {
    // Peel off the time-of-day fields; `days` ends up as whole days since 1970-01-01.
    let sec = epoch % 60;
    let epoch = epoch / 60;
    let min = epoch % 60;
    let epoch = epoch / 60;
    let hour = epoch % 24;
    let days = epoch / 24;

    // Re-base day 0 to 0000-03-01 so the leap day is the last day of the
    // (March-based) year; 719_468 days separate 0000-03-01 from 1970-01-01.
    let z = days + 719_468;
    // 400-year Gregorian era (146_097 days) and day-of-era in [0, 146_096].
    let era = z / 146_097;
    let doe = z - era * 146_097;
    // Year-of-era in [0, 399], correcting for the 4/100/400-year leap rules.
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
    let year = yoe + era * 400;
    // Day-of-year counted from March 1, in [0, 365].
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    // Month index with March = 0 … February = 11.
    let mp = (5 * doy + 2) / 153;
    let day = doy - (153 * mp + 2) / 5 + 1;
    // Back to civil months: January = 1 … December = 12.
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February were counted in the previous March-based year.
    let year = if month <= 2 { year + 1 } else { year };

    // day/month/hour/min/sec are all small. `year` could exceed u32 only for
    // inputs billions of years past 1970, which no real clock produces.
    #[allow(clippy::cast_possible_truncation)]
    (
        year as u32,
        month as u32,
        day as u32,
        hour as u32,
        min as u32,
        sec as u32,
    )
}
399
#[cfg(test)]
mod tests {
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    use super::*;

    fn test_message(role: Role, content: &str) -> Message {
        Message {
            role,
            content: content.to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        }
    }

    fn test_meta(agent_id: &str) -> TranscriptMeta {
        TranscriptMeta {
            agent_id: agent_id.to_owned(),
            agent_name: "bot".to_owned(),
            def_name: "bot".to_owned(),
            status: SubAgentState::Completed,
            started_at: "2026-01-01T00:00:00Z".to_owned(),
            finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
            resumed_from: None,
            turns_used: 2,
            mcp_tool_names: Vec::new(),
        }
    }

    #[test]
    fn writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.jsonl");

        let msg1 = test_message(Role::User, "hello");
        let msg2 = test_message(Role::Assistant, "world");

        let mut writer = TranscriptWriter::new(&path).unwrap();
        writer.append(0, &msg1).unwrap();
        writer.append(1, &msg2).unwrap();
        drop(writer);

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "hello");
        assert_eq!(messages[1].content, "world");
    }

    #[test]
    fn load_missing_file_no_meta_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("ghost.jsonl");
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_missing_file_with_meta_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("ghost.meta.json");
        std::fs::write(&meta_path, "{}").unwrap();
        let jsonl_path = dir.path().join("ghost.jsonl");
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn load_skips_malformed_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mixed.jsonl");

        let good = test_message(Role::User, "good");
        let entry = TranscriptEntry {
            seq: 0,
            timestamp: "2026-01-01T00:00:00Z".to_owned(),
            message: good.clone(),
        };
        let good_line = serde_json::to_string(&entry).unwrap();
        let content = format!("{good_line}\nnot valid json\n{good_line}\n");
        std::fs::write(&path, &content).unwrap();

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn load_skips_blank_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("blanks.jsonl");

        let entry = TranscriptEntry {
            seq: 0,
            timestamp: "2026-01-01T00:00:00Z".to_owned(),
            message: test_message(Role::User, "hi"),
        };
        let good_line = serde_json::to_string(&entry).unwrap();
        let content = format!("\n{good_line}\n   \n{good_line}\n\n");
        std::fs::write(&path, &content).unwrap();

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "hi");
    }

    #[test]
    fn meta_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abc-123");
        TranscriptWriter::write_meta(dir.path(), "abc-123", &meta).unwrap();
        let loaded = TranscriptReader::load_meta(dir.path(), "abc-123").unwrap();
        assert_eq!(loaded.agent_id, "abc-123");
        assert_eq!(loaded.turns_used, 2);
    }

    #[test]
    fn meta_not_found_returns_not_found_error() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "ghost").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_exact() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abcdef01-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "abcdef01-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id =
            TranscriptReader::find_by_prefix(dir.path(), "abcdef01-0000-0000-0000-000000000000")
                .unwrap();
        assert_eq!(id, "abcdef01-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_short_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("deadbeef-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "deadbeef-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id = TranscriptReader::find_by_prefix(dir.path(), "deadbeef").unwrap();
        assert_eq!(id, "deadbeef-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_not_found() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "xxxxxxxx").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_ambiguous() {
        let dir = tempfile::tempdir().unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0001-x", &test_meta("aabb0001-x")).unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0002-y", &test_meta("aabb0002-y")).unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "aabb").unwrap_err();
        assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
    }

    #[test]
    fn sweep_old_transcripts_removes_oldest() {
        let dir = tempfile::tempdir().unwrap();

        for i in 0..5u32 {
            let path = dir.path().join(format!("file{i:02}.jsonl"));
            std::fs::write(&path, b"").unwrap();
        }

        let deleted = sweep_old_transcripts(dir.path(), 3).unwrap();
        assert_eq!(deleted, 2);

        let remaining: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("jsonl"))
            .collect();
        assert_eq!(remaining.len(), 3);
    }

    #[test]
    fn sweep_with_zero_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 0).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn sweep_below_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 50).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn sweep_nonexistent_dir_creates_it() {
        let dir = tempfile::tempdir().unwrap();
        let sub = dir.path().join("nested").join("transcripts");
        let deleted = sweep_old_transcripts(&sub, 5).unwrap();
        assert_eq!(deleted, 0);
        assert!(sub.is_dir());
    }

    #[test]
    fn utc_now_format() {
        let ts = utc_now();
        assert_eq!(ts.len(), 20);
        assert!(ts.ends_with('Z'));
        assert!(ts.contains('T'));
    }

    #[test]
    fn epoch_to_parts_known_dates() {
        assert_eq!(epoch_to_parts(0), (1970, 1, 1, 0, 0, 0));
        assert_eq!(epoch_to_parts(86_399), (1970, 1, 1, 23, 59, 59));
        // Leap day in a year divisible by 400.
        assert_eq!(epoch_to_parts(951_782_400), (2000, 2, 29, 0, 0, 0));
        assert_eq!(epoch_to_parts(1_700_000_000), (2023, 11, 14, 22, 13, 20));
        // Year rollover.
        assert_eq!(epoch_to_parts(1_735_689_599), (2024, 12, 31, 23, 59, 59));
        assert_eq!(epoch_to_parts(1_735_689_600), (2025, 1, 1, 0, 0, 0));
    }

    #[test]
    fn load_empty_file_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.jsonl");
        std::fs::write(&path, b"").unwrap();
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_meta_invalid_json_returns_transcript_error() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("bad.meta.json"), b"not json at all {{{{").unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "bad").unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn sweep_removes_companion_meta() {
        let dir = tempfile::tempdir().unwrap();
        for i in 0..4u32 {
            let stem = format!("file{i:02}");
            std::fs::write(dir.path().join(format!("{stem}.jsonl")), b"").unwrap();
            std::fs::write(dir.path().join(format!("{stem}.meta.json")), b"{}").unwrap();
        }
        let deleted = sweep_old_transcripts(dir.path(), 2).unwrap();
        assert_eq!(deleted, 2);
        let meta_count = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().to_string_lossy().ends_with(".meta.json"))
            .count();
        assert_eq!(
            meta_count, 2,
            "orphaned meta sidecars should have been removed"
        );
    }

    #[test]
    fn data_loss_guard_uses_stem_based_meta_path() {
        let dir = tempfile::tempdir().unwrap();
        let agent_id = "deadbeef-0000-0000-0000-000000000000";
        std::fs::write(dir.path().join(format!("{agent_id}.meta.json")), b"{}").unwrap();
        let jsonl_path = dir.path().join(format!("{agent_id}.jsonl"));
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(ref m) if m.contains("missing")));
    }

    #[test]
    fn meta_roundtrip_preserves_mcp_tool_names() {
        let dir = tempfile::tempdir().unwrap();
        let agent_id = "abc-123";
        let mut meta = test_meta(agent_id);
        meta.mcp_tool_names = vec!["search".into(), "write_file".into()];
        TranscriptWriter::write_meta(dir.path(), agent_id, &meta).unwrap();
        let loaded = TranscriptReader::load_meta(dir.path(), agent_id).unwrap();
        assert_eq!(loaded.mcp_tool_names, vec!["search", "write_file"]);
    }
}