1use std::fs::{self, File, OpenOptions};
5use std::io::{self, BufRead, BufReader, Write as _};
6use std::path::{Path, PathBuf};
7
8use serde::{Deserialize, Serialize};
9use zeph_llm::provider::Message;
10
11use super::error::SubAgentError;
12use super::state::SubAgentState;
13
/// One line of a JSONL transcript file.
///
/// [`TranscriptWriter::append`] serializes one of these (followed by a newline) per call, and
/// [`TranscriptReader::load`] deserializes them back, keeping only the `message` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptEntry {
    /// Sequence number supplied by the caller of [`TranscriptWriter::append`].
    pub seq: u32,
    /// Write time; when produced by [`TranscriptWriter::append`] it is UTC formatted as
    /// `YYYY-MM-DDTHH:MM:SSZ`.
    pub timestamp: String,
    /// The recorded conversation message.
    pub message: Message,
}
22
/// Metadata sidecar for a sub-agent transcript.
///
/// [`TranscriptWriter::write_meta`] stores it as pretty-printed JSON in
/// `<dir>/<agent_id>.meta.json`. [`TranscriptReader::load`] and [`sweep_old_transcripts`] look
/// for it next to the matching `<agent_id>.jsonl` transcript.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptMeta {
    /// Identifier that names the sidecar file; [`TranscriptReader::find_by_prefix`] resolves
    /// abbreviated forms of it.
    pub agent_id: String,
    /// NOTE(review): presumably the display name of this agent instance — confirm.
    pub agent_name: String,
    /// NOTE(review): presumably the name of the sub-agent definition it was created from —
    /// confirm.
    pub def_name: String,
    /// State of the sub-agent as recorded when this sidecar was written.
    pub status: SubAgentState,
    /// Start time; presumably in the `YYYY-MM-DDTHH:MM:SSZ` format of `utc_now_pub` — confirm.
    pub started_at: String,
    /// Finish time, if any; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finished_at: Option<String>,
    /// NOTE(review): presumably the id of the agent this run resumed from — confirm.
    /// The key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resumed_from: Option<String>,
    /// Number of turns the agent used (presumably LLM turns — confirm).
    pub turns_used: u32,
}
38
39pub struct TranscriptWriter {
44 file: File,
45}
46
47impl TranscriptWriter {
48 pub fn new(path: &Path) -> io::Result<Self> {
56 if let Some(parent) = path.parent() {
57 fs::create_dir_all(parent)?;
58 }
59 let file = open_private(path)?;
60 Ok(Self { file })
61 }
62
63 pub fn append(&mut self, seq: u32, message: &Message) -> io::Result<()> {
69 let entry = TranscriptEntry {
70 seq,
71 timestamp: utc_now(),
72 message: message.clone(),
73 };
74 let line = serde_json::to_string(&entry)
75 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
76 self.file.write_all(line.as_bytes())?;
77 self.file.write_all(b"\n")?;
78 self.file.flush()
79 }
80
81 pub fn write_meta(dir: &Path, agent_id: &str, meta: &TranscriptMeta) -> io::Result<()> {
87 let path = dir.join(format!("{agent_id}.meta.json"));
88 let content = serde_json::to_string_pretty(meta)
89 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
90 write_private(&path, content.as_bytes())
91 }
92}
93
94pub struct TranscriptReader;
96
97impl TranscriptReader {
98 pub fn load(path: &Path) -> Result<Vec<Message>, SubAgentError> {
111 let file = match File::open(path) {
112 Ok(f) => f,
113 Err(e) if e.kind() == io::ErrorKind::NotFound => {
114 let meta_path =
118 if let (Some(parent), Some(stem)) = (path.parent(), path.file_stem()) {
119 parent.join(format!("{}.meta.json", stem.to_string_lossy()))
120 } else {
121 path.with_extension("meta.json")
122 };
123 if meta_path.exists() {
124 return Err(SubAgentError::Transcript(format!(
125 "transcript file '{}' is missing but meta sidecar exists — \
126 transcript data may have been deleted",
127 path.display()
128 )));
129 }
130 return Ok(vec![]);
131 }
132 Err(e) => {
133 return Err(SubAgentError::Transcript(format!(
134 "failed to open transcript '{}': {e}",
135 path.display()
136 )));
137 }
138 };
139
140 let reader = BufReader::new(file);
141 let mut messages = Vec::new();
142 for (line_no, line_result) in reader.lines().enumerate() {
143 let line = match line_result {
144 Ok(l) => l,
145 Err(e) => {
146 tracing::warn!(
147 path = %path.display(),
148 line = line_no + 1,
149 error = %e,
150 "failed to read transcript line — skipping"
151 );
152 continue;
153 }
154 };
155 let trimmed = line.trim();
156 if trimmed.is_empty() {
157 continue;
158 }
159 match serde_json::from_str::<TranscriptEntry>(trimmed) {
160 Ok(entry) => messages.push(entry.message),
161 Err(e) => {
162 tracing::warn!(
163 path = %path.display(),
164 line = line_no + 1,
165 error = %e,
166 "malformed transcript entry — skipping"
167 );
168 }
169 }
170 }
171 Ok(messages)
172 }
173
174 pub fn load_meta(dir: &Path, agent_id: &str) -> Result<TranscriptMeta, SubAgentError> {
181 let path = dir.join(format!("{agent_id}.meta.json"));
182 let content = fs::read_to_string(&path).map_err(|e| {
183 if e.kind() == io::ErrorKind::NotFound {
184 SubAgentError::NotFound(agent_id.to_owned())
185 } else {
186 SubAgentError::Transcript(format!("failed to read meta '{}': {e}", path.display()))
187 }
188 })?;
189 serde_json::from_str(&content).map_err(|e| {
190 SubAgentError::Transcript(format!("failed to parse meta '{}': {e}", path.display()))
191 })
192 }
193
194 pub fn find_by_prefix(dir: &Path, prefix: &str) -> Result<String, SubAgentError> {
203 let entries = fs::read_dir(dir).map_err(|e| {
204 SubAgentError::Transcript(format!(
205 "failed to read transcript dir '{}': {e}",
206 dir.display()
207 ))
208 })?;
209
210 let mut matches: Vec<String> = Vec::new();
211 for entry in entries {
212 let entry = entry
213 .map_err(|e| SubAgentError::Transcript(format!("failed to read dir entry: {e}")))?;
214 let name = entry.file_name();
215 let name_str = name.to_string_lossy();
216 if let Some(agent_id) = name_str.strip_suffix(".meta.json")
217 && agent_id.starts_with(prefix)
218 {
219 matches.push(agent_id.to_owned());
220 }
221 }
222
223 match matches.len() {
224 0 => Err(SubAgentError::NotFound(prefix.to_owned())),
225 1 => Ok(matches.remove(0)),
226 n => Err(SubAgentError::AmbiguousId(prefix.to_owned(), n)),
227 }
228 }
229}
230
/// Deletes the oldest `*.jsonl` transcripts in `dir` so that at most `max_files` remain.
///
/// Age is the file's modification time. Files whose mtime cannot be read are treated as the
/// oldest (`UNIX_EPOCH`). Files with identical mtimes are ordered by path, so the outcome does
/// not depend on directory iteration order.
///
/// For each transcript removed, the companion `<stem>.meta.json` sidecar is deleted first, on
/// a best-effort basis with errors ignored. That order means a failed transcript deletion can
/// leave a transcript without metadata, never metadata pointing at a deleted transcript.
///
/// A `max_files` of `0` disables the sweep entirely.
///
/// Returns the number of transcripts this call deleted. A transcript that disappears before it
/// can be removed (e.g. taken by a concurrent sweep) is skipped and not counted.
///
/// # Errors
///
/// Returns an I/O error if `dir` cannot be read, or if a transcript cannot be removed for a
/// reason other than it no longer existing.
pub fn sweep_old_transcripts(dir: &Path, max_files: usize) -> io::Result<usize> {
    if max_files == 0 {
        return Ok(0);
    }

    let mut jsonl_files: Vec<(PathBuf, std::time::SystemTime)> = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().and_then(|e| e.to_str()) == Some("jsonl") {
            let mtime = entry
                .metadata()
                .and_then(|m| m.modified())
                .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
            jsonl_files.push((path, mtime));
        }
    }

    if jsonl_files.len() <= max_files {
        return Ok(0);
    }

    // Oldest first; the path tie-break makes equal-mtime ordering deterministic.
    jsonl_files.sort_unstable_by(|(path_a, mtime_a), (path_b, mtime_b)| {
        mtime_a.cmp(mtime_b).then_with(|| path_a.cmp(path_b))
    });

    let to_delete = jsonl_files.len() - max_files;
    let mut deleted = 0;
    for (path, _) in jsonl_files.into_iter().take(to_delete) {
        // Attempting the removal directly (instead of `exists()` first) avoids a TOCTOU race;
        // a missing or undeletable sidecar is deliberately ignored.
        let _ = fs::remove_file(path.with_extension("meta.json"));
        match fs::remove_file(&path) {
            Ok(()) => deleted += 1,
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
    }
    Ok(deleted)
}
277
/// Opens `path` for appending, creating it if it does not exist.
///
/// On Unix a newly created file gets mode `0o600` (owner read/write only, further limited by
/// the process umask); an existing file keeps its current permissions. On other platforms the
/// platform default permissions apply.
fn open_private(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);

    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        options.mode(0o600);
    }

    options.open(path)
}
296
/// Atomically replaces the file at `path` with `contents`.
///
/// The bytes are first written and synced to a hidden temporary file in the same directory,
/// which is then renamed over `path`. The temporary file is created with mode `0o600` on
/// Unix. This gives two guarantees:
/// - a concurrent reader sees either the previous contents or the complete new contents,
///   never a truncated or half-written file;
/// - the final file always has owner-only permissions on Unix, even if `path` previously
///   existed with broader ones.
///
/// On failure the temporary file is removed on a best-effort basis.
///
/// # Errors
///
/// Returns `InvalidInput` if `path` has no file name. Returns any I/O error from creating,
/// writing, syncing, or renaming the temporary file.
fn write_private(path: &Path, contents: &[u8]) -> io::Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Makes temp names unique across threads of this process; the pid covers other processes.
    static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);

    let file_name = path.file_name().ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("'{}' has no file name", path.display()),
        )
    })?;

    // `.<name>.<pid>.<n>.tmp` is hidden and never ends in `.meta.json`, so directory scans
    // keyed on that suffix ignore in-flight temp files.
    let mut tmp_name = std::ffi::OsString::from(".");
    tmp_name.push(file_name);
    tmp_name.push(format!(
        ".{}.{}.tmp",
        std::process::id(),
        TMP_COUNTER.fetch_add(1, Ordering::Relaxed)
    ));
    let tmp_path = path.with_file_name(tmp_name);

    let result =
        write_tmp_private(&tmp_path, contents).and_then(|()| fs::rename(&tmp_path, path));
    if result.is_err() {
        // Best effort: do not leave a stray temp file behind.
        let _ = fs::remove_file(&tmp_path);
    }
    result
}

