1use std::fs::{self, File, OpenOptions};
5use std::io::{self, BufRead, BufReader, Write as _};
6use std::path::{Path, PathBuf};
7
8use serde::{Deserialize, Serialize};
9use zeph_llm::provider::Message;
10
11use super::error::SubAgentError;
12use super::state::SubAgentState;
13
/// One record of a sub-agent transcript.
///
/// `TranscriptWriter::append` serializes each entry as a single JSON object on its own line
/// (JSONL); `TranscriptReader::load` parses those lines back and keeps only `message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptEntry {
    /// Sequence number supplied by the caller of `append`.
    pub seq: u32,
    /// When written by `append`: the UTC time of the append, formatted
    /// `YYYY-MM-DDTHH:MM:SSZ` (second precision).
    pub timestamp: String,
    /// The recorded conversation message.
    pub message: Message,
}
22
/// Metadata sidecar for a sub-agent run.
///
/// Stored as pretty-printed JSON in `<dir>/<agent_id>.meta.json` next to the JSONL transcript
/// (see `TranscriptWriter::write_meta` / `TranscriptReader::load_meta`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptMeta {
    /// Identifier of the sub-agent run. Presumably the same id used in the sidecar file
    /// name passed to `write_meta` — TODO confirm callers keep them in sync.
    pub agent_id: String,
    /// Name of the sub-agent instance (presumably for display).
    pub agent_name: String,
    /// Name of the sub-agent definition this run was created from (presumably).
    pub def_name: String,
    /// Lifecycle state recorded when the sidecar was last written.
    pub status: SubAgentState,
    /// Start time. NOTE(review): presumably produced by `utc_now_pub`
    /// (`YYYY-MM-DDTHH:MM:SSZ`) — confirm at the call sites.
    pub started_at: String,
    /// Finish time, if any; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finished_at: Option<String>,
    /// Identifier of the run this one was resumed from, if any (presumably an `agent_id`);
    /// omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resumed_from: Option<String>,
    /// Number of turns consumed by the run.
    pub turns_used: u32,
}
38
39pub struct TranscriptWriter {
44 file: File,
45}
46
47impl TranscriptWriter {
48 pub fn new(path: &Path) -> io::Result<Self> {
56 if let Some(parent) = path.parent() {
57 fs::create_dir_all(parent)?;
58 }
59 let file = open_private(path)?;
60 Ok(Self { file })
61 }
62
63 pub fn append(&mut self, seq: u32, message: &Message) -> io::Result<()> {
69 let entry = TranscriptEntry {
70 seq,
71 timestamp: utc_now(),
72 message: message.clone(),
73 };
74 let line = serde_json::to_string(&entry)
75 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
76 self.file.write_all(line.as_bytes())?;
77 self.file.write_all(b"\n")?;
78 self.file.flush()
79 }
80
81 pub fn write_meta(dir: &Path, agent_id: &str, meta: &TranscriptMeta) -> io::Result<()> {
87 let path = dir.join(format!("{agent_id}.meta.json"));
88 let content = serde_json::to_string_pretty(meta)
89 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
90 write_private(&path, content.as_bytes())
91 }
92}
93
94pub struct TranscriptReader;
96
97impl TranscriptReader {
98 pub fn load(path: &Path) -> Result<Vec<Message>, SubAgentError> {
111 let file = match File::open(path) {
112 Ok(f) => f,
113 Err(e) if e.kind() == io::ErrorKind::NotFound => {
114 let meta_path =
118 if let (Some(parent), Some(stem)) = (path.parent(), path.file_stem()) {
119 parent.join(format!("{}.meta.json", stem.to_string_lossy()))
120 } else {
121 path.with_extension("meta.json")
122 };
123 if meta_path.exists() {
124 return Err(SubAgentError::Transcript(format!(
125 "transcript file '{}' is missing but meta sidecar exists — \
126 transcript data may have been deleted",
127 path.display()
128 )));
129 }
130 return Ok(vec![]);
131 }
132 Err(e) => {
133 return Err(SubAgentError::Transcript(format!(
134 "failed to open transcript '{}': {e}",
135 path.display()
136 )));
137 }
138 };
139
140 let reader = BufReader::new(file);
141 let mut messages = Vec::new();
142 for (line_no, line_result) in reader.lines().enumerate() {
143 let line = match line_result {
144 Ok(l) => l,
145 Err(e) => {
146 tracing::warn!(
147 path = %path.display(),
148 line = line_no + 1,
149 error = %e,
150 "failed to read transcript line — skipping"
151 );
152 continue;
153 }
154 };
155 let trimmed = line.trim();
156 if trimmed.is_empty() {
157 continue;
158 }
159 match serde_json::from_str::<TranscriptEntry>(trimmed) {
160 Ok(entry) => messages.push(entry.message),
161 Err(e) => {
162 tracing::warn!(
163 path = %path.display(),
164 line = line_no + 1,
165 error = %e,
166 "malformed transcript entry — skipping"
167 );
168 }
169 }
170 }
171 Ok(messages)
172 }
173
174 pub fn load_meta(dir: &Path, agent_id: &str) -> Result<TranscriptMeta, SubAgentError> {
181 let path = dir.join(format!("{agent_id}.meta.json"));
182 let content = fs::read_to_string(&path).map_err(|e| {
183 if e.kind() == io::ErrorKind::NotFound {
184 SubAgentError::NotFound(agent_id.to_owned())
185 } else {
186 SubAgentError::Transcript(format!("failed to read meta '{}': {e}", path.display()))
187 }
188 })?;
189 serde_json::from_str(&content).map_err(|e| {
190 SubAgentError::Transcript(format!("failed to parse meta '{}': {e}", path.display()))
191 })
192 }
193
194 pub fn find_by_prefix(dir: &Path, prefix: &str) -> Result<String, SubAgentError> {
203 let entries = fs::read_dir(dir).map_err(|e| {
204 SubAgentError::Transcript(format!(
205 "failed to read transcript dir '{}': {e}",
206 dir.display()
207 ))
208 })?;
209
210 let mut matches: Vec<String> = Vec::new();
211 for entry in entries {
212 let entry = entry
213 .map_err(|e| SubAgentError::Transcript(format!("failed to read dir entry: {e}")))?;
214 let name = entry.file_name();
215 let name_str = name.to_string_lossy();
216 if let Some(agent_id) = name_str.strip_suffix(".meta.json")
217 && agent_id.starts_with(prefix)
218 {
219 matches.push(agent_id.to_owned());
220 }
221 }
222
223 match matches.len() {
224 0 => Err(SubAgentError::NotFound(prefix.to_owned())),
225 1 => Ok(matches.remove(0)),
226 n => Err(SubAgentError::AmbiguousId(prefix.to_owned(), n)),
227 }
228 }
229}
230
/// Deletes the oldest `*.jsonl` transcripts in `dir` so that at most `max_files` remain.
///
/// Age is the file modification time; files whose mtime cannot be read count as oldest.
/// Equal mtimes are ordered by path, so repeated runs pick the same victims.
///
/// For each deleted transcript, its `<stem>.meta.json` sidecar is also removed (best effort)
/// before the transcript itself. The order matters:
/// - a sidecar without a transcript makes `TranscriptReader::load` report possible data loss;
/// - a transcript without a sidecar still loads.
///
/// `max_files == 0` disables the sweep. A missing `dir` is created and nothing is deleted.
/// Transcripts that have already disappeared (e.g. removed concurrently) are skipped rather
/// than aborting the sweep.
///
/// Returns the number of transcripts this call deleted.
///
/// # Errors
///
/// Returns any I/O error from creating or listing `dir`, or from deleting a transcript,
/// except for the transcript already being gone.
pub fn sweep_old_transcripts(dir: &Path, max_files: usize) -> io::Result<usize> {
    if max_files == 0 {
        return Ok(0);
    }

    if !dir.exists() {
        fs::create_dir_all(dir)?;
        return Ok(0);
    }

    let mut jsonl_files: Vec<(PathBuf, std::time::SystemTime)> = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
            continue;
        }
        let mtime = entry
            .metadata()
            .and_then(|m| m.modified())
            .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
        jsonl_files.push((path, mtime));
    }

    if jsonl_files.len() <= max_files {
        return Ok(0);
    }

    // Oldest first. Paths are unique, so the path tie-break makes the order total and
    // independent of `read_dir` order even when many files share one (coarse) mtime.
    jsonl_files.sort_unstable_by(|(a_path, a_time), (b_path, b_time)| {
        a_time.cmp(b_time).then_with(|| a_path.cmp(b_path))
    });

    let to_delete = jsonl_files.len() - max_files;
    let mut deleted = 0;
    for (path, _) in jsonl_files.into_iter().take(to_delete) {
        // Best effort: a missing or undeletable sidecar must not block the sweep.
        let _ = fs::remove_file(path.with_extension("meta.json"));
        match fs::remove_file(&path) {
            Ok(()) => deleted += 1,
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
    }
    Ok(deleted)
}
283
/// Opens `path` for appending, creating the file if it does not exist.
///
/// On Unix a newly created file is given mode `0o600` (owner read/write, subject to the
/// umask); the mode has no effect when the file already exists. Other platforms use the
/// default permissions.
fn open_private(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        options.mode(0o600);
    }
    options.open(path)
}
302
/// Replaces the contents of `path` with `contents`, keeping the file private to its owner.
///
/// The steps are:
/// 1. Write the data to a uniquely named hidden temporary file in the same directory.
/// 2. Sync the temporary file to disk.
/// 3. Rename it over `path`.
///
/// A reader therefore sees either the old or the new contents — never an empty or
/// half-written file, as could happen with truncate-then-write if a read raced the write or
/// the process crashed midway.
///
/// On Unix the temporary file is created with mode `0o600` (subject to the umask). Because the
/// rename replaces the old file, the result is owner-only even if a previous version of
/// `path` had broader permissions.
///
/// # Errors
///
/// Returns `InvalidInput` if `path` has no file name. Otherwise returns any I/O error from
/// creating, writing, syncing or renaming the temporary file. On error the temporary file is
/// removed (best effort) and `path` is left as it was.
fn write_private(path: &Path, contents: &[u8]) -> io::Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Keeps temporary names unique across concurrent calls within this process.
    static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);

    // Creates `tmp` exclusively (never reusing an existing file), writes `contents` and syncs
    // it. The handle is closed on return, i.e. before the caller renames the file.
    fn write_new_synced(tmp: &Path, contents: &[u8]) -> io::Result<()> {
        let mut options = OpenOptions::new();
        options.write(true).create_new(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt as _;
            options.mode(0o600);
        }
        let mut file = options.open(tmp)?;
        file.write_all(contents)?;
        file.sync_all()
    }

    let file_name = path.file_name().ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("'{}' has no file name", path.display()),
        )
    })?;
    // Hidden, and not ending in `.meta.json`/`.jsonl`, so directory scans never pick it up.
    let mut tmp_name = std::ffi::OsString::from(".");
    tmp_name.push(file_name);
    tmp_name.push(format!(
        ".{}.{}.tmp",
        std::process::id(),
        TMP_COUNTER.fetch_add(1, Ordering::Relaxed)
    ));
    let tmp_path = path.with_file_name(tmp_name);

    let result = write_new_synced(&tmp_path, contents).and_then(|()| fs::rename(&tmp_path, path));
    if result.is_err() {
        // Best effort cleanup; the original error is the one worth reporting.
        let _ = fs::remove_file(&tmp_path);
    }
    result
}
324
325#[must_use]
327pub fn utc_now_pub() -> String {
328 utc_now()
329}
330
331fn utc_now() -> String {
332 let secs = std::time::SystemTime::now()
335 .duration_since(std::time::UNIX_EPOCH)
336 .unwrap_or_default()
337 .as_secs();
338 let (y, mo, d, h, mi, s) = epoch_to_parts(secs);
339 format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
340}
341
/// Splits a Unix timestamp (whole seconds since 1970-01-01T00:00:00Z) into UTC parts.
///
/// Returns `(year, month, day, hour, minute, second)` with 1-based `month` and `day`.
/// The date uses Howard Hinnant's `civil_from_days` algorithm for the proleptic Gregorian
/// calendar. It counts days from 0000-03-01, so each computed year starts in March and the
/// leap day falls at the end of that year.
// Only `year` can exceed `u32`, and only for absurdly large timestamps.
#[allow(clippy::cast_possible_truncation)]
fn epoch_to_parts(epoch: u64) -> (u32, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: u64 = 86_400;
    // 400 Gregorian years.
    const DAYS_PER_ERA: u64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: u64 = 719_468;

    let days = epoch / SECS_PER_DAY;
    let secs_of_day = epoch % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day % 3_600 / 60;
    let second = secs_of_day % 60;

    let shifted = days + EPOCH_SHIFT;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // 0 = March, ..., 9 = December, 10 = January, 11 = February.
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    // January and February belong to the next civil year.
    let (month, year_carry) = if month_index < 10 {
        (month_index + 3, 0)
    } else {
        (month_index - 9, 1)
    };
    let year = era * 400 + year_of_era + year_carry;

    (
        year as u32,
        month as u32,
        day as u32,
        hour as u32,
        minute as u32,
        second as u32,
    )
}
378
#[cfg(test)]
mod tests {
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    use super::*;

