1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
/// Completeness level recorded for a session log and stamped on every event in it.
///
/// Serialized as a snake_case string (`full`, `partial`, `metadata_only`).
/// `rank_completeness` orders the variants `Full` > `Partial` > `MetadataOnly`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum LogCompleteness {
    /// Highest rank; the level a freshly created `SessionLogWriter` starts with.
    Full,
    /// Middle rank.
    Partial,
    /// Lowest rank.
    MetadataOnly,
}
22
/// Identifies where a log event came from.
///
/// Serialized as a snake_case string (for example `provider_file`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum LogSourceKind {
    /// Event produced by the wrapper itself (prompts, converted agent output, heartbeats, ...).
    Wrapper,
    /// NOTE(review): presumably an event read from a provider-owned session file — confirm.
    ProviderFile,
    /// NOTE(review): presumably an event read from a provider log — confirm.
    ProviderLog,
    /// NOTE(review): presumably captured from the provider's stdout — confirm.
    Stdout,
    /// NOTE(review): presumably captured from the provider's stderr — confirm.
    Stderr,
    /// Used for the `SessionStarted` event of sessions whose metadata is marked `backfilled`.
    Backfill,
}
33
/// Coarse category of a tool, attached to `ToolCall` / `ToolResult` events.
///
/// Serialized as a snake_case string (for example `file_read`, `sub_agent`).
/// `ToolKind::infer` derives a category from a tool name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolKind {
    /// Shell / command execution tools.
    Shell,
    /// Tools that read files.
    FileRead,
    /// Tools that write files.
    FileWrite,
    /// Tools that edit or patch files.
    FileEdit,
    /// Grep / glob / search style tools.
    Search,
    /// Tools whose name mentions an agent.
    SubAgent,
    /// Web / fetch / HTTP tools.
    Web,
    /// Notebook tools.
    Notebook,
    /// Anything that matches none of the other categories.
    Other,
}
61
62impl ToolKind {
63 pub fn infer(name: &str) -> Self {
69 let lower = name.to_lowercase();
70 if lower.contains("notebook") {
72 Self::Notebook
73 } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74 Self::Shell
75 } else if lower.contains("read") || lower == "view" || lower == "cat" {
76 Self::FileRead
77 } else if lower.contains("write") {
78 Self::FileWrite
79 } else if lower.contains("edit") || lower.contains("patch") {
80 Self::FileEdit
81 } else if lower.contains("grep")
82 || lower.contains("glob")
83 || lower.contains("search")
84 || lower == "find"
85 {
86 Self::Search
87 } else if lower.contains("agent") {
88 Self::SubAgent
89 } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90 Self::Web
91 } else {
92 Self::Other
93 }
94 }
95}
96
/// Payload of a single log event.
///
/// Serialized as an internally tagged object: the variant name, in snake_case, is stored in a
/// `type` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LogEventKind {
    /// Emitted by `SessionLogCoordinator::start_with_callback` right after the writer is
    /// created; the fields are copied from the session's `SessionLogMetadata`.
    SessionStarted {
        command: String,
        model: Option<String>,
        cwd: Option<String>,
        resumed: bool,
        backfilled: bool,
    },
    /// A user message (the wrapper records prompts with `role` set to `"user"`).
    UserMessage {
        role: String,
        content: String,
        message_id: Option<String>,
    },
    /// Text produced by the assistant.
    AssistantMessage {
        content: String,
        message_id: Option<String>,
    },
    /// Reasoning text, optionally tied to a message id.
    Reasoning {
        content: String,
        message_id: Option<String>,
    },
    /// A tool invocation requested by the assistant.
    ToolCall {
        tool_name: String,
        // Omitted from the JSON when `None`; missing on input deserializes to `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        input: Option<Value>,
    },
    /// The outcome of a tool execution.
    ToolResult {
        tool_name: Option<String>,
        // Omitted from the JSON when `None`; missing on input deserializes to `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        success: Option<bool>,
        output: Option<String>,
        error: Option<String>,
        data: Option<Value>,
    },
    /// A permission request for a tool and whether it was granted.
    Permission {
        tool_name: String,
        description: String,
        granted: bool,
    },
    /// Generic status message with optional structured data. `record_agent_output` uses it
    /// for init, error, result and turn-complete events.
    ProviderStatus {
        message: String,
        data: Option<Value>,
    },
    /// A stderr message.
    Stderr {
        message: String,
    },
    /// A warning about input that could not be parsed, optionally with the raw text.
    ParseWarning {
        message: String,
        raw: Option<String>,
    },
    /// Records a change from one session id to another.
    SessionCleared {
        old_session_id: Option<String>,
        new_session_id: Option<String>,
    },
    /// Emitted by `SessionLogWriter::finish` as the closing event of a session.
    SessionEnded {
        success: bool,
        error: Option<String>,
    },
    /// A textual session result.
    SessionResult {
        result: String,
    },
    /// Periodic liveness marker written by the coordinator's background task.
    Heartbeat {
        interval_secs: Option<u64>,
    },
    /// Token usage and, when known, total cost.
    Usage {
        input_tokens: u64,
        output_tokens: u64,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_read_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_creation_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        total_cost_usd: Option<f64>,
    },
    /// A leveled message with optional structured data.
    UserEvent {
        level: String,
        message: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<Value>,
    },
    /// A provider usage limit was hit. `record_agent_output` emits this with a freshly
    /// generated UUID as `incident_id`.
    UsageLimitHit {
        provider: String,
        scope: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        reset_at: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        scheduled_resume_at: Option<String>,
        // Defaults to `false` when absent on input.
        #[serde(default)]
        fallback_used: bool,
        incident_id: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw: Option<String>,
    },
    /// A session was resumed after a usage-limit incident.
    UsageLimitResumed {
        incident_id: String,
        resume_message: String,
        // Defaults to 1 (see `default_attempt`) when absent on input.
        #[serde(default = "default_attempt")]
        attempt: u32,
    },
    /// Resuming after a usage-limit incident failed.
    UsageLimitResumeFailed {
        incident_id: String,
        error: String,
        // Defaults to 1 (see `default_attempt`) when absent on input.
        #[serde(default = "default_attempt")]
        attempt: u32,
    },
}
232
/// Serde default for the `attempt` field of the usage-limit resume events: the first attempt.
fn default_attempt() -> u32 {
    const FIRST_ATTEMPT: u32 = 1;
    FIRST_ATTEMPT
}
236
/// One record of a session log; `SessionLogWriter::emit` writes each event as a single JSON
/// line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentLogEvent {
    /// Sequence number of the event within its log file.
    pub seq: u64,
    /// Creation time as an RFC 3339 string.
    pub ts: String,
    /// Provider name taken from the session metadata.
    pub provider: String,
    /// Session id assigned by the wrapper; also names the log file.
    pub wrapper_session_id: String,
    /// Provider-side session id, when known. Missing on input deserializes to `None`.
    #[serde(default)]
    pub provider_session_id: Option<String>,
    /// Where the event came from.
    pub source_kind: LogSourceKind,
    /// Completeness level of the log at the time the event was written.
    pub completeness: LogCompleteness,
    /// Event payload; flattened, so its `type` tag and fields sit at the top level of the
    /// JSON object.
    #[serde(flatten)]
    pub kind: LogEventKind,
}
250
/// Per-logs-directory index of sessions, persisted as `index.json` by `save_index`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionLogIndex {
    /// One entry per wrapper session.
    pub sessions: Vec<SessionLogIndexEntry>,
}
255
/// Index record describing one session log. Fields marked `#[serde(default)]` may be absent
/// in the stored JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionLogIndexEntry {
    /// Wrapper-assigned session id; used as the lookup key when updating the index.
    pub wrapper_session_id: String,
    /// Provider name.
    pub provider: String,
    /// Provider-side session id, when known.
    #[serde(default)]
    pub provider_session_id: Option<String>,
    /// Path of the session's JSONL log file (lossy UTF-8 rendering of the path).
    pub log_path: String,
    /// Current completeness level of the log.
    pub completeness: LogCompleteness,
    /// RFC 3339 time at which the entry was first created.
    pub started_at: String,
    /// RFC 3339 time set by `SessionLogWriter::finish`.
    #[serde(default)]
    pub ended_at: Option<String>,
    /// Workspace path from the session metadata.
    #[serde(default)]
    pub workspace_path: Option<String>,
    /// Command from the session metadata.
    #[serde(default)]
    pub command: Option<String>,
    /// Source paths registered through `SessionLogWriter::add_source_path` (no duplicates).
    #[serde(default)]
    pub source_paths: Vec<String>,
    /// Whether the session metadata was marked as backfilled.
    #[serde(default)]
    pub backfilled: bool,
}
276
/// Index of sessions persisted as `sessions_index.json` inside a base directory
/// (see `load_global_index` / `save_global_index`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GlobalSessionIndex {
    /// One entry per session id.
    pub sessions: Vec<GlobalSessionEntry>,
}
283
/// One record of the global session index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSessionEntry {
    /// Session id; the key used by `upsert_global_entry` (the writer passes its wrapper
    /// session id here).
    pub session_id: String,
    /// Project name; the writer derives it from the directory two levels above `index.json`.
    pub project: String,
    /// Path of the session's log file.
    pub log_path: String,
    /// Provider name.
    pub provider: String,
    /// RFC 3339 start time of the session.
    pub started_at: String,
}
292
293pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
294 let path = base_dir.join("sessions_index.json");
295 if !path.exists() {
296 return Ok(GlobalSessionIndex::default());
297 }
298 let content = std::fs::read_to_string(&path)
299 .with_context(|| format!("Failed to read {}", path.display()))?;
300 Ok(serde_json::from_str(&content).unwrap_or_default())
301}
302
303pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
304 let path = base_dir.join("sessions_index.json");
305 let content = serde_json::to_string_pretty(index)?;
306 crate::file_util::atomic_write_str(&path, &content)
307 .with_context(|| format!("Failed to write {}", path.display()))
308}
309
310pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
311 let mut index = load_global_index(base_dir)?;
312 if let Some(existing) = index
313 .sessions
314 .iter_mut()
315 .find(|e| e.session_id == entry.session_id)
316 {
317 existing.log_path = entry.log_path;
318 existing.provider = entry.provider;
319 existing.started_at = entry.started_at;
320 existing.project = entry.project;
321 } else {
322 index.sessions.push(entry);
323 }
324 save_global_index(base_dir, &index)
325}
326
/// Bookkeeping for the historical log import, persisted as `backfill_state.json` in the logs
/// directory. Both fields fall back to their defaults when absent in the stored JSON.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BackfillState {
    /// Version of the last completed import; `run_backfill` skips the import when this equals
    /// its current version.
    #[serde(default)]
    pub version: u32,
    /// Keys (see `session_key`) of sessions that have already been imported.
    #[serde(default)]
    pub imported_session_keys: Vec<String>,
}
334
/// Descriptive data about a session, supplied when a `SessionLogWriter` is created.
#[derive(Debug, Clone)]
pub struct SessionLogMetadata {
    /// Provider name; copied into every event and index entry.
    pub provider: String,
    /// Wrapper-assigned session id; determines the log file name (`<id>.jsonl`).
    pub wrapper_session_id: String,
    /// Provider-side session id, when known.
    pub provider_session_id: Option<String>,
    /// Workspace path; reported as `cwd` in the `SessionStarted` event.
    pub workspace_path: Option<String>,
    /// Command recorded for the session.
    pub command: String,
    /// Model name, when known.
    pub model: Option<String>,
    /// Whether the session is a resumed one.
    pub resumed: bool,
    /// Whether the session was imported from historical data.
    pub backfilled: bool,
}
346
/// Context handed to a provider's live log adapter constructor
/// (see `live_adapter_for_provider`).
#[derive(Debug, Clone)]
pub struct LiveLogContext {
    /// NOTE(review): presumably the same optional root passed to `logs_dir` — confirm.
    pub root: Option<String>,
    /// Provider-side session id, when already known.
    pub provider_session_id: Option<String>,
    /// Workspace path of the session, when known.
    pub workspace_path: Option<String>,
    /// Start time of the session (UTC).
    pub started_at: DateTime<Utc>,
    /// NOTE(review): presumably true when the workspace is a git worktree — confirm.
    pub is_worktree: bool,
}
357
/// A historical session produced by a `HistoricalLogAdapter`, ready to be written by
/// `run_backfill`.
#[derive(Debug, Clone)]
pub struct BackfilledSession {
    /// Metadata used to create the session's log writer.
    pub metadata: SessionLogMetadata,
    /// Completeness level applied to the writer before the events are emitted.
    pub completeness: LogCompleteness,
    /// Source paths registered in the index entry for the session.
    pub source_paths: Vec<String>,
    /// Events to emit, in order, each with its source kind.
    pub events: Vec<(LogSourceKind, LogEventKind)>,
}
365
/// Adapter that is polled repeatedly while a session runs and may emit events through the
/// given writer.
#[async_trait]
pub trait LiveLogAdapter: Send {
    /// Called on every iteration of the coordinator's background loop (`run_live_adapter`).
    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;