/// Creates or truncates `path` (mode `0o600` on Unix), writes `contents`, and syncs it to disk.
fn write_tmp_private(path: &Path, contents: &[u8]) -> io::Result<()> {
    let mut options = OpenOptions::new();
    options.create(true).write(true).truncate(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        options.mode(0o600);
    }
    let mut file = options.open(path)?;
    file.write_all(contents)?;
    // Sync before the rename so a crash cannot publish an empty file under the final name.
    file.sync_all()
}
318
319#[must_use]
321pub fn utc_now_pub() -> String {
322 utc_now()
323}
324
325fn utc_now() -> String {
326 let secs = std::time::SystemTime::now()
329 .duration_since(std::time::UNIX_EPOCH)
330 .unwrap_or_default()
331 .as_secs();
332 let (y, mo, d, h, mi, s) = epoch_to_parts(secs);
333 format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}Z")
334}
335
/// Splits seconds since the Unix epoch into UTC `(year, month, day, hour, minute, second)`.
///
/// The date part is the proleptic-Gregorian "civil from days" conversion. It works on days
/// shifted so each 400-year era starts on 0000-03-01, which puts the leap day at the end of
/// the computed year. Months are counted from March (0 = March) and mapped back to 1..=12 at
/// the end. Months 1 and 2 belong to the following calendar year, hence the final adjustment.
// Narrowing to u32 cannot truncate month/day/hour/minute/second, and the year only overflows
// for timestamps billions of years away.
#[allow(clippy::cast_possible_truncation)]
fn epoch_to_parts(epoch: u64) -> (u32, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: u64 = 86_400;

    let days = epoch / SECS_PER_DAY;
    let secs_of_day = epoch % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let min = (secs_of_day % 3_600) / 60;
    let sec = secs_of_day % 60;

    // 719_468 = days from 0000-03-01 to 1970-01-01; 146_097 = days per 400-year era.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_from_march = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_from_march + 2) / 5 + 1;
    let month = if month_from_march < 10 {
        month_from_march + 3
    } else {
        month_from_march - 9
    };
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    (
        year as u32,
        month as u32,
        day as u32,
        hour as u32,
        min as u32,
        sec as u32,
    )
}
372
#[cfg(test)]
mod tests {
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    use super::*;