    /// Builds a plain-text message with no parts and default metadata.
    fn test_message(role: Role, content: &str) -> Message {
        Message {
            role,
            content: content.to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        }
    }

    /// Builds a completed-run meta record with fixed timestamps and two turns.
    fn test_meta(agent_id: &str) -> TranscriptMeta {
        TranscriptMeta {
            agent_id: agent_id.to_owned(),
            agent_name: "bot".to_owned(),
            def_name: "bot".to_owned(),
            status: SubAgentState::Completed,
            started_at: "2026-01-01T00:00:00Z".to_owned(),
            finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
            resumed_from: None,
            turns_used: 2,
        }
    }

    /// Creates an empty file whose modification time is `secs` seconds after the Unix epoch.
    ///
    /// Sweep tests set mtimes explicitly so "oldest" is well defined regardless of how fast
    /// the files are created or how coarse the filesystem's timestamps are.
    fn create_with_mtime(path: &Path, secs: u64) {
        let file = std::fs::File::create(path).unwrap();
        file.set_modified(std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs))
            .unwrap();
    }

    /// Sorted names of the files in `dir` whose names end with `suffix`.
    fn names_with_suffix(dir: &Path, suffix: &str) -> Vec<String> {
        let mut names: Vec<String> = std::fs::read_dir(dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .filter(|name| name.ends_with(suffix))
            .collect();
        names.sort();
        names
    }

    #[test]
    fn writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.jsonl");

        let msg1 = test_message(Role::User, "hello");
        let msg2 = test_message(Role::Assistant, "world");

        let mut writer = TranscriptWriter::new(&path).unwrap();
        writer.append(0, &msg1).unwrap();
        writer.append(1, &msg2).unwrap();
        drop(writer);

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "hello");
        assert_eq!(messages[1].content, "world");
    }

    #[test]
    fn writer_appends_to_existing_transcript() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("resume.jsonl");

        let mut first = TranscriptWriter::new(&path).unwrap();
        first.append(0, &test_message(Role::User, "before")).unwrap();
        drop(first);

        // Re-opening must append, not truncate, so resumed runs keep their history.
        let mut second = TranscriptWriter::new(&path).unwrap();
        second.append(1, &test_message(Role::Assistant, "after")).unwrap();
        drop(second);

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "before");
        assert_eq!(messages[1].content, "after");
    }

    #[test]
    fn load_missing_file_no_meta_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("ghost.jsonl");
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_missing_file_with_meta_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("ghost.meta.json");
        std::fs::write(&meta_path, "{}").unwrap();
        let jsonl_path = dir.path().join("ghost.jsonl");
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn load_skips_malformed_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mixed.jsonl");

        let good = test_message(Role::User, "good");
        let entry = TranscriptEntry {
            seq: 0,
            timestamp: "2026-01-01T00:00:00Z".to_owned(),
            message: good.clone(),
        };
        let good_line = serde_json::to_string(&entry).unwrap();
        let content = format!("{good_line}\nnot valid json\n{good_line}\n");
        std::fs::write(&path, &content).unwrap();

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn meta_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abc-123");
        TranscriptWriter::write_meta(dir.path(), "abc-123", &meta).unwrap();
        let loaded = TranscriptReader::load_meta(dir.path(), "abc-123").unwrap();
        assert_eq!(loaded.agent_id, "abc-123");
        assert_eq!(loaded.turns_used, 2);
    }

    #[test]
    fn meta_not_found_returns_not_found_error() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "ghost").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_exact() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abcdef01-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "abcdef01-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id =
            TranscriptReader::find_by_prefix(dir.path(), "abcdef01-0000-0000-0000-000000000000")
                .unwrap();
        assert_eq!(id, "abcdef01-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_short_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("deadbeef-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "deadbeef-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id = TranscriptReader::find_by_prefix(dir.path(), "deadbeef").unwrap();
        assert_eq!(id, "deadbeef-0000-0000-0000-000000000000");
    }

    #[test]
    fn find_by_prefix_not_found() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "xxxxxxxx").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    #[test]
    fn find_by_prefix_ambiguous() {
        let dir = tempfile::tempdir().unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0001-x", &test_meta("aabb0001-x")).unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0002-y", &test_meta("aabb0002-y")).unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "aabb").unwrap_err();
        assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
    }

    #[test]
    fn sweep_old_transcripts_removes_oldest() {
        let dir = tempfile::tempdir().unwrap();

        // Distinct, increasing mtimes: file00 is the oldest, file04 the newest.
        for i in 0..5u32 {
            let path = dir.path().join(format!("file{i:02}.jsonl"));
            create_with_mtime(&path, 1_000 + u64::from(i) * 60);
        }

        let deleted = sweep_old_transcripts(dir.path(), 3).unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(
            names_with_suffix(dir.path(), ".jsonl"),
            ["file02.jsonl", "file03.jsonl", "file04.jsonl"]
        );
    }

    #[test]
    fn sweep_with_zero_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 0).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn sweep_below_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 50).unwrap();
        assert_eq!(deleted, 0);
    }

    #[test]
    fn sweep_creates_missing_dir() {
        let dir = tempfile::tempdir().unwrap();
        let missing = dir.path().join("nested").join("transcripts");
        let deleted = sweep_old_transcripts(&missing, 3).unwrap();
        assert_eq!(deleted, 0);
        assert!(missing.is_dir());
    }

    #[test]
    fn utc_now_format() {
        let ts = utc_now();
        assert_eq!(ts.len(), 20);
        assert!(ts.ends_with('Z'));
        assert!(ts.contains('T'));
    }

    #[test]
    fn epoch_to_parts_known_dates() {
        assert_eq!(epoch_to_parts(0), (1970, 1, 1, 0, 0, 0));
        assert_eq!(epoch_to_parts(951_782_400), (2000, 2, 29, 0, 0, 0));
        assert_eq!(epoch_to_parts(1_700_000_000), (2023, 11, 14, 22, 13, 20));
        assert_eq!(epoch_to_parts(4_102_444_800), (2100, 1, 1, 0, 0, 0));
    }

    #[test]
    fn load_empty_file_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.jsonl");
        std::fs::write(&path, b"").unwrap();
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn load_meta_invalid_json_returns_transcript_error() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("bad.meta.json"), b"not json at all {{{{").unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "bad").unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    #[test]
    fn sweep_removes_companion_meta() {
        let dir = tempfile::tempdir().unwrap();
        for i in 0..4u32 {
            let stem = format!("file{i:02}");
            create_with_mtime(
                &dir.path().join(format!("{stem}.jsonl")),
                1_000 + u64::from(i) * 60,
            );
            std::fs::write(dir.path().join(format!("{stem}.meta.json")), b"{}").unwrap();
        }
        let deleted = sweep_old_transcripts(dir.path(), 2).unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(
            names_with_suffix(dir.path(), ".meta.json"),
            ["file02.meta.json", "file03.meta.json"],
            "orphaned meta sidecars should have been removed"
        );
    }

    #[test]
    fn data_loss_guard_uses_stem_based_meta_path() {
        let dir = tempfile::tempdir().unwrap();
        let agent_id = "deadbeef-0000-0000-0000-000000000000";
        std::fs::write(dir.path().join(format!("{agent_id}.meta.json")), b"{}").unwrap();
        let jsonl_path = dir.path().join(format!("{agent_id}.jsonl"));
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(ref m) if m.contains("missing")));
    }
}