Skip to main content

zag_agent/
session_log.rs

1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "snake_case")]
17pub enum LogCompleteness {
18    Full,
19    Partial,
20    MetadataOnly,
21}
22
23#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
24#[serde(rename_all = "snake_case")]
25pub enum LogSourceKind {
26    Wrapper,
27    ProviderFile,
28    ProviderLog,
29    Stdout,
30    Stderr,
31    Backfill,
32}
33
34/// Normalized tool category — provider-agnostic classification of tool calls.
35///
36/// Providers map their native tool names (e.g. Claude's `Read`, Copilot's `view`)
37/// to this enum so consumers can distinguish tool types without hardcoding
38/// provider-specific strings.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum ToolKind {
42    /// Shell/command execution (Claude `Bash`, Copilot `bash`, Codex shell calls)
43    Shell,
44    /// File read operations (Claude `Read`, Copilot `view`)
45    FileRead,
46    /// File creation/overwrite (Claude `Write`, Codex `write_file`)
47    FileWrite,
48    /// File modification/patching (Claude `Edit`, Codex `apply_patch`, Copilot `edit`)
49    FileEdit,
50    /// File/content search (Claude `Glob`/`Grep`)
51    Search,
52    /// Sub-agent delegation (Claude `Agent`)
53    SubAgent,
54    /// Web/network operations
55    Web,
56    /// Notebook operations
57    Notebook,
58    /// Tool kind could not be determined from the provider's tool name
59    Other,
60}
61
62impl ToolKind {
63    /// Best-effort classification from any tool name (case-insensitive heuristic).
64    ///
65    /// Provider-specific classifiers (which map exact tool names) should live in
66    /// their respective provider modules in the binary crate. This generic fallback
67    /// is for cases where the provider is unknown or for wrapper-level code.
68    pub fn infer(name: &str) -> Self {
69        let lower = name.to_lowercase();
70        // Check compound/specific categories first to avoid false positives
71        if lower.contains("notebook") {
72            Self::Notebook
73        } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74            Self::Shell
75        } else if lower.contains("read") || lower == "view" || lower == "cat" {
76            Self::FileRead
77        } else if lower.contains("write") {
78            Self::FileWrite
79        } else if lower.contains("edit") || lower.contains("patch") {
80            Self::FileEdit
81        } else if lower.contains("grep")
82            || lower.contains("glob")
83            || lower.contains("search")
84            || lower == "find"
85        {
86            Self::Search
87        } else if lower.contains("agent") {
88            Self::SubAgent
89        } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90            Self::Web
91        } else {
92            Self::Other
93        }
94    }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type", rename_all = "snake_case")]
99pub enum LogEventKind {
100    SessionStarted {
101        command: String,
102        model: Option<String>,
103        cwd: Option<String>,
104        resumed: bool,
105        backfilled: bool,
106    },
107    UserMessage {
108        role: String,
109        content: String,
110        message_id: Option<String>,
111    },
112    AssistantMessage {
113        content: String,
114        message_id: Option<String>,
115    },
116    Reasoning {
117        content: String,
118        message_id: Option<String>,
119    },
120    ToolCall {
121        tool_name: String,
122        #[serde(default, skip_serializing_if = "Option::is_none")]
123        tool_kind: Option<ToolKind>,
124        tool_id: Option<String>,
125        input: Option<Value>,
126    },
127    ToolResult {
128        tool_name: Option<String>,
129        #[serde(default, skip_serializing_if = "Option::is_none")]
130        tool_kind: Option<ToolKind>,
131        tool_id: Option<String>,
132        success: Option<bool>,
133        output: Option<String>,
134        error: Option<String>,
135        data: Option<Value>,
136    },
137    Permission {
138        tool_name: String,
139        description: String,
140        granted: bool,
141    },
142    ProviderStatus {
143        message: String,
144        data: Option<Value>,
145    },
146    Stderr {
147        message: String,
148    },
149    ParseWarning {
150        message: String,
151        raw: Option<String>,
152    },
153    SessionCleared {
154        old_session_id: Option<String>,
155        new_session_id: Option<String>,
156    },
157    SessionEnded {
158        success: bool,
159        error: Option<String>,
160    },
161    Heartbeat {
162        interval_secs: Option<u64>,
163    },
164    Usage {
165        input_tokens: u64,
166        output_tokens: u64,
167        #[serde(default, skip_serializing_if = "Option::is_none")]
168        cache_read_tokens: Option<u64>,
169        #[serde(default, skip_serializing_if = "Option::is_none")]
170        cache_creation_tokens: Option<u64>,
171        #[serde(default, skip_serializing_if = "Option::is_none")]
172        total_cost_usd: Option<f64>,
173    },
174    UserEvent {
175        level: String,
176        message: String,
177        #[serde(default, skip_serializing_if = "Option::is_none")]
178        data: Option<Value>,
179    },
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct AgentLogEvent {
184    pub seq: u64,
185    pub ts: String,
186    pub provider: String,
187    pub wrapper_session_id: String,
188    #[serde(default)]
189    pub provider_session_id: Option<String>,
190    pub source_kind: LogSourceKind,
191    pub completeness: LogCompleteness,
192    #[serde(flatten)]
193    pub kind: LogEventKind,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, Default)]
197pub struct SessionLogIndex {
198    pub sessions: Vec<SessionLogIndexEntry>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct SessionLogIndexEntry {
203    pub wrapper_session_id: String,
204    pub provider: String,
205    #[serde(default)]
206    pub provider_session_id: Option<String>,
207    pub log_path: String,
208    pub completeness: LogCompleteness,
209    pub started_at: String,
210    #[serde(default)]
211    pub ended_at: Option<String>,
212    #[serde(default)]
213    pub workspace_path: Option<String>,
214    #[serde(default)]
215    pub command: Option<String>,
216    #[serde(default)]
217    pub source_paths: Vec<String>,
218    #[serde(default)]
219    pub backfilled: bool,
220}
221
222/// Global session index — maps session IDs to their project-scoped log paths
223/// so that `agent listen` can find sessions from any directory.
224#[derive(Debug, Clone, Serialize, Deserialize, Default)]
225pub struct GlobalSessionIndex {
226    pub sessions: Vec<GlobalSessionEntry>,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct GlobalSessionEntry {
231    pub session_id: String,
232    pub project: String,
233    pub log_path: String,
234    pub provider: String,
235    pub started_at: String,
236}
237
238pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
239    let path = base_dir.join("sessions_index.json");
240    if !path.exists() {
241        return Ok(GlobalSessionIndex::default());
242    }
243    let content = std::fs::read_to_string(&path)
244        .with_context(|| format!("Failed to read {}", path.display()))?;
245    Ok(serde_json::from_str(&content).unwrap_or_default())
246}
247
248pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
249    let path = base_dir.join("sessions_index.json");
250    let content = serde_json::to_string_pretty(index)?;
251    crate::file_util::atomic_write_str(&path, &content)
252        .with_context(|| format!("Failed to write {}", path.display()))
253}
254
255pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
256    let mut index = load_global_index(base_dir)?;
257    if let Some(existing) = index
258        .sessions
259        .iter_mut()
260        .find(|e| e.session_id == entry.session_id)
261    {
262        existing.log_path = entry.log_path;
263        existing.provider = entry.provider;
264        existing.started_at = entry.started_at;
265        existing.project = entry.project;
266    } else {
267        index.sessions.push(entry);
268    }
269    save_global_index(base_dir, &index)
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize, Default)]
273pub struct BackfillState {
274    #[serde(default)]
275    pub version: u32,
276    #[serde(default)]
277    pub imported_session_keys: Vec<String>,
278}
279
280#[derive(Debug, Clone)]
281pub struct SessionLogMetadata {
282    pub provider: String,
283    pub wrapper_session_id: String,
284    pub provider_session_id: Option<String>,
285    pub workspace_path: Option<String>,
286    pub command: String,
287    pub model: Option<String>,
288    pub resumed: bool,
289    pub backfilled: bool,
290}
291
292#[derive(Debug, Clone)]
293pub struct LiveLogContext {
294    pub root: Option<String>,
295    pub provider_session_id: Option<String>,
296    pub workspace_path: Option<String>,
297    pub started_at: DateTime<Utc>,
298    /// When true, the session is running in a unique worktree and the adapter
299    /// can reliably detect session clears by watching for new files.
300    pub is_worktree: bool,
301}
302
303#[derive(Debug, Clone)]
304pub struct BackfilledSession {
305    pub metadata: SessionLogMetadata,
306    pub completeness: LogCompleteness,
307    pub source_paths: Vec<String>,
308    pub events: Vec<(LogSourceKind, LogEventKind)>,
309}
310
311#[async_trait]
312pub trait LiveLogAdapter: Send {
313    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;
314
315    async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
316        self.poll(writer).await
317    }
318}
319
320pub trait HistoricalLogAdapter: Send + Sync {
321    fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
322}
323
324#[derive(Clone)]
325pub struct SessionLogWriter {
326    state: Arc<Mutex<WriterState>>,
327}
328
329struct WriterState {
330    metadata: SessionLogMetadata,
331    log_path: PathBuf,
332    index_path: PathBuf,
333    next_seq: u64,
334    completeness: LogCompleteness,
335    global_index_dir: Option<PathBuf>,
336}
337
338pub struct SessionLogCoordinator {
339    writer: SessionLogWriter,
340    stop_tx: Option<watch::Sender<bool>>,
341    task: Option<JoinHandle<Result<()>>>,
342}
343
344impl SessionLogWriter {
345    /// Create a new session log writer.
346    ///
347    /// `logs_dir` is the base directory for session logs (e.g. `~/.zag/projects/<path>/logs`).
348    /// The writer will create a `sessions/` subdirectory under it for JSONL log files
349    /// and an `index.json` file for session metadata.
350    pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
351        let sessions_dir = logs_dir.join("sessions");
352        std::fs::create_dir_all(&sessions_dir).with_context(|| {
353            format!(
354                "Failed to create session log directory: {}",
355                sessions_dir.display()
356            )
357        })?;
358        let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
359        if let Some(parent) = log_path.parent() {
360            std::fs::create_dir_all(parent)
361                .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
362        }
363        if !log_path.exists() {
364            File::create(&log_path)
365                .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
366        }
367
368        let next_seq = next_sequence(&log_path)?;
369        let index_path = logs_dir.join("index.json");
370        let writer = Self {
371            state: Arc::new(Mutex::new(WriterState {
372                metadata: metadata.clone(),
373                log_path: log_path.clone(),
374                index_path,
375                next_seq,
376                completeness: LogCompleteness::Full,
377                global_index_dir: None,
378            })),
379        };
380
381        writer.upsert_index()?;
382        Ok(writer)
383    }
384
385    /// Set the global index directory so that session entries are also
386    /// written to `~/.zag/sessions_index.json` for cross-project lookup.
387    pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
388        let mut state = self
389            .state
390            .lock()
391            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
392        state.global_index_dir = Some(dir);
393        Ok(())
394    }
395
396    pub fn log_path(&self) -> Result<PathBuf> {
397        let state = self
398            .state
399            .lock()
400            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
401        Ok(state.log_path.clone())
402    }
403
404    pub fn get_provider_session_id(&self) -> Option<String> {
405        self.state.lock().ok()?.metadata.provider_session_id.clone()
406    }
407
408    pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
409        let mut state = self
410            .state
411            .lock()
412            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
413        state.metadata.provider_session_id = provider_session_id;
414        drop(state);
415        self.upsert_index()
416    }
417
418    pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
419        let mut state = self
420            .state
421            .lock()
422            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
423        if rank_completeness(completeness) < rank_completeness(state.completeness) {
424            state.completeness = completeness;
425        }
426        drop(state);
427        self.upsert_index()
428    }
429
430    pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
431        let path = path.into();
432        let state = self
433            .state
434            .lock()
435            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
436        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
437        let index_path = state.index_path.clone();
438        drop(state);
439
440        let mut index = load_index(&index_path)?;
441        if let Some(entry) = index
442            .sessions
443            .iter_mut()
444            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
445            && !entry.source_paths.contains(&path)
446        {
447            entry.source_paths.push(path);
448            save_index(&index_path, &index)?;
449        }
450        Ok(())
451    }
452
453    pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
454        let mut state = self
455            .state
456            .lock()
457            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
458        let event = AgentLogEvent {
459            seq: state.next_seq,
460            ts: Utc::now().to_rfc3339(),
461            provider: state.metadata.provider.clone(),
462            wrapper_session_id: state.metadata.wrapper_session_id.clone(),
463            provider_session_id: state.metadata.provider_session_id.clone(),
464            source_kind,
465            completeness: state.completeness,
466            kind,
467        };
468        state.next_seq += 1;
469
470        let mut file = OpenOptions::new()
471            .append(true)
472            .open(&state.log_path)
473            .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
474        writeln!(file, "{}", serde_json::to_string(&event)?)
475            .with_context(|| format!("Failed to write {}", state.log_path.display()))?;
476        Ok(())
477    }
478
479    pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
480        self.emit(
481            LogSourceKind::Wrapper,
482            LogEventKind::SessionEnded { success, error },
483        )?;
484        let state = self
485            .state
486            .lock()
487            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
488        let index_path = state.index_path.clone();
489        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
490        drop(state);
491        let mut index = load_index(&index_path)?;
492        if let Some(entry) = index
493            .sessions
494            .iter_mut()
495            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
496        {
497            entry.ended_at = Some(Utc::now().to_rfc3339());
498        }
499        save_index(&index_path, &index)
500    }
501
502    fn upsert_index(&self) -> Result<()> {
503        let state = self
504            .state
505            .lock()
506            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
507        let mut index = load_index(&state.index_path)?;
508        let started_at;
509        let existing = index
510            .sessions
511            .iter_mut()
512            .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
513        if let Some(entry) = existing {
514            entry.provider_session_id = state.metadata.provider_session_id.clone();
515            entry.log_path = state.log_path.to_string_lossy().to_string();
516            entry.workspace_path = state.metadata.workspace_path.clone();
517            entry.command = Some(state.metadata.command.clone());
518            entry.completeness = state.completeness;
519            entry.backfilled = state.metadata.backfilled;
520            started_at = entry.started_at.clone();
521        } else {
522            started_at = Utc::now().to_rfc3339();
523            index.sessions.push(SessionLogIndexEntry {
524                wrapper_session_id: state.metadata.wrapper_session_id.clone(),
525                provider: state.metadata.provider.clone(),
526                provider_session_id: state.metadata.provider_session_id.clone(),
527                log_path: state.log_path.to_string_lossy().to_string(),
528                completeness: state.completeness,
529                started_at: started_at.clone(),
530                ended_at: None,
531                workspace_path: state.metadata.workspace_path.clone(),
532                command: Some(state.metadata.command.clone()),
533                source_paths: Vec::new(),
534                backfilled: state.metadata.backfilled,
535            });
536        }
537        save_index(&state.index_path, &index)?;
538
539        // Also upsert into global session index if configured
540        if let Some(ref global_dir) = state.global_index_dir {
541            // Derive project name from the index_path (parent of logs/index.json is the project dir)
542            let project = state
543                .index_path
544                .parent()
545                .and_then(|logs| logs.parent())
546                .and_then(|proj| proj.file_name())
547                .map(|n| n.to_string_lossy().to_string())
548                .unwrap_or_default();
549            let _ = upsert_global_entry(
550                global_dir,
551                GlobalSessionEntry {
552                    session_id: state.metadata.wrapper_session_id.clone(),
553                    project,
554                    log_path: state.log_path.to_string_lossy().to_string(),
555                    provider: state.metadata.provider.clone(),
556                    started_at,
557                },
558            );
559        }
560
561        Ok(())
562    }
563}
564
565impl SessionLogCoordinator {
566    /// Start a new session log coordinator.
567    ///
568    /// `logs_dir` is the base directory for session logs (e.g. `~/.zag/projects/<path>/logs`).
569    pub fn start(
570        logs_dir: &Path,
571        metadata: SessionLogMetadata,
572        live_adapter: Option<Box<dyn LiveLogAdapter>>,
573    ) -> Result<Self> {
574        let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
575        writer.emit(
576            if metadata.backfilled {
577                LogSourceKind::Backfill
578            } else {
579                LogSourceKind::Wrapper
580            },
581            LogEventKind::SessionStarted {
582                command: metadata.command.clone(),
583                model: metadata.model.clone(),
584                cwd: metadata.workspace_path.clone(),
585                resumed: metadata.resumed,
586                backfilled: metadata.backfilled,
587            },
588        )?;
589
590        if let Some(adapter) = live_adapter {
591            let (stop_tx, stop_rx) = watch::channel(false);
592            let writer_clone = writer.clone();
593            let task =
594                tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
595            Ok(Self {
596                writer,
597                stop_tx: Some(stop_tx),
598                task: Some(task),
599            })
600        } else {
601            // No live adapter — start a standalone heartbeat loop
602            let (stop_tx, stop_rx) = watch::channel(false);
603            let writer_clone = writer.clone();
604            let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
605            Ok(Self {
606                writer,
607                stop_tx: Some(stop_tx),
608                task: Some(task),
609            })
610        }
611    }
612
613    pub fn writer(&self) -> &SessionLogWriter {
614        &self.writer
615    }
616
617    pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
618        if let Some(stop_tx) = self.stop_tx.take() {
619            let _ = stop_tx.send(true);
620        }
621        if let Some(task) = self.task.take() {
622            task.await??;
623        }
624        self.writer.finish(success, error)
625    }
626}
627
628pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
629    if let Some(prompt) = prompt
630        && !prompt.trim().is_empty()
631    {
632        writer.emit(
633            LogSourceKind::Wrapper,
634            LogEventKind::UserMessage {
635                role: "user".to_string(),
636                content: prompt.to_string(),
637                message_id: None,
638            },
639        )?;
640    }
641    Ok(())
642}
643
644pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
645    if !output.session_id.is_empty() && output.session_id != "unknown" {
646        writer.set_provider_session_id(Some(output.session_id.clone()))?;
647    }
648    for event in &output.events {
649        match event {
650            Event::AssistantMessage { content, .. } => {
651                for block in content {
652                    match block {
653                        ContentBlock::Text { text } => {
654                            writer.emit(
655                                LogSourceKind::Wrapper,
656                                LogEventKind::AssistantMessage {
657                                    content: text.clone(),
658                                    message_id: None,
659                                },
660                            )?;
661                        }
662                        ContentBlock::ToolUse { id, name, input } => {
663                            writer.emit(
664                                LogSourceKind::Wrapper,
665                                LogEventKind::ToolCall {
666                                    tool_kind: Some(ToolKind::infer(name)),
667                                    tool_name: name.clone(),
668                                    tool_id: Some(id.clone()),
669                                    input: Some(input.clone()),
670                                },
671                            )?;
672                        }
673                    }
674                }
675            }
676            Event::ToolExecution {
677                tool_name,
678                tool_id,
679                result,
680                ..
681            } => {
682                writer.emit(
683                    LogSourceKind::Wrapper,
684                    LogEventKind::ToolResult {
685                        tool_kind: Some(ToolKind::infer(tool_name)),
686                        tool_name: Some(tool_name.clone()),
687                        tool_id: Some(tool_id.clone()),
688                        success: Some(result.success),
689                        output: result.output.clone(),
690                        error: result.error.clone(),
691                        data: result.data.clone(),
692                    },
693                )?;
694            }
695            Event::PermissionRequest {
696                tool_name,
697                description,
698                granted,
699            } => {
700                writer.emit(
701                    LogSourceKind::Wrapper,
702                    LogEventKind::Permission {
703                        tool_name: tool_name.clone(),
704                        description: description.clone(),
705                        granted: *granted,
706                    },
707                )?;
708            }
709            Event::Error { message, details } => {
710                writer.emit(
711                    LogSourceKind::Wrapper,
712                    LogEventKind::ProviderStatus {
713                        message: message.clone(),
714                        data: details.clone(),
715                    },
716                )?;
717            }
718            Event::Init {
719                model,
720                working_directory,
721                metadata,
722                ..
723            } => {
724                writer.emit(
725                    LogSourceKind::Wrapper,
726                    LogEventKind::ProviderStatus {
727                        message: format!("Initialized {}", model),
728                        data: Some(serde_json::json!({
729                            "working_directory": working_directory,
730                            "metadata": metadata,
731                        })),
732                    },
733                )?;
734            }
735            Event::UserMessage { content } => {
736                for block in content {
737                    if let ContentBlock::Text { text } = block {
738                        writer.emit(
739                            LogSourceKind::Wrapper,
740                            LogEventKind::UserMessage {
741                                role: "user".to_string(),
742                                content: text.clone(),
743                                message_id: None,
744                            },
745                        )?;
746                    }
747                }
748            }
749            Event::Result {
750                success,
751                message,
752                duration_ms,
753                num_turns,
754            } => {
755                writer.emit(
756                    LogSourceKind::Wrapper,
757                    LogEventKind::ProviderStatus {
758                        message: message
759                            .clone()
760                            .unwrap_or_else(|| "Result emitted".to_string()),
761                        data: Some(serde_json::json!({
762                            "success": success,
763                            "duration_ms": duration_ms,
764                            "num_turns": num_turns,
765                        })),
766                    },
767                )?;
768            }
769        }
770    }
771
772    // Emit usage/cost event if available
773    if let Some(ref usage) = output.usage {
774        writer.emit(
775            LogSourceKind::Wrapper,
776            LogEventKind::Usage {
777                input_tokens: usage.input_tokens,
778                output_tokens: usage.output_tokens,
779                cache_read_tokens: usage.cache_read_tokens,
780                cache_creation_tokens: usage.cache_creation_tokens,
781                total_cost_usd: output.total_cost_usd,
782            },
783        )?;
784    } else if let Some(cost) = output.total_cost_usd {
785        // Cost without detailed usage breakdown
786        writer.emit(
787            LogSourceKind::Wrapper,
788            LogEventKind::Usage {
789                input_tokens: 0,
790                output_tokens: 0,
791                cache_read_tokens: None,
792                cache_creation_tokens: None,
793                total_cost_usd: Some(cost),
794            },
795        )?;
796    }
797
798    Ok(())
799}
800
801/// Run historical log backfill from the given provider adapters.
802///
803/// `logs_dir` is the base directory for session logs.
804pub fn run_backfill(
805    logs_dir: &Path,
806    root: Option<&str>,
807    providers: &[&dyn HistoricalLogAdapter],
808) -> Result<usize> {
809    let state_path = logs_dir.join("backfill_state.json");
810    let mut state = load_backfill_state(&state_path)?;
811    let current_version = 1;
812    if state.version == current_version {
813        info!(
814            "Historical log import already completed for version {}",
815            current_version
816        );
817        return Ok(0);
818    }
819
820    info!("Starting historical log import");
821    let mut imported = 0;
822    for provider in providers {
823        for session in provider.backfill(root)? {
824            let key = session_key(&session.metadata);
825            if state.imported_session_keys.contains(&key) {
826                info!(
827                    "Skipping already imported historical session: {} {}",
828                    session.metadata.provider,
829                    session
830                        .metadata
831                        .provider_session_id
832                        .as_deref()
833                        .unwrap_or(&session.metadata.wrapper_session_id)
834                );
835                continue;
836            }
837
838            info!(
839                "Importing historical session: {} {}",
840                session.metadata.provider,
841                session
842                    .metadata
843                    .provider_session_id
844                    .as_deref()
845                    .unwrap_or(&session.metadata.wrapper_session_id)
846            );
847
848            let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
849            writer.set_completeness(session.completeness)?;
850            for source_path in session.source_paths {
851                info!("  source: {}", source_path);
852                let _ = writer.add_source_path(source_path);
853            }
854            for (source_kind, event) in session.events {
855                writer.emit(source_kind, event)?;
856            }
857            writer.finish(true, None)?;
858            state.imported_session_keys.push(key);
859            imported += 1;
860        }
861    }
862
863    state.version = current_version;
864    save_backfill_state(&state_path, &state)?;
865    info!(
866        "Historical log import finished: {} session(s) imported",
867        imported
868    );
869    Ok(imported)
870}
871
872/// Heartbeat interval for session liveness detection.
873const HEARTBEAT_INTERVAL_SECS: u64 = 10;
874
875async fn run_live_adapter(
876    mut adapter: Box<dyn LiveLogAdapter>,
877    writer: SessionLogWriter,
878    mut stop_rx: watch::Receiver<bool>,
879) -> Result<()> {
880    let mut last_heartbeat = tokio::time::Instant::now();
881    loop {
882        adapter.poll(&writer).await?;
883
884        // Emit periodic heartbeats for liveness detection
885        if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
886            let _ = writer.emit(
887                LogSourceKind::Wrapper,
888                LogEventKind::Heartbeat {
889                    interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
890                },
891            );
892            last_heartbeat = tokio::time::Instant::now();
893        }
894
895        tokio::select! {
896            changed = stop_rx.changed() => {
897                if changed.is_ok() && *stop_rx.borrow() {
898                    break;
899                }
900            }
901            _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
902        }
903    }
904    adapter.finalize(&writer).await
905}
906
907/// Run a standalone heartbeat loop for sessions without a live adapter.
908async fn run_heartbeat_loop(
909    writer: SessionLogWriter,
910    mut stop_rx: watch::Receiver<bool>,
911) -> Result<()> {
912    loop {
913        tokio::select! {
914            changed = stop_rx.changed() => {
915                if changed.is_ok() && *stop_rx.borrow() {
916                    break;
917                }
918            }
919            _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
920                let _ = writer.emit(
921                    LogSourceKind::Wrapper,
922                    LogEventKind::Heartbeat {
923                        interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
924                    },
925                );
926            }
927        }
928    }
929    Ok(())
930}
931
932fn next_sequence(path: &Path) -> Result<u64> {
933    if !path.exists() {
934        return Ok(1);
935    }
936    let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
937    let reader = BufReader::new(file);
938    let mut last_seq = 0;
939    for line in reader.lines() {
940        let line = line?;
941        if line.trim().is_empty() {
942            continue;
943        }
944        if let Ok(value) = serde_json::from_str::<Value>(&line)
945            && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
946        {
947            last_seq = seq;
948        }
949    }
950    Ok(last_seq + 1)
951}
952
953fn load_index(path: &Path) -> Result<SessionLogIndex> {
954    if !path.exists() {
955        return Ok(SessionLogIndex::default());
956    }
957    let content = std::fs::read_to_string(path)
958        .with_context(|| format!("Failed to read {}", path.display()))?;
959    Ok(serde_json::from_str(&content).unwrap_or_default())
960}
961
962fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
963    let content = serde_json::to_string_pretty(index)?;
964    crate::file_util::atomic_write_str(path, &content)
965        .with_context(|| format!("Failed to write {}", path.display()))
966}
967
968fn load_backfill_state(path: &Path) -> Result<BackfillState> {
969    if !path.exists() {
970        return Ok(BackfillState::default());
971    }
972    let content = std::fs::read_to_string(path)
973        .with_context(|| format!("Failed to read {}", path.display()))?;
974    Ok(serde_json::from_str(&content).unwrap_or_default())
975}
976
977fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
978    let content = serde_json::to_string_pretty(state)?;
979    crate::file_util::atomic_write_str(path, &content)
980        .with_context(|| format!("Failed to write {}", path.display()))
981}
982
983fn rank_completeness(completeness: LogCompleteness) -> u8 {
984    match completeness {
985        LogCompleteness::Full => 3,
986        LogCompleteness::Partial => 2,
987        LogCompleteness::MetadataOnly => 1,
988    }
989}
990
991fn session_key(metadata: &SessionLogMetadata) -> String {
992    format!(
993        "{}:{}",
994        metadata.provider,
995        metadata
996            .provider_session_id
997            .as_deref()
998            .unwrap_or(&metadata.wrapper_session_id)
999    )
1000}
1001
1002#[cfg(test)]
1003#[path = "session_log_tests.rs"]
1004mod tests;