    /// Builds a plain-text `Message` with no parts and default metadata.
    fn test_message(role: Role, content: &str) -> Message {
        Message {
            role,
            content: content.to_owned(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        }
    }

    /// Builds a `Completed` `TranscriptMeta` fixture for `agent_id` with fixed timestamps and
    /// `turns_used == 2`.
    fn test_meta(agent_id: &str) -> TranscriptMeta {
        TranscriptMeta {
            agent_id: agent_id.to_owned(),
            agent_name: "bot".to_owned(),
            def_name: "bot".to_owned(),
            status: SubAgentState::Completed,
            started_at: "2026-01-01T00:00:00Z".to_owned(),
            finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
            resumed_from: None,
            turns_used: 2,
        }
    }

    // Messages appended by the writer are read back in the same order.
    #[test]
    fn writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.jsonl");

        let msg1 = test_message(Role::User, "hello");
        let msg2 = test_message(Role::Assistant, "world");

        let mut writer = TranscriptWriter::new(&path).unwrap();
        writer.append(0, &msg1).unwrap();
        writer.append(1, &msg2).unwrap();
        // Close the file handle before reading it back.
        drop(writer);

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].content, "hello");
        assert_eq!(messages[1].content, "world");
    }

    // A transcript that never existed (no meta sidecar either) loads as empty.
    #[test]
    fn load_missing_file_no_meta_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("ghost.jsonl");
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    // A missing transcript whose sidecar exists is reported as an error (possible data loss).
    #[test]
    fn load_missing_file_with_meta_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("ghost.meta.json");
        std::fs::write(&meta_path, "{}").unwrap();
        let jsonl_path = dir.path().join("ghost.jsonl");
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    // A non-JSON line between two valid entries is skipped; both valid entries survive.
    #[test]
    fn load_skips_malformed_lines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mixed.jsonl");

        let good = test_message(Role::User, "good");
        let entry = TranscriptEntry {
            seq: 0,
            timestamp: "2026-01-01T00:00:00Z".to_owned(),
            message: good.clone(),
        };
        let good_line = serde_json::to_string(&entry).unwrap();
        let content = format!("{good_line}\nnot valid json\n{good_line}\n");
        std::fs::write(&path, &content).unwrap();

        let messages = TranscriptReader::load(&path).unwrap();
        assert_eq!(messages.len(), 2);
    }

    // Metadata written by `write_meta` is read back by `load_meta`.
    #[test]
    fn meta_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abc-123");
        TranscriptWriter::write_meta(dir.path(), "abc-123", &meta).unwrap();
        let loaded = TranscriptReader::load_meta(dir.path(), "abc-123").unwrap();
        assert_eq!(loaded.agent_id, "abc-123");
        assert_eq!(loaded.turns_used, 2);
    }

    // A missing sidecar maps to `NotFound`, not a generic transcript error.
    #[test]
    fn meta_not_found_returns_not_found_error() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "ghost").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    // The full id resolves to itself.
    #[test]
    fn find_by_prefix_exact() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("abcdef01-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "abcdef01-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id =
            TranscriptReader::find_by_prefix(dir.path(), "abcdef01-0000-0000-0000-000000000000")
                .unwrap();
        assert_eq!(id, "abcdef01-0000-0000-0000-000000000000");
    }

    // A short unique prefix resolves to the full id.
    #[test]
    fn find_by_prefix_short_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let meta = test_meta("deadbeef-0000-0000-0000-000000000000");
        TranscriptWriter::write_meta(dir.path(), "deadbeef-0000-0000-0000-000000000000", &meta)
            .unwrap();
        let id = TranscriptReader::find_by_prefix(dir.path(), "deadbeef").unwrap();
        assert_eq!(id, "deadbeef-0000-0000-0000-000000000000");
    }

    // No matching sidecar yields `NotFound`.
    #[test]
    fn find_by_prefix_not_found() {
        let dir = tempfile::tempdir().unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "xxxxxxxx").unwrap_err();
        assert!(matches!(err, SubAgentError::NotFound(_)));
    }

    // A prefix shared by two ids is rejected with the match count.
    #[test]
    fn find_by_prefix_ambiguous() {
        let dir = tempfile::tempdir().unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0001-x", &test_meta("aabb0001-x")).unwrap();
        TranscriptWriter::write_meta(dir.path(), "aabb0002-y", &test_meta("aabb0002-y")).unwrap();
        let err = TranscriptReader::find_by_prefix(dir.path(), "aabb").unwrap_err();
        assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
    }

    // Five transcripts trimmed to three. The files are created back-to-back, so only the
    // number of deletions and survivors is asserted, not which files survive.
    #[test]
    fn sweep_old_transcripts_removes_oldest() {
        let dir = tempfile::tempdir().unwrap();

        for i in 0..5u32 {
            let path = dir.path().join(format!("file{i:02}.jsonl"));
            std::fs::write(&path, b"").unwrap();
        }

        let deleted = sweep_old_transcripts(dir.path(), 3).unwrap();
        assert_eq!(deleted, 2);

        let remaining: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("jsonl"))
            .collect();
        assert_eq!(remaining.len(), 3);
    }

    // `max_files == 0` disables the sweep.
    #[test]
    fn sweep_with_zero_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 0).unwrap();
        assert_eq!(deleted, 0);
    }

    // Nothing is deleted while the count is within the limit.
    #[test]
    fn sweep_below_max_does_nothing() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.jsonl"), b"").unwrap();
        let deleted = sweep_old_transcripts(dir.path(), 50).unwrap();
        assert_eq!(deleted, 0);
    }

    // `YYYY-MM-DDTHH:MM:SSZ` is exactly 20 characters.
    #[test]
    fn utc_now_format() {
        let ts = utc_now();
        assert_eq!(ts.len(), 20);
        assert!(ts.ends_with('Z'));
        assert!(ts.contains('T'));
    }

    // An existing but empty transcript loads as empty.
    #[test]
    fn load_empty_file_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.jsonl");
        std::fs::write(&path, b"").unwrap();
        let messages = TranscriptReader::load(&path).unwrap();
        assert!(messages.is_empty());
    }

    // An unparsable sidecar is a `Transcript` error, distinct from `NotFound`.
    #[test]
    fn load_meta_invalid_json_returns_transcript_error() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("bad.meta.json"), b"not json at all {{{{").unwrap();
        let err = TranscriptReader::load_meta(dir.path(), "bad").unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(_)));
    }

    // Sweeping a transcript also removes its `.meta.json` sidecar.
    #[test]
    fn sweep_removes_companion_meta() {
        let dir = tempfile::tempdir().unwrap();
        for i in 0..4u32 {
            let stem = format!("file{i:02}");
            std::fs::write(dir.path().join(format!("{stem}.jsonl")), b"").unwrap();
            std::fs::write(dir.path().join(format!("{stem}.meta.json")), b"{}").unwrap();
        }
        let deleted = sweep_old_transcripts(dir.path(), 2).unwrap();
        assert_eq!(deleted, 2);
        let meta_count = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().to_string_lossy().ends_with(".meta.json"))
            .count();
        assert_eq!(
            meta_count, 2,
            "orphaned meta sidecars should have been removed"
        );
    }

    // The data-loss guard finds `<stem>.meta.json` for an id containing dashes.
    #[test]
    fn data_loss_guard_uses_stem_based_meta_path() {
        let dir = tempfile::tempdir().unwrap();
        let agent_id = "deadbeef-0000-0000-0000-000000000000";
        std::fs::write(dir.path().join(format!("{agent_id}.meta.json")), b"{}").unwrap();
        let jsonl_path = dir.path().join(format!("{agent_id}.jsonl"));
        let err = TranscriptReader::load(&jsonl_path).unwrap_err();
        assert!(matches!(err, SubAgentError::Transcript(ref m) if m.contains("missing")));
    }
}