    /// Called once after the background loop has stopped. The default implementation performs
    /// one more `poll`.
    async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
        self.poll(writer).await
    }
}
374
/// Adapter that produces historical sessions for `run_backfill` to import.
pub trait HistoricalLogAdapter: Send + Sync {
    /// Returns the sessions to import; `root` is the optional root handed to `run_backfill`.
    fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
}
378
379pub fn logs_dir(root: Option<&str>) -> PathBuf {
384 if let Ok(user_log_dir) = std::env::var("ZAG_USER_LOG_DIR") {
385 return PathBuf::from(user_log_dir);
386 }
387 crate::config::Config::agent_dir(root).join("logs")
388}
389
390pub fn live_adapter_for_provider(
396 provider: &str,
397 ctx: LiveLogContext,
398 enable_live: bool,
399) -> Option<Box<dyn LiveLogAdapter>> {
400 if !enable_live {
401 return None;
402 }
403
404 match provider {
405 "claude" => Some(Box::new(
406 crate::providers::claude::logs::ClaudeLiveLogAdapter::new(ctx),
407 )),
408 "codex" => Some(Box::new(crate::providers::codex::CodexLiveLogAdapter::new(
409 ctx,
410 ))),
411 "gemini" => Some(Box::new(
412 crate::providers::gemini::GeminiLiveLogAdapter::new(ctx),
413 )),
414 "copilot" => Some(Box::new(
415 crate::providers::copilot::CopilotLiveLogAdapter::new(ctx),
416 )),
417 _ => None,
418 }
419}
420
/// Shared callback invoked by `SessionLogWriter::emit` with each event after it has been
/// written to the log file.
pub type LogEventCallback = Arc<dyn Fn(&AgentLogEvent) + Send + Sync>;
426
/// Handle for appending events to one session's log. Clones share the same underlying state
/// through an `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct SessionLogWriter {
    // Shared, mutex-protected writer state.
    state: Arc<Mutex<WriterState>>,
}
431
/// Mutable state behind a `SessionLogWriter`.
struct WriterState {
    // Session metadata; `provider_session_id` may be updated after creation.
    metadata: SessionLogMetadata,
    // Path of the session's JSONL log file.
    log_path: PathBuf,
    // Path of the `index.json` file in the logs directory.
    index_path: PathBuf,
    // Sequence number assigned to the next emitted event.
    next_seq: u64,
    // Current completeness level; only ever lowered by `set_completeness`.
    completeness: LogCompleteness,
    // When set, index updates are mirrored into the global index in this directory.
    global_index_dir: Option<PathBuf>,
    // Optional callback invoked for every emitted event.
    event_callback: Option<LogEventCallback>,
}
441
/// Owns a session's log writer together with the background task (live adapter polling or
/// heartbeats) that runs for the duration of the session.
pub struct SessionLogCoordinator {
    // Writer shared with the background task.
    writer: SessionLogWriter,
    // Sends the stop signal to the background task; taken by `finish`.
    stop_tx: Option<watch::Sender<bool>>,
    // Handle of the background task; awaited by `finish`.
    task: Option<JoinHandle<Result<()>>>,
}
447
448impl SessionLogWriter {
449 pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
455 let sessions_dir = logs_dir.join("sessions");
456 std::fs::create_dir_all(&sessions_dir).with_context(|| {
457 format!(
458 "Failed to create session log directory: {}",
459 sessions_dir.display()
460 )
461 })?;
462 let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
463 if let Some(parent) = log_path.parent() {
464 std::fs::create_dir_all(parent)
465 .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
466 }
467 if !log_path.exists() {
468 File::create(&log_path)
469 .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
470 }
471
472 let next_seq = next_sequence(&log_path)?;
473 let index_path = logs_dir.join("index.json");
474 let writer = Self {
475 state: Arc::new(Mutex::new(WriterState {
476 metadata: metadata.clone(),
477 log_path: log_path.clone(),
478 index_path,
479 next_seq,
480 completeness: LogCompleteness::Full,
481 global_index_dir: None,
482 event_callback: None,
483 })),
484 };
485
486 writer.upsert_index()?;
487 Ok(writer)
488 }
489
490 pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
493 let mut state = self
494 .state
495 .lock()
496 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
497 state.global_index_dir = Some(dir);
498 Ok(())
499 }
500
501 pub fn set_event_callback(&self, cb: LogEventCallback) -> Result<()> {
508 let mut state = self
509 .state
510 .lock()
511 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
512 state.event_callback = Some(cb);
513 Ok(())
514 }
515
516 pub fn clear_event_callback(&self) -> Result<()> {
518 let mut state = self
519 .state
520 .lock()
521 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
522 state.event_callback = None;
523 Ok(())
524 }
525
526 pub fn log_path(&self) -> Result<PathBuf> {
527 let state = self
528 .state
529 .lock()
530 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
531 Ok(state.log_path.clone())
532 }
533
534 pub fn get_provider_session_id(&self) -> Option<String> {
535 self.state.lock().ok()?.metadata.provider_session_id.clone()
536 }
537
538 pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
539 let mut state = self
540 .state
541 .lock()
542 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
543 state.metadata.provider_session_id = provider_session_id;
544 drop(state);
545 self.upsert_index()
546 }
547
548 pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
549 let mut state = self
550 .state
551 .lock()
552 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
553 if rank_completeness(completeness) < rank_completeness(state.completeness) {
554 state.completeness = completeness;
555 }
556 drop(state);
557 self.upsert_index()
558 }
559
560 pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
561 let path = path.into();
562 let state = self
563 .state
564 .lock()
565 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
566 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
567 let index_path = state.index_path.clone();
568 drop(state);
569
570 let mut index = load_index(&index_path)?;
571 if let Some(entry) = index
572 .sessions
573 .iter_mut()
574 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
575 && !entry.source_paths.contains(&path)
576 {
577 entry.source_paths.push(path);
578 save_index(&index_path, &index)?;
579 }
580 Ok(())
581 }
582
583 pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
584 let (event, callback) = {
585 let mut state = self
586 .state
587 .lock()
588 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
589 let event = AgentLogEvent {
590 seq: state.next_seq,
591 ts: Utc::now().to_rfc3339(),
592 provider: state.metadata.provider.clone(),
593 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
594 provider_session_id: state.metadata.provider_session_id.clone(),
595 source_kind,
596 completeness: state.completeness,
597 kind,
598 };
599 state.next_seq += 1;
600
601 let mut file = OpenOptions::new()
602 .append(true)
603 .open(&state.log_path)
604 .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
605 writeln!(file, "{}", serde_json::to_string(&event)?)
606 .with_context(|| format!("Failed to write {}", state.log_path.display()))?;
607
608 (event, state.event_callback.clone())
609 };
610
611 if let Some(cb) = callback {
614 cb(&event);
615 }
616 Ok(())
617 }
618
619 pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
620 self.emit(
621 LogSourceKind::Wrapper,
622 LogEventKind::SessionEnded { success, error },
623 )?;
624 let state = self
625 .state
626 .lock()
627 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
628 let index_path = state.index_path.clone();
629 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
630 drop(state);
631 let mut index = load_index(&index_path)?;
632 if let Some(entry) = index
633 .sessions
634 .iter_mut()
635 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
636 {
637 entry.ended_at = Some(Utc::now().to_rfc3339());
638 }
639 save_index(&index_path, &index)
640 }
641
642 fn upsert_index(&self) -> Result<()> {
643 let state = self
644 .state
645 .lock()
646 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
647 let mut index = load_index(&state.index_path)?;
648 let started_at;
649 let existing = index
650 .sessions
651 .iter_mut()
652 .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
653 if let Some(entry) = existing {
654 entry.provider_session_id = state.metadata.provider_session_id.clone();
655 entry.log_path = state.log_path.to_string_lossy().to_string();
656 entry.workspace_path = state.metadata.workspace_path.clone();
657 entry.command = Some(state.metadata.command.clone());
658 entry.completeness = state.completeness;
659 entry.backfilled = state.metadata.backfilled;
660 started_at = entry.started_at.clone();
661 } else {
662 started_at = Utc::now().to_rfc3339();
663 index.sessions.push(SessionLogIndexEntry {
664 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
665 provider: state.metadata.provider.clone(),
666 provider_session_id: state.metadata.provider_session_id.clone(),
667 log_path: state.log_path.to_string_lossy().to_string(),
668 completeness: state.completeness,
669 started_at: started_at.clone(),
670 ended_at: None,
671 workspace_path: state.metadata.workspace_path.clone(),
672 command: Some(state.metadata.command.clone()),
673 source_paths: Vec::new(),
674 backfilled: state.metadata.backfilled,
675 });
676 }
677 save_index(&state.index_path, &index)?;
678
679 if let Some(ref global_dir) = state.global_index_dir {
681 let project = state
683 .index_path
684 .parent()
685 .and_then(|logs| logs.parent())
686 .and_then(|proj| proj.file_name())
687 .map(|n| n.to_string_lossy().to_string())
688 .unwrap_or_default();
689 let _ = upsert_global_entry(
690 global_dir,
691 GlobalSessionEntry {
692 session_id: state.metadata.wrapper_session_id.clone(),
693 project,
694 log_path: state.log_path.to_string_lossy().to_string(),
695 provider: state.metadata.provider.clone(),
696 started_at,
697 },
698 );
699 }
700
701 Ok(())
702 }
703}
704
705impl SessionLogCoordinator {
706 pub fn start(
710 logs_dir: &Path,
711 metadata: SessionLogMetadata,
712 live_adapter: Option<Box<dyn LiveLogAdapter>>,
713 ) -> Result<Self> {
714 Self::start_with_callback(logs_dir, metadata, live_adapter, None)
715 }
716
717 pub fn start_with_callback(
721 logs_dir: &Path,
722 metadata: SessionLogMetadata,
723 live_adapter: Option<Box<dyn LiveLogAdapter>>,
724 event_callback: Option<LogEventCallback>,
725 ) -> Result<Self> {
726 let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
727 if let Some(cb) = event_callback {
728 writer.set_event_callback(cb)?;
729 }
730 writer.emit(
731 if metadata.backfilled {
732 LogSourceKind::Backfill
733 } else {
734 LogSourceKind::Wrapper
735 },
736 LogEventKind::SessionStarted {
737 command: metadata.command.clone(),
738 model: metadata.model.clone(),
739 cwd: metadata.workspace_path.clone(),
740 resumed: metadata.resumed,
741 backfilled: metadata.backfilled,
742 },
743 )?;
744
745 if let Some(adapter) = live_adapter {
746 let (stop_tx, stop_rx) = watch::channel(false);
747 let writer_clone = writer.clone();
748 let task =
749 tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
750 Ok(Self {
751 writer,
752 stop_tx: Some(stop_tx),
753 task: Some(task),
754 })
755 } else {
756 let (stop_tx, stop_rx) = watch::channel(false);
758 let writer_clone = writer.clone();
759 let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
760 Ok(Self {
761 writer,
762 stop_tx: Some(stop_tx),
763 task: Some(task),
764 })
765 }
766 }
767
768 pub fn writer(&self) -> &SessionLogWriter {
769 &self.writer
770 }
771
772 pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
773 if let Some(stop_tx) = self.stop_tx.take() {
774 let _ = stop_tx.send(true);
775 }
776 if let Some(task) = self.task.take() {
777 task.await??;
778 }
779 self.writer.finish(success, error)
780 }
781}
782
783pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
784 if let Some(prompt) = prompt
785 && !prompt.trim().is_empty()
786 {
787 writer.emit(
788 LogSourceKind::Wrapper,
789 LogEventKind::UserMessage {
790 role: "user".to_string(),
791 content: prompt.to_string(),
792 message_id: None,
793 },
794 )?;
795 }
796 Ok(())
797}
798
799pub fn append_event_to_log(
805 log_path: &Path,
806 provider: &str,
807 wrapper_session_id: &str,
808 provider_session_id: Option<&str>,
809 kind: LogEventKind,
810) -> Result<()> {
811 if let Some(parent) = log_path.parent() {
812 std::fs::create_dir_all(parent)
813 .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
814 }
815 let next_seq = if log_path.exists() {
816 next_sequence(log_path)?
817 } else {
818 File::create(log_path)
819 .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
820 0
821 };
822 let event = AgentLogEvent {
823 seq: next_seq,
824 ts: Utc::now().to_rfc3339(),
825 provider: provider.to_string(),
826 wrapper_session_id: wrapper_session_id.to_string(),
827 provider_session_id: provider_session_id.map(str::to_string),
828 source_kind: LogSourceKind::Wrapper,
829 completeness: LogCompleteness::Full,
830 kind,
831 };
832 let mut file = OpenOptions::new()
833 .append(true)
834 .open(log_path)
835 .with_context(|| format!("Failed to open {}", log_path.display()))?;
836 writeln!(file, "{}", serde_json::to_string(&event)?)
837 .with_context(|| format!("Failed to write {}", log_path.display()))?;
838 Ok(())
839}
840
841pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
842 if !output.session_id.is_empty() && output.session_id != "unknown" {
843 writer.set_provider_session_id(Some(output.session_id.clone()))?;
844 }
845 for event in &output.events {
846 match event {
847 Event::AssistantMessage { content, .. } => {
848 for block in content {
849 match block {
850 ContentBlock::Text { text } => {
851 writer.emit(
852 LogSourceKind::Wrapper,
853 LogEventKind::AssistantMessage {
854 content: text.clone(),
855 message_id: None,
856 },
857 )?;
858 }
859 ContentBlock::ToolUse { id, name, input } => {
860 writer.emit(
861 LogSourceKind::Wrapper,
862 LogEventKind::ToolCall {
863 tool_kind: Some(ToolKind::infer(name)),
864 tool_name: name.clone(),
865 tool_id: Some(id.clone()),
866 input: Some(input.clone()),
867 },
868 )?;
869 }
870 }
871 }
872 }
873 Event::ToolExecution {
874 tool_name,
875 tool_id,
876 result,
877 ..
878 } => {
879 writer.emit(
880 LogSourceKind::Wrapper,
881 LogEventKind::ToolResult {
882 tool_kind: Some(ToolKind::infer(tool_name)),
883 tool_name: Some(tool_name.clone()),
884 tool_id: Some(tool_id.clone()),
885 success: Some(result.success),
886 output: result.output.clone(),
887 error: result.error.clone(),
888 data: result.data.clone(),
889 },
890 )?;
891 }
892 Event::PermissionRequest {
893 tool_name,
894 description,
895 granted,
896 } => {
897 writer.emit(
898 LogSourceKind::Wrapper,
899 LogEventKind::Permission {
900 tool_name: tool_name.clone(),
901 description: description.clone(),
902 granted: *granted,
903 },
904 )?;
905 }
906 Event::Error { message, details } => {
907 writer.emit(
908 LogSourceKind::Wrapper,
909 LogEventKind::ProviderStatus {
910 message: message.clone(),
911 data: details.clone(),
912 },
913 )?;
914 }
915 Event::Init {
916 model,
917 working_directory,
918 metadata,
919 ..
920 } => {
921 writer.emit(
922 LogSourceKind::Wrapper,
923 LogEventKind::ProviderStatus {
924 message: format!("Initialized {model}"),
925 data: Some(serde_json::json!({
926 "working_directory": working_directory,
927 "metadata": metadata,
928 })),
929 },
930 )?;
931 }
932 Event::UserMessage { content } => {
933 for block in content {
934 if let ContentBlock::Text { text } = block {
935 writer.emit(
936 LogSourceKind::Wrapper,
937 LogEventKind::UserMessage {
938 role: "user".to_string(),
939 content: text.clone(),
940 message_id: None,
941 },
942 )?;
943 }
944 }
945 }
946 Event::Result {
947 success,
948 message,
949 duration_ms,
950 num_turns,
951 } => {
952 writer.emit(
953 LogSourceKind::Wrapper,
954 LogEventKind::ProviderStatus {
955 message: message
956 .clone()
957 .unwrap_or_else(|| "Result emitted".to_string()),
958 data: Some(serde_json::json!({
959 "success": success,
960 "duration_ms": duration_ms,
961 "num_turns": num_turns,
962 })),
963 },
964 )?;
965 }
966 Event::TurnComplete {
967 stop_reason,
968 turn_index,
969 usage,
970 } => {
971 writer.emit(
972 LogSourceKind::Wrapper,
973 LogEventKind::ProviderStatus {
974 message: format!("Turn {turn_index} complete"),
975 data: Some(serde_json::json!({
976 "stop_reason": stop_reason,
977 "turn_index": turn_index,
978 "usage": usage,
979 })),
980 },
981 )?;
982 }
983 Event::UsageLimitDetected {
984 provider,
985 scope,
986 reset_at,
987 raw,
988 } => {
989 writer.emit(
992 LogSourceKind::Wrapper,
993 LogEventKind::UsageLimitHit {
994 provider: provider.clone(),
995 scope: scope.clone(),
996 reset_at: reset_at.clone(),
997 scheduled_resume_at: None,
998 fallback_used: false,
999 incident_id: uuid::Uuid::new_v4().to_string(),
1000 raw: raw.clone(),
1001 },
1002 )?;
1003 }
1004 }
1005 }
1006
1007 if let Some(ref usage) = output.usage {
1009 writer.emit(
1010 LogSourceKind::Wrapper,
1011 LogEventKind::Usage {
1012 input_tokens: usage.input_tokens,
1013 output_tokens: usage.output_tokens,
1014 cache_read_tokens: usage.cache_read_tokens,
1015 cache_creation_tokens: usage.cache_creation_tokens,
1016 total_cost_usd: output.total_cost_usd,
1017 },
1018 )?;
1019 } else if let Some(cost) = output.total_cost_usd {
1020 writer.emit(
1022 LogSourceKind::Wrapper,
1023 LogEventKind::Usage {
1024 input_tokens: 0,
1025 output_tokens: 0,
1026 cache_read_tokens: None,
1027 cache_creation_tokens: None,
1028 total_cost_usd: Some(cost),
1029 },
1030 )?;
1031 }
1032
1033 Ok(())
1034}
1035
1036pub fn run_backfill(
1040 logs_dir: &Path,
1041 root: Option<&str>,
1042 providers: &[&dyn HistoricalLogAdapter],
1043) -> Result<usize> {
1044 let state_path = logs_dir.join("backfill_state.json");
1045 let mut state = load_backfill_state(&state_path)?;
1046 let current_version = 1;
1047 if state.version == current_version {
1048 info!("Historical log import already completed for version {current_version}");
1049 return Ok(0);
1050 }
1051
1052 info!("Starting historical log import");
1053 let mut imported = 0;
1054 for provider in providers {
1055 for session in provider.backfill(root)? {
1056 let key = session_key(&session.metadata);
1057 if state.imported_session_keys.contains(&key) {
1058 info!(
1059 "Skipping already imported historical session: {} {}",
1060 session.metadata.provider,
1061 session
1062 .metadata
1063 .provider_session_id
1064 .as_deref()
1065 .unwrap_or(&session.metadata.wrapper_session_id)
1066 );
1067 continue;
1068 }
1069
1070 info!(
1071 "Importing historical session: {} {}",
1072 session.metadata.provider,
1073 session
1074 .metadata
1075 .provider_session_id
1076 .as_deref()
1077 .unwrap_or(&session.metadata.wrapper_session_id)
1078 );
1079
1080 let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
1081 writer.set_completeness(session.completeness)?;
1082 for source_path in session.source_paths {
1083 info!(" source: {source_path}");
1084 let _ = writer.add_source_path(source_path);
1085 }
1086 for (source_kind, event) in session.events {
1087 writer.emit(source_kind, event)?;
1088 }
1089 writer.finish(true, None)?;
1090 state.imported_session_keys.push(key);
1091 imported += 1;
1092 }
1093 }
1094
1095 state.version = current_version;
1096 save_backfill_state(&state_path, &state)?;
1097 info!("Historical log import finished: {imported} session(s) imported");
1098 Ok(imported)
1099}
1100
/// Number of seconds between `Heartbeat` events written by the background tasks.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
1103
1104async fn run_live_adapter(
1105 mut adapter: Box<dyn LiveLogAdapter>,
1106 writer: SessionLogWriter,
1107 mut stop_rx: watch::Receiver<bool>,
1108) -> Result<()> {
1109 let mut last_heartbeat = tokio::time::Instant::now();
1110 loop {
1111 adapter.poll(&writer).await?;
1112
1113 if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
1115 let _ = writer.emit(
1116 LogSourceKind::Wrapper,
1117 LogEventKind::Heartbeat {
1118 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1119 },
1120 );
1121 last_heartbeat = tokio::time::Instant::now();
1122 }
1123
1124 tokio::select! {
1125 changed = stop_rx.changed() => {
1126 if changed.is_ok() && *stop_rx.borrow() {
1127 break;
1128 }
1129 }
1130 _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
1131 }
1132 }
1133 adapter.finalize(&writer).await
1134}
1135
1136async fn run_heartbeat_loop(
1138 writer: SessionLogWriter,
1139 mut stop_rx: watch::Receiver<bool>,
1140) -> Result<()> {
1141 loop {
1142 tokio::select! {
1143 changed = stop_rx.changed() => {
1144 if changed.is_ok() && *stop_rx.borrow() {
1145 break;
1146 }
1147 }
1148 _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
1149 let _ = writer.emit(
1150 LogSourceKind::Wrapper,
1151 LogEventKind::Heartbeat {
1152 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1153 },
1154 );
1155 }
1156 }
1157 }
1158 Ok(())
1159}
1160
1161fn next_sequence(path: &Path) -> Result<u64> {
1162 if !path.exists() {
1163 return Ok(1);
1164 }
1165 let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
1166 let reader = BufReader::new(file);
1167 let mut last_seq = 0;
1168 for line in reader.lines() {
1169 let line = line?;
1170 if line.trim().is_empty() {
1171 continue;
1172 }
1173 if let Ok(value) = serde_json::from_str::<Value>(&line)
1174 && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
1175 {
1176 last_seq = seq;
1177 }
1178 }
1179 Ok(last_seq + 1)
1180}
1181
1182fn load_index(path: &Path) -> Result<SessionLogIndex> {
1183 if !path.exists() {
1184 return Ok(SessionLogIndex::default());
1185 }
1186 let content = std::fs::read_to_string(path)
1187 .with_context(|| format!("Failed to read {}", path.display()))?;
1188 Ok(serde_json::from_str(&content).unwrap_or_default())
1189}
1190
1191fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
1192 let content = serde_json::to_string_pretty(index)?;
1193 crate::file_util::atomic_write_str(path, &content)
1194 .with_context(|| format!("Failed to write {}", path.display()))
1195}
1196
1197fn load_backfill_state(path: &Path) -> Result<BackfillState> {
1198 if !path.exists() {
1199 return Ok(BackfillState::default());
1200 }
1201 let content = std::fs::read_to_string(path)
1202 .with_context(|| format!("Failed to read {}", path.display()))?;
1203 Ok(serde_json::from_str(&content).unwrap_or_default())
1204}
1205
1206fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
1207 let content = serde_json::to_string_pretty(state)?;
1208 crate::file_util::atomic_write_str(path, &content)
1209 .with_context(|| format!("Failed to write {}", path.display()))
1210}
1211
1212fn rank_completeness(completeness: LogCompleteness) -> u8 {
1213 match completeness {
1214 LogCompleteness::Full => 3,
1215 LogCompleteness::Partial => 2,
1216 LogCompleteness::MetadataOnly => 1,
1217 }
1218}
1219
1220fn session_key(metadata: &SessionLogMetadata) -> String {
1221 format!(
1222 "{}:{}",
1223 metadata.provider,
1224 metadata
1225 .provider_session_id
1226 .as_deref()
1227 .unwrap_or(&metadata.wrapper_session_id)
1228 )
1229}
1230
// Unit tests are kept in the sibling file `session_log_tests.rs` and compiled only for test
// builds.
#[cfg(test)]
#[path = "session_log_tests.rs"]
mod tests;