Skip to main content

zag_agent/
session_log.rs

1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
/// How much of a session's history this log is believed to contain.
/// Serialized in snake_case; compared via `rank_completeness` elsewhere in
/// this module, where a lower rank means less complete.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum LogCompleteness {
    /// All events for the session are believed to be captured.
    Full,
    /// Events were captured, but gaps are possible.
    Partial,
    /// Only session metadata was captured, no event bodies.
    MetadataOnly,
}
22
/// Which channel an individual log event came from.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum LogSourceKind {
    /// Emitted directly by the zag wrapper itself.
    Wrapper,
    /// Parsed from a provider's on-disk session file.
    ProviderFile,
    /// Parsed from a provider's own log output.
    ProviderLog,
    /// Captured from the provider process's stdout.
    Stdout,
    /// Captured from the provider process's stderr.
    Stderr,
    /// Imported after the fact from historical provider data.
    Backfill,
}
33
/// Normalized tool category — provider-agnostic classification of tool calls.
///
/// Providers map their native tool names (e.g. Claude's `Read`, Copilot's `view`)
/// to this enum so consumers can distinguish tool types without hardcoding
/// provider-specific strings. Serialized in snake_case (e.g. `file_read`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolKind {
    /// Shell/command execution (Claude `Bash`, Copilot `bash`, Codex shell calls)
    Shell,
    /// File read operations (Claude `Read`, Copilot `view`)
    FileRead,
    /// File creation/overwrite (Claude `Write`, Codex `write_file`)
    FileWrite,
    /// File modification/patching (Claude `Edit`, Codex `apply_patch`, Copilot `edit`)
    FileEdit,
    /// File/content search (Claude `Glob`/`Grep`)
    Search,
    /// Sub-agent delegation (Claude `Agent`)
    SubAgent,
    /// Web/network operations
    Web,
    /// Notebook operations
    Notebook,
    /// Tool kind could not be determined from the provider's tool name
    Other,
}
61
62impl ToolKind {
63    /// Best-effort classification from any tool name (case-insensitive heuristic).
64    ///
65    /// Provider-specific classifiers (which map exact tool names) should live in
66    /// their respective provider modules in the binary crate. This generic fallback
67    /// is for cases where the provider is unknown or for wrapper-level code.
68    pub fn infer(name: &str) -> Self {
69        let lower = name.to_lowercase();
70        // Check compound/specific categories first to avoid false positives
71        if lower.contains("notebook") {
72            Self::Notebook
73        } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74            Self::Shell
75        } else if lower.contains("read") || lower == "view" || lower == "cat" {
76            Self::FileRead
77        } else if lower.contains("write") {
78            Self::FileWrite
79        } else if lower.contains("edit") || lower.contains("patch") {
80            Self::FileEdit
81        } else if lower.contains("grep")
82            || lower.contains("glob")
83            || lower.contains("search")
84            || lower == "find"
85        {
86            Self::Search
87        } else if lower.contains("agent") {
88            Self::SubAgent
89        } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90            Self::Web
91        } else {
92            Self::Other
93        }
94    }
95}
96
/// Payload of a single log event. Flattened into [`AgentLogEvent`] with an
/// internal `type` tag, serialized in snake_case (e.g. `"type": "tool_call"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LogEventKind {
    /// First event of every session; records how it was launched.
    SessionStarted {
        command: String,
        model: Option<String>,
        cwd: Option<String>,
        resumed: bool,
        backfilled: bool,
    },
    /// A message sent by the user (or on the user's behalf).
    UserMessage {
        role: String,
        content: String,
        message_id: Option<String>,
    },
    /// Assistant-authored text output.
    AssistantMessage {
        content: String,
        message_id: Option<String>,
    },
    /// Assistant reasoning/thinking content, when the provider exposes it.
    Reasoning {
        content: String,
        message_id: Option<String>,
    },
    /// A tool invocation requested by the assistant.
    ToolCall {
        tool_name: String,
        /// Normalized category; omitted from JSON when unknown.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        input: Option<Value>,
    },
    /// Outcome of a previously issued tool call.
    ToolResult {
        tool_name: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        success: Option<bool>,
        output: Option<String>,
        error: Option<String>,
        data: Option<Value>,
    },
    /// Record of a permission prompt and whether it was granted.
    Permission {
        tool_name: String,
        description: String,
        granted: bool,
    },
    /// Free-form provider status/diagnostic message.
    ProviderStatus {
        message: String,
        data: Option<Value>,
    },
    /// A line captured from the provider's stderr.
    Stderr {
        message: String,
    },
    /// The wrapper failed to parse provider output; `raw` preserves the input.
    ParseWarning {
        message: String,
        raw: Option<String>,
    },
    /// The provider-side session was cleared/rotated.
    SessionCleared {
        old_session_id: Option<String>,
        new_session_id: Option<String>,
    },
    /// Terminal event written by `SessionLogWriter::finish`.
    SessionEnded {
        success: bool,
        error: Option<String>,
    },
    /// Periodic liveness marker emitted while a session is running.
    Heartbeat {
        interval_secs: Option<u64>,
    },
    /// Token/cost accounting for the session or a turn.
    Usage {
        input_tokens: u64,
        output_tokens: u64,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_read_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_creation_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        total_cost_usd: Option<f64>,
    },
    /// Arbitrary user-defined event with a severity level.
    UserEvent {
        level: String,
        message: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<Value>,
    },
}
181
/// One line of a session's JSONL log: common envelope fields plus the
/// flattened [`LogEventKind`] payload (its `type` tag appears inline).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentLogEvent {
    /// Monotonic per-session sequence number, assigned at write time.
    pub seq: u64,
    /// RFC 3339 timestamp (UTC) of when the event was written.
    pub ts: String,
    /// Provider name (e.g. "claude", "codex").
    pub provider: String,
    /// The wrapper's own session identifier.
    pub wrapper_session_id: String,
    /// The provider's session identifier, once known.
    #[serde(default)]
    pub provider_session_id: Option<String>,
    pub source_kind: LogSourceKind,
    /// Completeness of the log at the time this event was written.
    pub completeness: LogCompleteness,
    #[serde(flatten)]
    pub kind: LogEventKind,
}
195
/// Project-scoped session index, persisted as `index.json` next to the
/// `sessions/` directory of JSONL logs.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionLogIndex {
    pub sessions: Vec<SessionLogIndexEntry>,
}
200
/// Metadata for one session in the project-scoped index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionLogIndexEntry {
    pub wrapper_session_id: String,
    pub provider: String,
    #[serde(default)]
    pub provider_session_id: Option<String>,
    /// Absolute path of the session's JSONL log file.
    pub log_path: String,
    pub completeness: LogCompleteness,
    /// RFC 3339 timestamp set when the entry is first created.
    pub started_at: String,
    /// Set by `SessionLogWriter::finish`; `None` while the session runs.
    #[serde(default)]
    pub ended_at: Option<String>,
    #[serde(default)]
    pub workspace_path: Option<String>,
    #[serde(default)]
    pub command: Option<String>,
    /// Provider-side files this log was assembled from (deduplicated).
    #[serde(default)]
    pub source_paths: Vec<String>,
    /// True when the session was imported from historical data.
    #[serde(default)]
    pub backfilled: bool,
}
221
/// Global session index — maps session IDs to their project-scoped log paths
/// so that `agent listen` can find sessions from any directory.
/// Persisted as `sessions_index.json` under the zag base directory.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GlobalSessionIndex {
    pub sessions: Vec<GlobalSessionEntry>,
}
228
/// One session in the global cross-project index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSessionEntry {
    /// Wrapper session ID (unique key within the index).
    pub session_id: String,
    /// Project directory name, derived from the project's log path.
    pub project: String,
    /// Absolute path of the session's JSONL log file.
    pub log_path: String,
    pub provider: String,
    /// RFC 3339 timestamp copied from the project-scoped index entry.
    pub started_at: String,
}
237
/// Load the global session index from `<base_dir>/sessions_index.json`.
///
/// A missing file yields an empty index. A file that exists but fails to
/// parse also yields an empty index; only an I/O read failure is an error.
pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
    let path = base_dir.join("sessions_index.json");
    if !path.exists() {
        return Ok(GlobalSessionIndex::default());
    }
    let content = std::fs::read_to_string(&path)
        .with_context(|| format!("Failed to read {}", path.display()))?;
    // NOTE(review): a corrupt index is silently reset here — its old entries
    // are lost on the next save. Confirm this best-effort reset is intended.
    Ok(serde_json::from_str(&content).unwrap_or_default())
}
247
/// Persist the global session index to `<base_dir>/sessions_index.json`,
/// pretty-printed, via an atomic write so readers never see a partial file.
pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
    let path = base_dir.join("sessions_index.json");
    let content = serde_json::to_string_pretty(index)?;
    crate::file_util::atomic_write_str(&path, &content)
        .with_context(|| format!("Failed to write {}", path.display()))
}
254
255pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
256    let mut index = load_global_index(base_dir)?;
257    if let Some(existing) = index
258        .sessions
259        .iter_mut()
260        .find(|e| e.session_id == entry.session_id)
261    {
262        existing.log_path = entry.log_path;
263        existing.provider = entry.provider;
264        existing.started_at = entry.started_at;
265        existing.project = entry.project;
266    } else {
267        index.sessions.push(entry);
268    }
269    save_global_index(base_dir, &index)
270}
271
/// Persistent record of historical sessions already imported by backfill,
/// so repeated backfill runs can skip sessions they have seen before.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BackfillState {
    /// Schema version of this state file.
    #[serde(default)]
    pub version: u32,
    /// Keys identifying sessions that have already been imported.
    #[serde(default)]
    pub imported_session_keys: Vec<String>,
}
279
/// Static metadata describing the session a log writer records.
#[derive(Debug, Clone)]
pub struct SessionLogMetadata {
    /// Provider name (e.g. "claude", "codex").
    pub provider: String,
    /// Wrapper-assigned session ID; also names the JSONL log file.
    pub wrapper_session_id: String,
    /// Provider-side session ID, once known (settable after creation).
    pub provider_session_id: Option<String>,
    pub workspace_path: Option<String>,
    /// Command line used to launch the session.
    pub command: String,
    pub model: Option<String>,
    /// True when this run resumes a previous session.
    pub resumed: bool,
    /// True when the session is imported from historical data.
    pub backfilled: bool,
}
291
/// Context handed to provider-specific live log adapters at construction.
#[derive(Debug, Clone)]
pub struct LiveLogContext {
    /// Project root, when known.
    pub root: Option<String>,
    pub provider_session_id: Option<String>,
    pub workspace_path: Option<String>,
    /// When the wrapper session began (UTC).
    pub started_at: DateTime<Utc>,
    /// When true, the session is running in a unique worktree and the adapter
    /// can reliably detect session clears by watching for new files.
    pub is_worktree: bool,
}
302
/// A fully reconstructed historical session produced by a
/// [`HistoricalLogAdapter`], ready to be written into the session log.
#[derive(Debug, Clone)]
pub struct BackfilledSession {
    pub metadata: SessionLogMetadata,
    /// How complete the reconstruction is believed to be.
    pub completeness: LogCompleteness,
    /// Provider files the events were reconstructed from.
    pub source_paths: Vec<String>,
    /// Events in order, each tagged with where it came from.
    pub events: Vec<(LogSourceKind, LogEventKind)>,
}
310
/// Adapter that feeds events from a running provider session into a
/// [`SessionLogWriter`]. Implementations are polled repeatedly while the
/// session runs.
#[async_trait]
pub trait LiveLogAdapter: Send {
    /// Ingest any newly available events into `writer`.
    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;

