1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "snake_case")]
17pub enum LogCompleteness {
18 Full,
19 Partial,
20 MetadataOnly,
21}
22
23#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
24#[serde(rename_all = "snake_case")]
25pub enum LogSourceKind {
26 Wrapper,
27 ProviderFile,
28 ProviderLog,
29 Stdout,
30 Stderr,
31 Backfill,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum ToolKind {
42 Shell,
44 FileRead,
46 FileWrite,
48 FileEdit,
50 Search,
52 SubAgent,
54 Web,
56 Notebook,
58 Other,
60}
61
62impl ToolKind {
63 pub fn infer(name: &str) -> Self {
69 let lower = name.to_lowercase();
70 if lower.contains("notebook") {
72 Self::Notebook
73 } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74 Self::Shell
75 } else if lower.contains("read") || lower == "view" || lower == "cat" {
76 Self::FileRead
77 } else if lower.contains("write") {
78 Self::FileWrite
79 } else if lower.contains("edit") || lower.contains("patch") {
80 Self::FileEdit
81 } else if lower.contains("grep")
82 || lower.contains("glob")
83 || lower.contains("search")
84 || lower == "find"
85 {
86 Self::Search
87 } else if lower.contains("agent") {
88 Self::SubAgent
89 } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90 Self::Web
91 } else {
92 Self::Other
93 }
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type", rename_all = "snake_case")]
99pub enum LogEventKind {
100 SessionStarted {
101 command: String,
102 model: Option<String>,
103 cwd: Option<String>,
104 resumed: bool,
105 backfilled: bool,
106 },
107 UserMessage {
108 role: String,
109 content: String,
110 message_id: Option<String>,
111 },
112 AssistantMessage {
113 content: String,
114 message_id: Option<String>,
115 },
116 Reasoning {
117 content: String,
118 message_id: Option<String>,
119 },
120 ToolCall {
121 tool_name: String,
122 #[serde(default, skip_serializing_if = "Option::is_none")]
123 tool_kind: Option<ToolKind>,
124 tool_id: Option<String>,
125 input: Option<Value>,
126 },
127 ToolResult {
128 tool_name: Option<String>,
129 #[serde(default, skip_serializing_if = "Option::is_none")]
130 tool_kind: Option<ToolKind>,
131 tool_id: Option<String>,
132 success: Option<bool>,
133 output: Option<String>,
134 error: Option<String>,
135 data: Option<Value>,
136 },
137 Permission {
138 tool_name: String,
139 description: String,
140 granted: bool,
141 },
142 ProviderStatus {
143 message: String,
144 data: Option<Value>,
145 },
146 Stderr {
147 message: String,
148 },
149 ParseWarning {
150 message: String,
151 raw: Option<String>,
152 },
153 SessionCleared {
154 old_session_id: Option<String>,
155 new_session_id: Option<String>,
156 },
157 SessionEnded {
158 success: bool,
159 error: Option<String>,
160 },
161 Heartbeat {
162 interval_secs: Option<u64>,
163 },
164 Usage {
165 input_tokens: u64,
166 output_tokens: u64,
167 #[serde(default, skip_serializing_if = "Option::is_none")]
168 cache_read_tokens: Option<u64>,
169 #[serde(default, skip_serializing_if = "Option::is_none")]
170 cache_creation_tokens: Option<u64>,
171 #[serde(default, skip_serializing_if = "Option::is_none")]
172 total_cost_usd: Option<f64>,
173 },
174 UserEvent {
175 level: String,
176 message: String,
177 #[serde(default, skip_serializing_if = "Option::is_none")]
178 data: Option<Value>,
179 },
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct AgentLogEvent {
184 pub seq: u64,
185 pub ts: String,
186 pub provider: String,
187 pub wrapper_session_id: String,
188 #[serde(default)]
189 pub provider_session_id: Option<String>,
190 pub source_kind: LogSourceKind,
191 pub completeness: LogCompleteness,
192 #[serde(flatten)]
193 pub kind: LogEventKind,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, Default)]
197pub struct SessionLogIndex {
198 pub sessions: Vec<SessionLogIndexEntry>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct SessionLogIndexEntry {
203 pub wrapper_session_id: String,
204 pub provider: String,
205 #[serde(default)]
206 pub provider_session_id: Option<String>,
207 pub log_path: String,
208 pub completeness: LogCompleteness,
209 pub started_at: String,
210 #[serde(default)]
211 pub ended_at: Option<String>,
212 #[serde(default)]
213 pub workspace_path: Option<String>,
214 #[serde(default)]
215 pub command: Option<String>,
216 #[serde(default)]
217 pub source_paths: Vec<String>,
218 #[serde(default)]
219 pub backfilled: bool,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize, Default)]
225pub struct GlobalSessionIndex {
226 pub sessions: Vec<GlobalSessionEntry>,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct GlobalSessionEntry {
231 pub session_id: String,
232 pub project: String,
233 pub log_path: String,
234 pub provider: String,
235 pub started_at: String,
236}
237
238pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
239 let path = base_dir.join("sessions_index.json");
240 if !path.exists() {
241 return Ok(GlobalSessionIndex::default());
242 }
243 let content = std::fs::read_to_string(&path)
244 .with_context(|| format!("Failed to read {}", path.display()))?;
245 Ok(serde_json::from_str(&content).unwrap_or_default())
246}
247
248pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
249 let path = base_dir.join("sessions_index.json");
250 let content = serde_json::to_string_pretty(index)?;
251 crate::file_util::atomic_write_str(&path, &content)
252 .with_context(|| format!("Failed to write {}", path.display()))
253}
254
255pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
256 let mut index = load_global_index(base_dir)?;
257 if let Some(existing) = index
258 .sessions
259 .iter_mut()
260 .find(|e| e.session_id == entry.session_id)
261 {
262 existing.log_path = entry.log_path;
263 existing.provider = entry.provider;
264 existing.started_at = entry.started_at;
265 existing.project = entry.project;
266 } else {
267 index.sessions.push(entry);
268 }
269 save_global_index(base_dir, &index)
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize, Default)]
273pub struct BackfillState {
274 #[serde(default)]
275 pub version: u32,
276 #[serde(default)]
277 pub imported_session_keys: Vec<String>,
278}
279
/// Static description of a session, supplied when its log writer is created.
#[derive(Clone, Debug)]
pub struct SessionLogMetadata {
    pub provider: String,
    /// Identifier used for the log file name and as the index key.
    pub wrapper_session_id: String,
    /// Provider-side identifier; may be filled in later through
    /// `SessionLogWriter::set_provider_session_id`.
    pub provider_session_id: Option<String>,
    pub workspace_path: Option<String>,
    pub command: String,
    pub model: Option<String>,
    pub resumed: bool,
    pub backfilled: bool,
}
291
292#[derive(Debug, Clone)]
293pub struct LiveLogContext {
294 pub root: Option<String>,
295 pub provider_session_id: Option<String>,
296 pub workspace_path: Option<String>,
297 pub started_at: DateTime<Utc>,
298 pub is_worktree: bool,
301}
302
303#[derive(Debug, Clone)]
304pub struct BackfilledSession {
305 pub metadata: SessionLogMetadata,
306 pub completeness: LogCompleteness,
307 pub source_paths: Vec<String>,
308 pub events: Vec<(LogSourceKind, LogEventKind)>,
309}
310
311#[async_trait]
312pub trait LiveLogAdapter: Send {
313 async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;
314
315 async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
316 self.poll(writer).await
317 }
318}
319
320pub trait HistoricalLogAdapter: Send + Sync {
321 fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
322}
323
324pub fn logs_dir(root: Option<&str>) -> PathBuf {
329 if let Ok(user_log_dir) = std::env::var("ZAG_USER_LOG_DIR") {
330 return PathBuf::from(user_log_dir);
331 }
332 crate::config::Config::agent_dir(root).join("logs")
333}
334
335pub fn live_adapter_for_provider(
341 provider: &str,
342 ctx: LiveLogContext,
343 enable_live: bool,
344) -> Option<Box<dyn LiveLogAdapter>> {
345 if !enable_live {
346 return None;
347 }
348
349 match provider {
350 "claude" => Some(Box::new(
351 crate::providers::claude::logs::ClaudeLiveLogAdapter::new(ctx),
352 )),
353 "codex" => Some(Box::new(crate::providers::codex::CodexLiveLogAdapter::new(
354 ctx,
355 ))),
356 "gemini" => Some(Box::new(
357 crate::providers::gemini::GeminiLiveLogAdapter::new(ctx),
358 )),
359 "copilot" => Some(Box::new(
360 crate::providers::copilot::CopilotLiveLogAdapter::new(ctx),
361 )),
362 _ => None,
363 }
364}
365
366pub type LogEventCallback = Arc<dyn Fn(&AgentLogEvent) + Send + Sync>;
371
372#[derive(Clone)]
373pub struct SessionLogWriter {
374 state: Arc<Mutex<WriterState>>,
375}
376
377struct WriterState {
378 metadata: SessionLogMetadata,
379 log_path: PathBuf,
380 index_path: PathBuf,
381 next_seq: u64,
382 completeness: LogCompleteness,
383 global_index_dir: Option<PathBuf>,
384 event_callback: Option<LogEventCallback>,
385}
386
387pub struct SessionLogCoordinator {
388 writer: SessionLogWriter,
389 stop_tx: Option<watch::Sender<bool>>,
390 task: Option<JoinHandle<Result<()>>>,
391}
392
393impl SessionLogWriter {
394 pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
400 let sessions_dir = logs_dir.join("sessions");
401 std::fs::create_dir_all(&sessions_dir).with_context(|| {
402 format!(
403 "Failed to create session log directory: {}",
404 sessions_dir.display()
405 )
406 })?;
407 let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
408 if let Some(parent) = log_path.parent() {
409 std::fs::create_dir_all(parent)
410 .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
411 }
412 if !log_path.exists() {
413 File::create(&log_path)
414 .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
415 }
416
417 let next_seq = next_sequence(&log_path)?;
418 let index_path = logs_dir.join("index.json");
419 let writer = Self {
420 state: Arc::new(Mutex::new(WriterState {
421 metadata: metadata.clone(),
422 log_path: log_path.clone(),
423 index_path,
424 next_seq,
425 completeness: LogCompleteness::Full,
426 global_index_dir: None,
427 event_callback: None,
428 })),
429 };
430
431 writer.upsert_index()?;
432 Ok(writer)
433 }
434
435 pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
438 let mut state = self
439 .state
440 .lock()
441 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
442 state.global_index_dir = Some(dir);
443 Ok(())
444 }
445
446 pub fn set_event_callback(&self, cb: LogEventCallback) -> Result<()> {
453 let mut state = self
454 .state
455 .lock()
456 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
457 state.event_callback = Some(cb);
458 Ok(())
459 }
460
461 pub fn clear_event_callback(&self) -> Result<()> {
463 let mut state = self
464 .state
465 .lock()
466 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
467 state.event_callback = None;
468 Ok(())
469 }
470
471 pub fn log_path(&self) -> Result<PathBuf> {
472 let state = self
473 .state
474 .lock()
475 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
476 Ok(state.log_path.clone())
477 }
478
479 pub fn get_provider_session_id(&self) -> Option<String> {
480 self.state.lock().ok()?.metadata.provider_session_id.clone()
481 }
482
483 pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
484 let mut state = self
485 .state
486 .lock()
487 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
488 state.metadata.provider_session_id = provider_session_id;
489 drop(state);
490 self.upsert_index()
491 }
492
493 pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
494 let mut state = self
495 .state
496 .lock()
497 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
498 if rank_completeness(completeness) < rank_completeness(state.completeness) {
499 state.completeness = completeness;
500 }
501 drop(state);
502 self.upsert_index()
503 }
504
505 pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
506 let path = path.into();
507 let state = self
508 .state
509 .lock()
510 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
511 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
512 let index_path = state.index_path.clone();
513 drop(state);
514
515 let mut index = load_index(&index_path)?;
516 if let Some(entry) = index
517 .sessions
518 .iter_mut()
519 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
520 && !entry.source_paths.contains(&path)
521 {
522 entry.source_paths.push(path);
523 save_index(&index_path, &index)?;
524 }
525 Ok(())
526 }
527
528 pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
529 let (event, callback) = {
530 let mut state = self
531 .state
532 .lock()
533 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
534 let event = AgentLogEvent {
535 seq: state.next_seq,
536 ts: Utc::now().to_rfc3339(),
537 provider: state.metadata.provider.clone(),
538 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
539 provider_session_id: state.metadata.provider_session_id.clone(),
540 source_kind,
541 completeness: state.completeness,
542 kind,
543 };
544 state.next_seq += 1;
545
546 let mut file = OpenOptions::new()
547 .append(true)
548 .open(&state.log_path)
549 .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
550 writeln!(file, "{}", serde_json::to_string(&event)?)
551 .with_context(|| format!("Failed to write {}", state.log_path.display()))?;
552
553 (event, state.event_callback.clone())
554 };
555
556 if let Some(cb) = callback {
559 cb(&event);
560 }
561 Ok(())
562 }
563
564 pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
565 self.emit(
566 LogSourceKind::Wrapper,
567 LogEventKind::SessionEnded { success, error },
568 )?;
569 let state = self
570 .state
571 .lock()
572 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
573 let index_path = state.index_path.clone();
574 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
575 drop(state);
576 let mut index = load_index(&index_path)?;
577 if let Some(entry) = index
578 .sessions
579 .iter_mut()
580 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
581 {
582 entry.ended_at = Some(Utc::now().to_rfc3339());
583 }
584 save_index(&index_path, &index)
585 }
586
587 fn upsert_index(&self) -> Result<()> {
588 let state = self
589 .state
590 .lock()
591 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
592 let mut index = load_index(&state.index_path)?;
593 let started_at;
594 let existing = index
595 .sessions
596 .iter_mut()
597 .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
598 if let Some(entry) = existing {
599 entry.provider_session_id = state.metadata.provider_session_id.clone();
600 entry.log_path = state.log_path.to_string_lossy().to_string();
601 entry.workspace_path = state.metadata.workspace_path.clone();
602 entry.command = Some(state.metadata.command.clone());
603 entry.completeness = state.completeness;
604 entry.backfilled = state.metadata.backfilled;
605 started_at = entry.started_at.clone();
606 } else {
607 started_at = Utc::now().to_rfc3339();
608 index.sessions.push(SessionLogIndexEntry {
609 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
610 provider: state.metadata.provider.clone(),
611 provider_session_id: state.metadata.provider_session_id.clone(),
612 log_path: state.log_path.to_string_lossy().to_string(),
613 completeness: state.completeness,
614 started_at: started_at.clone(),
615 ended_at: None,
616 workspace_path: state.metadata.workspace_path.clone(),
617 command: Some(state.metadata.command.clone()),
618 source_paths: Vec::new(),
619 backfilled: state.metadata.backfilled,
620 });
621 }
622 save_index(&state.index_path, &index)?;
623
624 if let Some(ref global_dir) = state.global_index_dir {
626 let project = state
628 .index_path
629 .parent()
630 .and_then(|logs| logs.parent())
631 .and_then(|proj| proj.file_name())
632 .map(|n| n.to_string_lossy().to_string())
633 .unwrap_or_default();
634 let _ = upsert_global_entry(
635 global_dir,
636 GlobalSessionEntry {
637 session_id: state.metadata.wrapper_session_id.clone(),
638 project,
639 log_path: state.log_path.to_string_lossy().to_string(),
640 provider: state.metadata.provider.clone(),
641 started_at,
642 },
643 );
644 }
645
646 Ok(())
647 }
648}
649
650impl SessionLogCoordinator {
651 pub fn start(
655 logs_dir: &Path,
656 metadata: SessionLogMetadata,
657 live_adapter: Option<Box<dyn LiveLogAdapter>>,
658 ) -> Result<Self> {
659 Self::start_with_callback(logs_dir, metadata, live_adapter, None)
660 }
661
662 pub fn start_with_callback(
666 logs_dir: &Path,
667 metadata: SessionLogMetadata,
668 live_adapter: Option<Box<dyn LiveLogAdapter>>,
669 event_callback: Option<LogEventCallback>,
670 ) -> Result<Self> {
671 let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
672 if let Some(cb) = event_callback {
673 writer.set_event_callback(cb)?;
674 }
675 writer.emit(
676 if metadata.backfilled {
677 LogSourceKind::Backfill
678 } else {
679 LogSourceKind::Wrapper
680 },
681 LogEventKind::SessionStarted {
682 command: metadata.command.clone(),
683 model: metadata.model.clone(),
684 cwd: metadata.workspace_path.clone(),
685 resumed: metadata.resumed,
686 backfilled: metadata.backfilled,
687 },
688 )?;
689
690 if let Some(adapter) = live_adapter {
691 let (stop_tx, stop_rx) = watch::channel(false);
692 let writer_clone = writer.clone();
693 let task =
694 tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
695 Ok(Self {
696 writer,
697 stop_tx: Some(stop_tx),
698 task: Some(task),
699 })
700 } else {
701 let (stop_tx, stop_rx) = watch::channel(false);
703 let writer_clone = writer.clone();
704 let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
705 Ok(Self {
706 writer,
707 stop_tx: Some(stop_tx),
708 task: Some(task),
709 })
710 }
711 }
712
713 pub fn writer(&self) -> &SessionLogWriter {
714 &self.writer
715 }
716
717 pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
718 if let Some(stop_tx) = self.stop_tx.take() {
719 let _ = stop_tx.send(true);
720 }
721 if let Some(task) = self.task.take() {
722 task.await??;
723 }
724 self.writer.finish(success, error)
725 }
726}
727
728pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
729 if let Some(prompt) = prompt
730 && !prompt.trim().is_empty()
731 {
732 writer.emit(
733 LogSourceKind::Wrapper,
734 LogEventKind::UserMessage {
735 role: "user".to_string(),
736 content: prompt.to_string(),
737 message_id: None,
738 },
739 )?;
740 }
741 Ok(())
742}
743
744pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
745 if !output.session_id.is_empty() && output.session_id != "unknown" {
746 writer.set_provider_session_id(Some(output.session_id.clone()))?;
747 }
748 for event in &output.events {
749 match event {
750 Event::AssistantMessage { content, .. } => {
751 for block in content {
752 match block {
753 ContentBlock::Text { text } => {
754 writer.emit(
755 LogSourceKind::Wrapper,
756 LogEventKind::AssistantMessage {
757 content: text.clone(),
758 message_id: None,
759 },
760 )?;
761 }
762 ContentBlock::ToolUse { id, name, input } => {
763 writer.emit(
764 LogSourceKind::Wrapper,
765 LogEventKind::ToolCall {
766 tool_kind: Some(ToolKind::infer(name)),
767 tool_name: name.clone(),
768 tool_id: Some(id.clone()),
769 input: Some(input.clone()),
770 },
771 )?;
772 }
773 }
774 }
775 }
776 Event::ToolExecution {
777 tool_name,
778 tool_id,
779 result,
780 ..
781 } => {
782 writer.emit(
783 LogSourceKind::Wrapper,
784 LogEventKind::ToolResult {
785 tool_kind: Some(ToolKind::infer(tool_name)),
786 tool_name: Some(tool_name.clone()),
787 tool_id: Some(tool_id.clone()),
788 success: Some(result.success),
789 output: result.output.clone(),
790 error: result.error.clone(),
791 data: result.data.clone(),
792 },
793 )?;
794 }
795 Event::PermissionRequest {
796 tool_name,
797 description,
798 granted,
799 } => {
800 writer.emit(
801 LogSourceKind::Wrapper,
802 LogEventKind::Permission {
803 tool_name: tool_name.clone(),
804 description: description.clone(),
805 granted: *granted,
806 },
807 )?;
808 }
809 Event::Error { message, details } => {
810 writer.emit(
811 LogSourceKind::Wrapper,
812 LogEventKind::ProviderStatus {
813 message: message.clone(),
814 data: details.clone(),
815 },
816 )?;
817 }
818 Event::Init {
819 model,
820 working_directory,
821 metadata,
822 ..
823 } => {
824 writer.emit(
825 LogSourceKind::Wrapper,
826 LogEventKind::ProviderStatus {
827 message: format!("Initialized {model}"),
828 data: Some(serde_json::json!({
829 "working_directory": working_directory,
830 "metadata": metadata,
831 })),
832 },
833 )?;
834 }
835 Event::UserMessage { content } => {
836 for block in content {
837 if let ContentBlock::Text { text } = block {
838 writer.emit(
839 LogSourceKind::Wrapper,
840 LogEventKind::UserMessage {
841 role: "user".to_string(),
842 content: text.clone(),
843 message_id: None,
844 },
845 )?;
846 }
847 }
848 }
849 Event::Result {
850 success,
851 message,
852 duration_ms,
853 num_turns,
854 } => {
855 writer.emit(
856 LogSourceKind::Wrapper,
857 LogEventKind::ProviderStatus {
858 message: message
859 .clone()
860 .unwrap_or_else(|| "Result emitted".to_string()),
861 data: Some(serde_json::json!({
862 "success": success,
863 "duration_ms": duration_ms,
864 "num_turns": num_turns,
865 })),
866 },
867 )?;
868 }
869 Event::TurnComplete {
870 stop_reason,
871 turn_index,
872 usage,
873 } => {
874 writer.emit(
875 LogSourceKind::Wrapper,
876 LogEventKind::ProviderStatus {
877 message: format!("Turn {turn_index} complete"),
878 data: Some(serde_json::json!({
879 "stop_reason": stop_reason,
880 "turn_index": turn_index,
881 "usage": usage,
882 })),
883 },
884 )?;
885 }
886 }
887 }
888
889 if let Some(ref usage) = output.usage {
891 writer.emit(
892 LogSourceKind::Wrapper,
893 LogEventKind::Usage {
894 input_tokens: usage.input_tokens,
895 output_tokens: usage.output_tokens,
896 cache_read_tokens: usage.cache_read_tokens,
897 cache_creation_tokens: usage.cache_creation_tokens,
898 total_cost_usd: output.total_cost_usd,
899 },
900 )?;
901 } else if let Some(cost) = output.total_cost_usd {
902 writer.emit(
904 LogSourceKind::Wrapper,
905 LogEventKind::Usage {
906 input_tokens: 0,
907 output_tokens: 0,
908 cache_read_tokens: None,
909 cache_creation_tokens: None,
910 total_cost_usd: Some(cost),
911 },
912 )?;
913 }
914
915 Ok(())
916}
917
918pub fn run_backfill(
922 logs_dir: &Path,
923 root: Option<&str>,
924 providers: &[&dyn HistoricalLogAdapter],
925) -> Result<usize> {
926 let state_path = logs_dir.join("backfill_state.json");
927 let mut state = load_backfill_state(&state_path)?;
928 let current_version = 1;
929 if state.version == current_version {
930 info!("Historical log import already completed for version {current_version}");
931 return Ok(0);
932 }
933
934 info!("Starting historical log import");
935 let mut imported = 0;
936 for provider in providers {
937 for session in provider.backfill(root)? {
938 let key = session_key(&session.metadata);
939 if state.imported_session_keys.contains(&key) {
940 info!(
941 "Skipping already imported historical session: {} {}",
942 session.metadata.provider,
943 session
944 .metadata
945 .provider_session_id
946 .as_deref()
947 .unwrap_or(&session.metadata.wrapper_session_id)
948 );
949 continue;
950 }
951
952 info!(
953 "Importing historical session: {} {}",
954 session.metadata.provider,
955 session
956 .metadata
957 .provider_session_id
958 .as_deref()
959 .unwrap_or(&session.metadata.wrapper_session_id)
960 );
961
962 let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
963 writer.set_completeness(session.completeness)?;
964 for source_path in session.source_paths {
965 info!(" source: {source_path}");
966 let _ = writer.add_source_path(source_path);
967 }
968 for (source_kind, event) in session.events {
969 writer.emit(source_kind, event)?;
970 }
971 writer.finish(true, None)?;
972 state.imported_session_keys.push(key);
973 imported += 1;
974 }
975 }
976
977 state.version = current_version;
978 save_backfill_state(&state_path, &state)?;
979 info!("Historical log import finished: {imported} session(s) imported");
980 Ok(imported)
981}
982
/// Seconds between `Heartbeat` events emitted by the background loops.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
985
986async fn run_live_adapter(
987 mut adapter: Box<dyn LiveLogAdapter>,
988 writer: SessionLogWriter,
989 mut stop_rx: watch::Receiver<bool>,
990) -> Result<()> {
991 let mut last_heartbeat = tokio::time::Instant::now();
992 loop {
993 adapter.poll(&writer).await?;
994
995 if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
997 let _ = writer.emit(
998 LogSourceKind::Wrapper,
999 LogEventKind::Heartbeat {
1000 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1001 },
1002 );
1003 last_heartbeat = tokio::time::Instant::now();
1004 }
1005
1006 tokio::select! {
1007 changed = stop_rx.changed() => {
1008 if changed.is_ok() && *stop_rx.borrow() {
1009 break;
1010 }
1011 }
1012 _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
1013 }
1014 }
1015 adapter.finalize(&writer).await
1016}
1017
1018async fn run_heartbeat_loop(
1020 writer: SessionLogWriter,
1021 mut stop_rx: watch::Receiver<bool>,
1022) -> Result<()> {
1023 loop {
1024 tokio::select! {
1025 changed = stop_rx.changed() => {
1026 if changed.is_ok() && *stop_rx.borrow() {
1027 break;
1028 }
1029 }
1030 _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
1031 let _ = writer.emit(
1032 LogSourceKind::Wrapper,
1033 LogEventKind::Heartbeat {
1034 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1035 },
1036 );
1037 }
1038 }
1039 }
1040 Ok(())
1041}
1042
1043fn next_sequence(path: &Path) -> Result<u64> {
1044 if !path.exists() {
1045 return Ok(1);
1046 }
1047 let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
1048 let reader = BufReader::new(file);
1049 let mut last_seq = 0;
1050 for line in reader.lines() {
1051 let line = line?;
1052 if line.trim().is_empty() {
1053 continue;
1054 }
1055 if let Ok(value) = serde_json::from_str::<Value>(&line)
1056 && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
1057 {
1058 last_seq = seq;
1059 }
1060 }
1061 Ok(last_seq + 1)
1062}
1063
1064fn load_index(path: &Path) -> Result<SessionLogIndex> {
1065 if !path.exists() {
1066 return Ok(SessionLogIndex::default());
1067 }
1068 let content = std::fs::read_to_string(path)
1069 .with_context(|| format!("Failed to read {}", path.display()))?;
1070 Ok(serde_json::from_str(&content).unwrap_or_default())
1071}
1072
1073fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
1074 let content = serde_json::to_string_pretty(index)?;
1075 crate::file_util::atomic_write_str(path, &content)
1076 .with_context(|| format!("Failed to write {}", path.display()))
1077}
1078
1079fn load_backfill_state(path: &Path) -> Result<BackfillState> {
1080 if !path.exists() {
1081 return Ok(BackfillState::default());
1082 }
1083 let content = std::fs::read_to_string(path)
1084 .with_context(|| format!("Failed to read {}", path.display()))?;
1085 Ok(serde_json::from_str(&content).unwrap_or_default())
1086}
1087
1088fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
1089 let content = serde_json::to_string_pretty(state)?;
1090 crate::file_util::atomic_write_str(path, &content)
1091 .with_context(|| format!("Failed to write {}", path.display()))
1092}
1093
1094fn rank_completeness(completeness: LogCompleteness) -> u8 {
1095 match completeness {
1096 LogCompleteness::Full => 3,
1097 LogCompleteness::Partial => 2,
1098 LogCompleteness::MetadataOnly => 1,
1099 }
1100}
1101
1102fn session_key(metadata: &SessionLogMetadata) -> String {
1103 format!(
1104 "{}:{}",
1105 metadata.provider,
1106 metadata
1107 .provider_session_id
1108 .as_deref()
1109 .unwrap_or(&metadata.wrapper_session_id)
1110 )
1111}
1112
1113#[cfg(test)]
1114#[path = "session_log_tests.rs"]
1115mod tests;