    /// Final drain after the session ends; defaults to one last `poll`.
    async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
        self.poll(writer).await
    }
}
319
/// Adapter that reconstructs past sessions from a provider's on-disk data.
pub trait HistoricalLogAdapter: Send + Sync {
    /// Scan for historical sessions under `root` (or a default location
    /// when `None`) and return them as reconstructed sessions.
    fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
}
323
324/// Canonical session log directory for a given project root.
325///
326/// Respects the `ZAG_USER_LOG_DIR` environment variable (set by `zag serve`
327/// in user-account mode) before falling back to `<agent_dir>/logs`.
328pub fn logs_dir(root: Option<&str>) -> PathBuf {
329    if let Ok(user_log_dir) = std::env::var("ZAG_USER_LOG_DIR") {
330        return PathBuf::from(user_log_dir);
331    }
332    crate::config::Config::agent_dir(root).join("logs")
333}
334
335/// Build a provider-specific live log adapter.
336///
337/// Returns `None` when either the provider has no live adapter or `enable_live`
338/// is false. Kept in `zag-agent` so library-side callers (`AgentBuilder`) can
339/// use it without depending on `zag-cli`.
340pub fn live_adapter_for_provider(
341    provider: &str,
342    ctx: LiveLogContext,
343    enable_live: bool,
344) -> Option<Box<dyn LiveLogAdapter>> {
345    if !enable_live {
346        return None;
347    }
348
349    match provider {
350        "claude" => Some(Box::new(
351            crate::providers::claude::logs::ClaudeLiveLogAdapter::new(ctx),
352        )),
353        "codex" => Some(Box::new(crate::providers::codex::CodexLiveLogAdapter::new(
354            ctx,
355        ))),
356        "gemini" => Some(Box::new(
357            crate::providers::gemini::GeminiLiveLogAdapter::new(ctx),
358        )),
359        "copilot" => Some(Box::new(
360            crate::providers::copilot::CopilotLiveLogAdapter::new(ctx),
361        )),
362        _ => None,
363    }
364}
365
/// Callback invoked for each event after it has been written to the session
/// log. Used by `AgentBuilder::on_log_event` /
/// `AgentBuilder::stream_events_to_stderr` to give library callers live
/// visibility without re-reading the JSONL file. Must be `Send + Sync`
/// because the writer (and its clones) may emit from multiple tasks.
pub type LogEventCallback = Arc<dyn Fn(&AgentLogEvent) + Send + Sync>;
371
/// Cloneable handle for appending events to a session's JSONL log and
/// keeping the index files in sync. All clones share one mutex-guarded
/// state, so they see the same sequence counter and metadata.
#[derive(Clone)]
pub struct SessionLogWriter {
    state: Arc<Mutex<WriterState>>,
}
376
/// Shared mutable state behind a [`SessionLogWriter`]'s mutex.
struct WriterState {
    metadata: SessionLogMetadata,
    /// Path of the session's JSONL log file.
    log_path: PathBuf,
    /// Path of the project-scoped `index.json`.
    index_path: PathBuf,
    /// Sequence number assigned to the next emitted event.
    next_seq: u64,
    completeness: LogCompleteness,
    /// When set, sessions are also mirrored into the global index here.
    global_index_dir: Option<PathBuf>,
    /// Invoked (outside the lock) after each successful write.
    event_callback: Option<LogEventCallback>,
}
386
/// Owns a [`SessionLogWriter`] plus the background task (live adapter or
/// heartbeat loop) that feeds it, and the stop signal used to shut the
/// task down in `finish`.
pub struct SessionLogCoordinator {
    writer: SessionLogWriter,
    stop_tx: Option<watch::Sender<bool>>,
    task: Option<JoinHandle<Result<()>>>,
}
392
impl SessionLogWriter {
    /// Create a new session log writer.
    ///
    /// `logs_dir` is the base directory for session logs (e.g. `~/.zag/projects/<path>/logs`).
    /// The writer will create a `sessions/` subdirectory under it for JSONL log files
    /// and an `index.json` file for session metadata.
    pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
        let sessions_dir = logs_dir.join("sessions");
        std::fs::create_dir_all(&sessions_dir).with_context(|| {
            format!(
                "Failed to create session log directory: {}",
                sessions_dir.display()
            )
        })?;
        let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
        if let Some(parent) = log_path.parent() {
            std::fs::create_dir_all(parent)
                .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
        }
        // Ensure the file exists even before the first event is emitted.
        if !log_path.exists() {
            File::create(&log_path)
                .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
        }

        // Continue numbering after any events already in the file
        // (next_sequence is defined elsewhere in this module).
        let next_seq = next_sequence(&log_path)?;
        let index_path = logs_dir.join("index.json");
        let writer = Self {
            state: Arc::new(Mutex::new(WriterState {
                metadata: metadata.clone(),
                log_path: log_path.clone(),
                index_path,
                next_seq,
                completeness: LogCompleteness::Full,
                global_index_dir: None,
                event_callback: None,
            })),
        };

        // Register the session in index.json immediately so it is
        // discoverable before the first event is written.
        writer.upsert_index()?;
        Ok(writer)
    }

    /// Set the global index directory so that session entries are also
    /// written to `~/.zag/sessions_index.json` for cross-project lookup.
    pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        state.global_index_dir = Some(dir);
        Ok(())
    }

    /// Register a callback fired after each event is successfully written.
    ///
    /// The callback runs **outside** the internal mutex and receives a
    /// reference to the freshly-serialised event. It is safe to call
    /// `emit` from another task while the callback runs, but do not call
    /// back into this writer from within the callback itself.
    pub fn set_event_callback(&self, cb: LogEventCallback) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        state.event_callback = Some(cb);
        Ok(())
    }

    /// Clear any previously registered event callback.
    pub fn clear_event_callback(&self) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        state.event_callback = None;
        Ok(())
    }

    /// Path of the session's JSONL log file.
    pub fn log_path(&self) -> Result<PathBuf> {
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        Ok(state.log_path.clone())
    }

    /// Provider-side session ID, if one has been recorded.
    /// Returns `None` both when unset and when the mutex is poisoned.
    pub fn get_provider_session_id(&self) -> Option<String> {
        self.state.lock().ok()?.metadata.provider_session_id.clone()
    }

    /// Record the provider-side session ID and refresh the index entry.
    pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        state.metadata.provider_session_id = provider_session_id;
        // Release the lock before upsert_index, which re-acquires it.
        drop(state);
        self.upsert_index()
    }

    /// Lower the recorded completeness and refresh the index entry.
    /// Completeness only ever moves toward a lower rank — a later attempt
    /// to raise it is ignored (rank_completeness is defined elsewhere).
    pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        if rank_completeness(completeness) < rank_completeness(state.completeness) {
            state.completeness = completeness;
        }
        // Release the lock before upsert_index, which re-acquires it.
        drop(state);
        self.upsert_index()
    }

    /// Record a provider source file path on this session's index entry,
    /// skipping duplicates. The index is only rewritten when a path is added.
    pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
        let path = path.into();
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
        let index_path = state.index_path.clone();
        drop(state);

        let mut index = load_index(&index_path)?;
        if let Some(entry) = index
            .sessions
            .iter_mut()
            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
            && !entry.source_paths.contains(&path)
        {
            entry.source_paths.push(path);
            save_index(&index_path, &index)?;
        }
        Ok(())
    }

    /// Assign the next sequence number, append the event as one JSONL line,
    /// and invoke the registered callback (if any).
    pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
        let (event, callback) = {
            let mut state = self
                .state
                .lock()
                .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
            let event = AgentLogEvent {
                seq: state.next_seq,
                ts: Utc::now().to_rfc3339(),
                provider: state.metadata.provider.clone(),
                wrapper_session_id: state.metadata.wrapper_session_id.clone(),
                provider_session_id: state.metadata.provider_session_id.clone(),
                source_kind,
                completeness: state.completeness,
                kind,
            };
            state.next_seq += 1;

            // The append happens under the lock so concurrent emitters
            // cannot interleave lines and file order matches seq order.
            let mut file = OpenOptions::new()
                .append(true)
                .open(&state.log_path)
                .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
            writeln!(file, "{}", serde_json::to_string(&event)?)
                .with_context(|| format!("Failed to write {}", state.log_path.display()))?;

            (event, state.event_callback.clone())
        };

        // Invoke callback outside the lock so it can't deadlock by re-entering
        // the writer and so a slow subscriber doesn't block other emitters.
        if let Some(cb) = callback {
            cb(&event);
        }
        Ok(())
    }

    /// Emit the terminal `SessionEnded` event and stamp `ended_at` on this
    /// session's index entry.
    pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
        self.emit(
            LogSourceKind::Wrapper,
            LogEventKind::SessionEnded { success, error },
        )?;
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        let index_path = state.index_path.clone();
        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
        drop(state);
        let mut index = load_index(&index_path)?;
        if let Some(entry) = index
            .sessions
            .iter_mut()
            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
        {
            entry.ended_at = Some(Utc::now().to_rfc3339());
        }
        save_index(&index_path, &index)
    }

    /// Create or refresh this session's entry in the project-scoped
    /// `index.json`, and mirror it into the global index when configured.
    /// `started_at` is preserved for existing entries and set to "now" for
    /// new ones.
    fn upsert_index(&self) -> Result<()> {
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        let mut index = load_index(&state.index_path)?;
        let started_at;
        let existing = index
            .sessions
            .iter_mut()
            .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
        if let Some(entry) = existing {
            entry.provider_session_id = state.metadata.provider_session_id.clone();
            entry.log_path = state.log_path.to_string_lossy().to_string();
            entry.workspace_path = state.metadata.workspace_path.clone();
            entry.command = Some(state.metadata.command.clone());
            entry.completeness = state.completeness;
            entry.backfilled = state.metadata.backfilled;
            started_at = entry.started_at.clone();
        } else {
            started_at = Utc::now().to_rfc3339();
            index.sessions.push(SessionLogIndexEntry {
                wrapper_session_id: state.metadata.wrapper_session_id.clone(),
                provider: state.metadata.provider.clone(),
                provider_session_id: state.metadata.provider_session_id.clone(),
                log_path: state.log_path.to_string_lossy().to_string(),
                completeness: state.completeness,
                started_at: started_at.clone(),
                ended_at: None,
                workspace_path: state.metadata.workspace_path.clone(),
                command: Some(state.metadata.command.clone()),
                source_paths: Vec::new(),
                backfilled: state.metadata.backfilled,
            });
        }
        save_index(&state.index_path, &index)?;

        // Also upsert into global session index if configured
        if let Some(ref global_dir) = state.global_index_dir {
            // Derive project name from the index_path (parent of logs/index.json is the project dir)
            let project = state
                .index_path
                .parent()
                .and_then(|logs| logs.parent())
                .and_then(|proj| proj.file_name())
                .map(|n| n.to_string_lossy().to_string())
                .unwrap_or_default();
            // Global-index failures are deliberately ignored: the
            // project-scoped index above remains the source of truth.
            let _ = upsert_global_entry(
                global_dir,
                GlobalSessionEntry {
                    session_id: state.metadata.wrapper_session_id.clone(),
                    project,
                    log_path: state.log_path.to_string_lossy().to_string(),
                    provider: state.metadata.provider.clone(),
                    started_at,
                },
            );
        }

        Ok(())
    }
}
649
650impl SessionLogCoordinator {
651    /// Start a new session log coordinator.
652    ///
653    /// `logs_dir` is the base directory for session logs (e.g. `~/.zag/projects/<path>/logs`).
654    pub fn start(
655        logs_dir: &Path,
656        metadata: SessionLogMetadata,
657        live_adapter: Option<Box<dyn LiveLogAdapter>>,
658    ) -> Result<Self> {
659        Self::start_with_callback(logs_dir, metadata, live_adapter, None)
660    }
661
662    /// Like [`start`], but registers an event callback on the writer before
663    /// emitting the initial `SessionStarted` event, so subscribers see the
664    /// full lifecycle from the very first event.
665    pub fn start_with_callback(
666        logs_dir: &Path,
667        metadata: SessionLogMetadata,
668        live_adapter: Option<Box<dyn LiveLogAdapter>>,
669        event_callback: Option<LogEventCallback>,
670    ) -> Result<Self> {
671        let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
672        if let Some(cb) = event_callback {
673            writer.set_event_callback(cb)?;
674        }
675        writer.emit(
676            if metadata.backfilled {
677                LogSourceKind::Backfill
678            } else {
679                LogSourceKind::Wrapper
680            },
681            LogEventKind::SessionStarted {
682                command: metadata.command.clone(),
683                model: metadata.model.clone(),
684                cwd: metadata.workspace_path.clone(),
685                resumed: metadata.resumed,
686                backfilled: metadata.backfilled,
687            },
688        )?;
689
690        if let Some(adapter) = live_adapter {
691            let (stop_tx, stop_rx) = watch::channel(false);
692            let writer_clone = writer.clone();
693            let task =
694                tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
695            Ok(Self {
696                writer,
697                stop_tx: Some(stop_tx),
698                task: Some(task),
699            })
700        } else {
701            // No live adapter — start a standalone heartbeat loop
702            let (stop_tx, stop_rx) = watch::channel(false);
703            let writer_clone = writer.clone();
704            let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
705            Ok(Self {
706                writer,
707                stop_tx: Some(stop_tx),
708                task: Some(task),
709            })
710        }
711    }
712
713    pub fn writer(&self) -> &SessionLogWriter {
714        &self.writer
715    }
716
717    pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
718        if let Some(stop_tx) = self.stop_tx.take() {
719            let _ = stop_tx.send(true);
720        }
721        if let Some(task) = self.task.take() {
722            task.await??;
723        }
724        self.writer.finish(success, error)
725    }
726}
727
728pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
729    if let Some(prompt) = prompt
730        && !prompt.trim().is_empty()
731    {
732        writer.emit(
733            LogSourceKind::Wrapper,
734            LogEventKind::UserMessage {
735                role: "user".to_string(),
736                content: prompt.to_string(),
737                message_id: None,
738            },
739        )?;
740    }
741    Ok(())
742}
743
744pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
745    if !output.session_id.is_empty() && output.session_id != "unknown" {
746        writer.set_provider_session_id(Some(output.session_id.clone()))?;
747    }
748    for event in &output.events {
749        match event {
750            Event::AssistantMessage { content, .. } => {
751                for block in content {
752                    match block {
753                        ContentBlock::Text { text } => {
754                            writer.emit(
755                                LogSourceKind::Wrapper,
756                                LogEventKind::AssistantMessage {
757                                    content: text.clone(),
758                                    message_id: None,
759                                },
760                            )?;
761                        }
762                        ContentBlock::ToolUse { id, name, input } => {
763                            writer.emit(
764                                LogSourceKind::Wrapper,
765                                LogEventKind::ToolCall {
766                                    tool_kind: Some(ToolKind::infer(name)),
767                                    tool_name: name.clone(),
768                                    tool_id: Some(id.clone()),
769                                    input: Some(input.clone()),
770                                },
771                            )?;
772                        }
773                    }
774                }
775            }
776            Event::ToolExecution {
777                tool_name,
778                tool_id,
779                result,
780                ..
781            } => {
782                writer.emit(
783                    LogSourceKind::Wrapper,
784                    LogEventKind::ToolResult {
785                        tool_kind: Some(ToolKind::infer(tool_name)),
786                        tool_name: Some(tool_name.clone()),
787                        tool_id: Some(tool_id.clone()),
788                        success: Some(result.success),
789                        output: result.output.clone(),
790                        error: result.error.clone(),
791                        data: result.data.clone(),
792                    },
793                )?;
794            }
795            Event::PermissionRequest {
796                tool_name,
797                description,
798                granted,
799            } => {
800                writer.emit(
801                    LogSourceKind::Wrapper,
802                    LogEventKind::Permission {
803                        tool_name: tool_name.clone(),
804                        description: description.clone(),
805                        granted: *granted,
806                    },
807                )?;
808            }
809            Event::Error { message, details } => {
810                writer.emit(
811                    LogSourceKind::Wrapper,
812                    LogEventKind::ProviderStatus {
813                        message: message.clone(),
814                        data: details.clone(),
815                    },
816                )?;
817            }
818            Event::Init {
819                model,
820                working_directory,
821                metadata,
822                ..
823            } => {
824                writer.emit(
825                    LogSourceKind::Wrapper,
826                    LogEventKind::ProviderStatus {
827                        message: format!("Initialized {model}"),
828                        data: Some(serde_json::json!({
829                            "working_directory": working_directory,
830                            "metadata": metadata,
831                        })),
832                    },
833                )?;
834            }
835            Event::UserMessage { content } => {
836                for block in content {
837                    if let ContentBlock::Text { text } = block {
838                        writer.emit(
839                            LogSourceKind::Wrapper,
840                            LogEventKind::UserMessage {
841                                role: "user".to_string(),
842                                content: text.clone(),
843                                message_id: None,
844                            },
845                        )?;
846                    }
847                }
848            }
849            Event::Result {
850                success,
851                message,
852                duration_ms,
853                num_turns,
854            } => {
855                writer.emit(
856                    LogSourceKind::Wrapper,
857                    LogEventKind::ProviderStatus {
858                        message: message
859                            .clone()
860                            .unwrap_or_else(|| "Result emitted".to_string()),
861                        data: Some(serde_json::json!({
862                            "success": success,
863                            "duration_ms": duration_ms,
864                            "num_turns": num_turns,
865                        })),
866                    },
867                )?;
868            }
869            Event::TurnComplete {
870                stop_reason,
871                turn_index,
872                usage,
873            } => {
874                writer.emit(
875                    LogSourceKind::Wrapper,
876                    LogEventKind::ProviderStatus {
877                        message: format!("Turn {turn_index} complete"),
878                        data: Some(serde_json::json!({
879                            "stop_reason": stop_reason,
880                            "turn_index": turn_index,
881                            "usage": usage,
882                        })),
883                    },
884                )?;
885            }
886        }
887    }
888
889    // Emit usage/cost event if available
890    if let Some(ref usage) = output.usage {
891        writer.emit(
892            LogSourceKind::Wrapper,
893            LogEventKind::Usage {
894                input_tokens: usage.input_tokens,
895                output_tokens: usage.output_tokens,
896                cache_read_tokens: usage.cache_read_tokens,
897                cache_creation_tokens: usage.cache_creation_tokens,
898                total_cost_usd: output.total_cost_usd,
899            },
900        )?;
901    } else if let Some(cost) = output.total_cost_usd {
902        // Cost without detailed usage breakdown
903        writer.emit(
904            LogSourceKind::Wrapper,
905            LogEventKind::Usage {
906                input_tokens: 0,
907                output_tokens: 0,
908                cache_read_tokens: None,
909                cache_creation_tokens: None,
910                total_cost_usd: Some(cost),
911            },
912        )?;
913    }
914
915    Ok(())
916}
917
918/// Run historical log backfill from the given provider adapters.
919///
920/// `logs_dir` is the base directory for session logs.
921pub fn run_backfill(
922    logs_dir: &Path,
923    root: Option<&str>,
924    providers: &[&dyn HistoricalLogAdapter],
925) -> Result<usize> {
926    let state_path = logs_dir.join("backfill_state.json");
927    let mut state = load_backfill_state(&state_path)?;
928    let current_version = 1;
929    if state.version == current_version {
930        info!("Historical log import already completed for version {current_version}");
931        return Ok(0);
932    }
933
934    info!("Starting historical log import");
935    let mut imported = 0;
936    for provider in providers {
937        for session in provider.backfill(root)? {
938            let key = session_key(&session.metadata);
939            if state.imported_session_keys.contains(&key) {
940                info!(
941                    "Skipping already imported historical session: {} {}",
942                    session.metadata.provider,
943                    session
944                        .metadata
945                        .provider_session_id
946                        .as_deref()
947                        .unwrap_or(&session.metadata.wrapper_session_id)
948                );
949                continue;
950            }
951
952            info!(
953                "Importing historical session: {} {}",
954                session.metadata.provider,
955                session
956                    .metadata
957                    .provider_session_id
958                    .as_deref()
959                    .unwrap_or(&session.metadata.wrapper_session_id)
960            );
961
962            let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
963            writer.set_completeness(session.completeness)?;
964            for source_path in session.source_paths {
965                info!("  source: {source_path}");
966                let _ = writer.add_source_path(source_path);
967            }
968            for (source_kind, event) in session.events {
969                writer.emit(source_kind, event)?;
970            }
971            writer.finish(true, None)?;
972            state.imported_session_keys.push(key);
973            imported += 1;
974        }
975    }
976
977    state.version = current_version;
978    save_backfill_state(&state_path, &state)?;
979    info!("Historical log import finished: {imported} session(s) imported");
980    Ok(imported)
981}
982
/// Heartbeat interval for session liveness detection.
///
/// Both the live-adapter loop and the standalone heartbeat loop emit a
/// `LogEventKind::Heartbeat` (with `interval_secs` set to this value) roughly
/// this often, so log readers can tell whether a session is still alive.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
985
986async fn run_live_adapter(
987    mut adapter: Box<dyn LiveLogAdapter>,
988    writer: SessionLogWriter,
989    mut stop_rx: watch::Receiver<bool>,
990) -> Result<()> {
991    let mut last_heartbeat = tokio::time::Instant::now();
992    loop {
993        adapter.poll(&writer).await?;
994
995        // Emit periodic heartbeats for liveness detection
996        if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
997            let _ = writer.emit(
998                LogSourceKind::Wrapper,
999                LogEventKind::Heartbeat {
1000                    interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1001                },
1002            );
1003            last_heartbeat = tokio::time::Instant::now();
1004        }
1005
1006        tokio::select! {
1007            changed = stop_rx.changed() => {
1008                if changed.is_ok() && *stop_rx.borrow() {
1009                    break;
1010                }
1011            }
1012            _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
1013        }
1014    }
1015    adapter.finalize(&writer).await
1016}
1017
1018/// Run a standalone heartbeat loop for sessions without a live adapter.
1019async fn run_heartbeat_loop(
1020    writer: SessionLogWriter,
1021    mut stop_rx: watch::Receiver<bool>,
1022) -> Result<()> {
1023    loop {
1024        tokio::select! {
1025            changed = stop_rx.changed() => {
1026                if changed.is_ok() && *stop_rx.borrow() {
1027                    break;
1028                }
1029            }
1030            _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
1031                let _ = writer.emit(
1032                    LogSourceKind::Wrapper,
1033                    LogEventKind::Heartbeat {
1034                        interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1035                    },
1036                );
1037            }
1038        }
1039    }
1040    Ok(())
1041}
1042
1043fn next_sequence(path: &Path) -> Result<u64> {
1044    if !path.exists() {
1045        return Ok(1);
1046    }
1047    let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
1048    let reader = BufReader::new(file);
1049    let mut last_seq = 0;
1050    for line in reader.lines() {
1051        let line = line?;
1052        if line.trim().is_empty() {
1053            continue;
1054        }
1055        if let Ok(value) = serde_json::from_str::<Value>(&line)
1056            && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
1057        {
1058            last_seq = seq;
1059        }
1060    }
1061    Ok(last_seq + 1)
1062}
1063
1064fn load_index(path: &Path) -> Result<SessionLogIndex> {
1065    if !path.exists() {
1066        return Ok(SessionLogIndex::default());
1067    }
1068    let content = std::fs::read_to_string(path)
1069        .with_context(|| format!("Failed to read {}", path.display()))?;
1070    Ok(serde_json::from_str(&content).unwrap_or_default())
1071}
1072
1073fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
1074    let content = serde_json::to_string_pretty(index)?;
1075    crate::file_util::atomic_write_str(path, &content)
1076        .with_context(|| format!("Failed to write {}", path.display()))
1077}
1078
1079fn load_backfill_state(path: &Path) -> Result<BackfillState> {
1080    if !path.exists() {
1081        return Ok(BackfillState::default());
1082    }
1083    let content = std::fs::read_to_string(path)
1084        .with_context(|| format!("Failed to read {}", path.display()))?;
1085    Ok(serde_json::from_str(&content).unwrap_or_default())
1086}
1087
1088fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
1089    let content = serde_json::to_string_pretty(state)?;
1090    crate::file_util::atomic_write_str(path, &content)
1091        .with_context(|| format!("Failed to write {}", path.display()))
1092}
1093
1094fn rank_completeness(completeness: LogCompleteness) -> u8 {
1095    match completeness {
1096        LogCompleteness::Full => 3,
1097        LogCompleteness::Partial => 2,
1098        LogCompleteness::MetadataOnly => 1,
1099    }
1100}
1101
1102fn session_key(metadata: &SessionLogMetadata) -> String {
1103    format!(
1104        "{}:{}",
1105        metadata.provider,
1106        metadata
1107            .provider_session_id
1108            .as_deref()
1109            .unwrap_or(&metadata.wrapper_session_id)
1110    )
1111}
1112
1113#[cfg(test)]
1114#[path = "session_log_tests.rs"]
1115mod tests;