Skip to main content

zag_agent/
session_log.rs

1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "snake_case")]
17pub enum LogCompleteness {
18    Full,
19    Partial,
20    MetadataOnly,
21}
22
23#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
24#[serde(rename_all = "snake_case")]
25pub enum LogSourceKind {
26    Wrapper,
27    ProviderFile,
28    ProviderLog,
29    Stdout,
30    Stderr,
31    Backfill,
32}
33
34/// Normalized tool category — provider-agnostic classification of tool calls.
35///
36/// Providers map their native tool names (e.g. Claude's `Read`, Copilot's `view`)
37/// to this enum so consumers can distinguish tool types without hardcoding
38/// provider-specific strings.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum ToolKind {
42    /// Shell/command execution (Claude `Bash`, Copilot `bash`, Codex shell calls)
43    Shell,
44    /// File read operations (Claude `Read`, Copilot `view`)
45    FileRead,
46    /// File creation/overwrite (Claude `Write`, Codex `write_file`)
47    FileWrite,
48    /// File modification/patching (Claude `Edit`, Codex `apply_patch`, Copilot `edit`)
49    FileEdit,
50    /// File/content search (Claude `Glob`/`Grep`)
51    Search,
52    /// Sub-agent delegation (Claude `Agent`)
53    SubAgent,
54    /// Web/network operations
55    Web,
56    /// Notebook operations
57    Notebook,
58    /// Tool kind could not be determined from the provider's tool name
59    Other,
60}
61
62impl ToolKind {
63    /// Best-effort classification from any tool name (case-insensitive heuristic).
64    ///
65    /// Provider-specific classifiers (which map exact tool names) should live in
66    /// their respective provider modules in the binary crate. This generic fallback
67    /// is for cases where the provider is unknown or for wrapper-level code.
68    pub fn infer(name: &str) -> Self {
69        let lower = name.to_lowercase();
70        // Check compound/specific categories first to avoid false positives
71        if lower.contains("notebook") {
72            Self::Notebook
73        } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74            Self::Shell
75        } else if lower.contains("read") || lower == "view" || lower == "cat" {
76            Self::FileRead
77        } else if lower.contains("write") {
78            Self::FileWrite
79        } else if lower.contains("edit") || lower.contains("patch") {
80            Self::FileEdit
81        } else if lower.contains("grep")
82            || lower.contains("glob")
83            || lower.contains("search")
84            || lower == "find"
85        {
86            Self::Search
87        } else if lower.contains("agent") {
88            Self::SubAgent
89        } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90            Self::Web
91        } else {
92            Self::Other
93        }
94    }
95}
96
/// Payload of a single session-log event.
///
/// Internally tagged: the variant name is written in snake_case under a
/// `"type"` key (e.g. `"tool_call"`). When flattened into `AgentLogEvent`, the
/// tag and the variant's fields sit at the top level of each JSONL line.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LogEventKind {
    /// First event of a wrapper session, emitted by
    /// `SessionLogCoordinator::start_with_callback`. `cwd` is the session's
    /// workspace path.
    SessionStarted {
        command: String,
        model: Option<String>,
        cwd: Option<String>,
        resumed: bool,
        backfilled: bool,
    },
    /// A message authored by the user side of the conversation
    /// (`record_prompt` emits these with `role: "user"`).
    UserMessage {
        role: String,
        content: String,
        message_id: Option<String>,
    },
    /// Assistant text output.
    AssistantMessage {
        content: String,
        message_id: Option<String>,
    },
    /// Reasoning text reported by a provider.
    Reasoning {
        content: String,
        message_id: Option<String>,
    },
    /// A tool invocation. `tool_kind` is omitted from JSON when `None`.
    ToolCall {
        tool_name: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        input: Option<Value>,
    },
    /// The outcome of a tool invocation. Presumably correlated with its
    /// `ToolCall` through `tool_id` when both are present.
    ToolResult {
        tool_name: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        success: Option<bool>,
        output: Option<String>,
        error: Option<String>,
        data: Option<Value>,
    },
    /// A permission request for a tool and whether it was granted.
    Permission {
        tool_name: String,
        description: String,
        granted: bool,
    },
    /// Free-form status message from a provider, with optional structured data.
    ProviderStatus {
        message: String,
        data: Option<Value>,
    },
    /// A line of provider standard error.
    Stderr {
        message: String,
    },
    /// Input that could not be parsed; `raw` keeps the offending text if available.
    ParseWarning {
        message: String,
        raw: Option<String>,
    },
    /// The provider session was cleared and replaced by a new one.
    SessionCleared {
        old_session_id: Option<String>,
        new_session_id: Option<String>,
    },
    /// Last lifecycle event, emitted by `SessionLogWriter::finish`.
    SessionEnded {
        success: bool,
        error: Option<String>,
    },
    /// Final result of the session, captured via `zag ps kill <id> <result>`
    /// (or `--file <path>`) after a session was launched with `--exit`.
    SessionResult {
        result: String,
    },
    /// Liveness marker; `interval_secs` is the heartbeat period when known.
    Heartbeat {
        interval_secs: Option<u64>,
    },
    /// Token usage (and optional cost). The optional fields are omitted from
    /// JSON when `None`.
    Usage {
        input_tokens: u64,
        output_tokens: u64,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_read_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_creation_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        total_cost_usd: Option<f64>,
    },
    /// An application-defined event with a free-form `level` string.
    UserEvent {
        level: String,
        message: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<Value>,
    },
    /// Upstream usage / rate limit detected in the live event stream.
    ///
    /// Emitted by each provider's `usage_limits` detector. When emitted by the
    /// active relay, a wake-up timer is armed for `scheduled_resume_at` and a
    /// matching `UsageLimitResumed` (or `UsageLimitResumeFailed`) event will
    /// follow when the timer fires. Detection inside historical adapters
    /// (backfill, `zag events`) emits the hit but does not arm a timer.
    UsageLimitHit {
        /// `"claude"` | `"codex"` | `"copilot"` | `"gemini"`
        provider: String,
        /// `"session"` | `"weekly"` | `"global"` | `"daily"` | `"unknown"`
        scope: String,
        /// RFC3339 UTC when usage resets, if the provider gave us a parseable
        /// value. `None` means a fallback duration was used.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        reset_at: Option<String>,
        /// RFC3339 UTC when zag intends to attempt the resume.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        scheduled_resume_at: Option<String>,
        /// True when `reset_at` was `None` and we substituted `fallback_secs`.
        #[serde(default)]
        fallback_used: bool,
        /// Stable ID joining this hit to its later `UsageLimitResumed` /
        /// `UsageLimitResumeFailed` event.
        incident_id: String,
        /// Original matched substring / JSON snippet — invaluable when
        /// patterns drift.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw: Option<String>,
    },
    /// Auto-resume successfully delivered to the session after a usage limit.
    UsageLimitResumed {
        incident_id: String,
        resume_message: String,
        /// 1 for first resume; increments only within a single relay process
        /// (across respawns each is a fresh attempt-1). Defaults to 1 when
        /// absent from older log lines.
        #[serde(default = "default_attempt")]
        attempt: u32,
    },
    /// Auto-resume could not be delivered (FIFO closed, spawn failed, etc.).
    UsageLimitResumeFailed {
        incident_id: String,
        error: String,
        /// Same numbering as `UsageLimitResumed::attempt`; defaults to 1.
        #[serde(default = "default_attempt")]
        attempt: u32,
    },
}
232
/// Serde default for the `attempt` field of the usage-limit resume events.
///
/// A log line without an explicit attempt counts as the first try.
fn default_attempt() -> u32 {
    const FIRST_ATTEMPT: u32 = 1;
    FIRST_ATTEMPT
}
236
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct AgentLogEvent {
239    pub seq: u64,
240    pub ts: String,
241    pub provider: String,
242    pub wrapper_session_id: String,
243    #[serde(default)]
244    pub provider_session_id: Option<String>,
245    pub source_kind: LogSourceKind,
246    pub completeness: LogCompleteness,
247    #[serde(flatten)]
248    pub kind: LogEventKind,
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize, Default)]
252pub struct SessionLogIndex {
253    pub sessions: Vec<SessionLogIndexEntry>,
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize)]
257pub struct SessionLogIndexEntry {
258    pub wrapper_session_id: String,
259    pub provider: String,
260    #[serde(default)]
261    pub provider_session_id: Option<String>,
262    pub log_path: String,
263    pub completeness: LogCompleteness,
264    pub started_at: String,
265    #[serde(default)]
266    pub ended_at: Option<String>,
267    #[serde(default)]
268    pub workspace_path: Option<String>,
269    #[serde(default)]
270    pub command: Option<String>,
271    #[serde(default)]
272    pub source_paths: Vec<String>,
273    #[serde(default)]
274    pub backfilled: bool,
275}
276
277/// Global session index — maps session IDs to their project-scoped log paths
278/// so that `agent listen` can find sessions from any directory.
279#[derive(Debug, Clone, Serialize, Deserialize, Default)]
280pub struct GlobalSessionIndex {
281    pub sessions: Vec<GlobalSessionEntry>,
282}
283
284#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct GlobalSessionEntry {
286    pub session_id: String,
287    pub project: String,
288    pub log_path: String,
289    pub provider: String,
290    pub started_at: String,
291}
292
293pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
294    let path = base_dir.join("sessions_index.json");
295    if !path.exists() {
296        return Ok(GlobalSessionIndex::default());
297    }
298    let content = std::fs::read_to_string(&path)
299        .with_context(|| format!("Failed to read {}", path.display()))?;
300    Ok(serde_json::from_str(&content).unwrap_or_default())
301}
302
303pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
304    let path = base_dir.join("sessions_index.json");
305    let content = serde_json::to_string_pretty(index)?;
306    crate::file_util::atomic_write_str(&path, &content)
307        .with_context(|| format!("Failed to write {}", path.display()))
308}
309
310pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
311    let mut index = load_global_index(base_dir)?;
312    if let Some(existing) = index
313        .sessions
314        .iter_mut()
315        .find(|e| e.session_id == entry.session_id)
316    {
317        existing.log_path = entry.log_path;
318        existing.provider = entry.provider;
319        existing.started_at = entry.started_at;
320        existing.project = entry.project;
321    } else {
322        index.sessions.push(entry);
323    }
324    save_global_index(base_dir, &index)
325}
326
327#[derive(Debug, Clone, Serialize, Deserialize, Default)]
328pub struct BackfillState {
329    #[serde(default)]
330    pub version: u32,
331    #[serde(default)]
332    pub imported_session_keys: Vec<String>,
333}
334
/// Static description of a session, supplied when a log writer is created.
#[derive(Clone, Debug)]
pub struct SessionLogMetadata {
    /// Provider name, e.g. `"claude"`.
    pub provider: String,
    /// zag's session identifier; also names the `<id>.jsonl` log file.
    pub wrapper_session_id: String,
    /// The provider's own session identifier, if already known.
    pub provider_session_id: Option<String>,
    /// Workspace directory, recorded as `cwd` in `SessionStarted`.
    pub workspace_path: Option<String>,
    /// The command that launched the session.
    pub command: String,
    /// Model name, if specified.
    pub model: Option<String>,
    /// Whether this run resumes an earlier session.
    pub resumed: bool,
    /// Whether the session is being reconstructed by a backfill.
    pub backfilled: bool,
}
346
347#[derive(Debug, Clone)]
348pub struct LiveLogContext {
349    pub root: Option<String>,
350    pub provider_session_id: Option<String>,
351    pub workspace_path: Option<String>,
352    pub started_at: DateTime<Utc>,
353    /// When true, the session is running in a unique worktree and the adapter
354    /// can reliably detect session clears by watching for new files.
355    pub is_worktree: bool,
356}
357
358#[derive(Debug, Clone)]
359pub struct BackfilledSession {
360    pub metadata: SessionLogMetadata,
361    pub completeness: LogCompleteness,
362    pub source_paths: Vec<String>,
363    pub events: Vec<(LogSourceKind, LogEventKind)>,
364}
365
366#[async_trait]
367pub trait LiveLogAdapter: Send {
368    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;
369
370    async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
371        self.poll(writer).await
372    }
373}
374
375pub trait HistoricalLogAdapter: Send + Sync {
376    fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
377}
378
379/// Canonical session log directory for a given project root.
380///
381/// Respects the `ZAG_USER_LOG_DIR` environment variable (set by `zag serve`
382/// in user-account mode) before falling back to `<agent_dir>/logs`.
383pub fn logs_dir(root: Option<&str>) -> PathBuf {
384    if let Ok(user_log_dir) = std::env::var("ZAG_USER_LOG_DIR") {
385        return PathBuf::from(user_log_dir);
386    }
387    crate::config::Config::agent_dir(root).join("logs")
388}
389
390/// Build a provider-specific live log adapter.
391///
392/// Returns `None` when either the provider has no live adapter or `enable_live`
393/// is false. Kept in `zag-agent` so library-side callers (`AgentBuilder`) can
394/// use it without depending on `zag-cli`.
395pub fn live_adapter_for_provider(
396    provider: &str,
397    ctx: LiveLogContext,
398    enable_live: bool,
399) -> Option<Box<dyn LiveLogAdapter>> {
400    if !enable_live {
401        return None;
402    }
403
404    match provider {
405        "claude" => Some(Box::new(
406            crate::providers::claude::logs::ClaudeLiveLogAdapter::new(ctx),
407        )),
408        "codex" => Some(Box::new(crate::providers::codex::CodexLiveLogAdapter::new(
409            ctx,
410        ))),
411        "gemini" => Some(Box::new(
412            crate::providers::gemini::GeminiLiveLogAdapter::new(ctx),
413        )),
414        "copilot" => Some(Box::new(
415            crate::providers::copilot::CopilotLiveLogAdapter::new(ctx),
416        )),
417        _ => None,
418    }
419}
420
421/// Callback invoked for each event after it has been written to the session
422/// log. Used by `AgentBuilder::on_log_event` /
423/// `AgentBuilder::stream_events_to_stderr` to give library callers live
424/// visibility without re-reading the JSONL file.
425pub type LogEventCallback = Arc<dyn Fn(&AgentLogEvent) + Send + Sync>;
426
427#[derive(Clone)]
428pub struct SessionLogWriter {
429    state: Arc<Mutex<WriterState>>,
430}
431
432struct WriterState {
433    metadata: SessionLogMetadata,
434    log_path: PathBuf,
435    index_path: PathBuf,
436    next_seq: u64,
437    completeness: LogCompleteness,
438    global_index_dir: Option<PathBuf>,
439    event_callback: Option<LogEventCallback>,
440}
441
442pub struct SessionLogCoordinator {
443    writer: SessionLogWriter,
444    stop_tx: Option<watch::Sender<bool>>,
445    task: Option<JoinHandle<Result<()>>>,
446}
447
448impl SessionLogWriter {
449    /// Create a new session log writer.
450    ///
451    /// `logs_dir` is the base directory for session logs (e.g. `~/.zag/projects/<path>/logs`).
452    /// The writer will create a `sessions/` subdirectory under it for JSONL log files
453    /// and an `index.json` file for session metadata.
454    pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
455        let sessions_dir = logs_dir.join("sessions");
456        std::fs::create_dir_all(&sessions_dir).with_context(|| {
457            format!(
458                "Failed to create session log directory: {}",
459                sessions_dir.display()
460            )
461        })?;
462        let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
463        if let Some(parent) = log_path.parent() {
464            std::fs::create_dir_all(parent)
465                .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
466        }
467        if !log_path.exists() {
468            File::create(&log_path)
469                .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
470        }
471
472        let next_seq = next_sequence(&log_path)?;
473        let index_path = logs_dir.join("index.json");
474        let writer = Self {
475            state: Arc::new(Mutex::new(WriterState {
476                metadata: metadata.clone(),
477                log_path: log_path.clone(),
478                index_path,
479                next_seq,
480                completeness: LogCompleteness::Full,
481                global_index_dir: None,
482                event_callback: None,
483            })),
484        };
485
486        writer.upsert_index()?;
487        Ok(writer)
488    }
489
490    /// Set the global index directory so that session entries are also
491    /// written to `~/.zag/sessions_index.json` for cross-project lookup.
492    pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
493        let mut state = self
494            .state
495            .lock()
496            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
497        state.global_index_dir = Some(dir);
498        Ok(())
499    }
500
501    /// Register a callback fired after each event is successfully written.
502    ///
503    /// The callback runs **outside** the internal mutex and receives a
504    /// reference to the freshly-serialised event. It is safe to call
505    /// `emit` from another task while the callback runs, but do not call
506    /// back into this writer from within the callback itself.
507    pub fn set_event_callback(&self, cb: LogEventCallback) -> Result<()> {
508        let mut state = self
509            .state
510            .lock()
511            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
512        state.event_callback = Some(cb);
513        Ok(())
514    }
515
516    /// Clear any previously registered event callback.
517    pub fn clear_event_callback(&self) -> Result<()> {
518        let mut state = self
519            .state
520            .lock()
521            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
522        state.event_callback = None;
523        Ok(())
524    }
525
526    pub fn log_path(&self) -> Result<PathBuf> {
527        let state = self
528            .state
529            .lock()
530            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
531        Ok(state.log_path.clone())
532    }
533
534    pub fn get_provider_session_id(&self) -> Option<String> {
535        self.state.lock().ok()?.metadata.provider_session_id.clone()
536    }
537
538    pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
539        let mut state = self
540            .state
541            .lock()
542            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
543        state.metadata.provider_session_id = provider_session_id;
544        drop(state);
545        self.upsert_index()
546    }
547
548    pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
549        let mut state = self
550            .state
551            .lock()
552            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
553        if rank_completeness(completeness) < rank_completeness(state.completeness) {
554            state.completeness = completeness;
555        }
556        drop(state);
557        self.upsert_index()
558    }
559
560    pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
561        let path = path.into();
562        let state = self
563            .state
564            .lock()
565            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
566        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
567        let index_path = state.index_path.clone();
568        drop(state);
569
570        let mut index = load_index(&index_path)?;
571        if let Some(entry) = index
572            .sessions
573            .iter_mut()
574            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
575            && !entry.source_paths.contains(&path)
576        {
577            entry.source_paths.push(path);
578            save_index(&index_path, &index)?;
579        }
580        Ok(())
581    }
582
583    pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
584        let (event, callback) = {
585            let mut state = self
586                .state
587                .lock()
588                .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
589            let event = AgentLogEvent {
590                seq: state.next_seq,
591                ts: Utc::now().to_rfc3339(),
592                provider: state.metadata.provider.clone(),
593                wrapper_session_id: state.metadata.wrapper_session_id.clone(),
594                provider_session_id: state.metadata.provider_session_id.clone(),
595                source_kind,
596                completeness: state.completeness,
597                kind,
598            };
599            state.next_seq += 1;
600
601            let mut file = OpenOptions::new()
602                .append(true)
603                .open(&state.log_path)
604                .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
605            writeln!(file, "{}", serde_json::to_string(&event)?)
606                .with_context(|| format!("Failed to write {}", state.log_path.display()))?;
607
608            (event, state.event_callback.clone())
609        };
610
611        // Invoke callback outside the lock so it can't deadlock by re-entering
612        // the writer and so a slow subscriber doesn't block other emitters.
613        if let Some(cb) = callback {
614            cb(&event);
615        }
616        Ok(())
617    }
618
619    pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
620        self.emit(
621            LogSourceKind::Wrapper,
622            LogEventKind::SessionEnded { success, error },
623        )?;
624        let state = self
625            .state
626            .lock()
627            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
628        let index_path = state.index_path.clone();
629        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
630        drop(state);
631        let mut index = load_index(&index_path)?;
632        if let Some(entry) = index
633            .sessions
634            .iter_mut()
635            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
636        {
637            entry.ended_at = Some(Utc::now().to_rfc3339());
638        }
639        save_index(&index_path, &index)
640    }
641
642    fn upsert_index(&self) -> Result<()> {
643        let state = self
644            .state
645            .lock()
646            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
647        let mut index = load_index(&state.index_path)?;
648        let started_at;
649        let existing = index
650            .sessions
651            .iter_mut()
652            .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
653        if let Some(entry) = existing {
654            entry.provider_session_id = state.metadata.provider_session_id.clone();
655            entry.log_path = state.log_path.to_string_lossy().to_string();
656            entry.workspace_path = state.metadata.workspace_path.clone();
657            entry.command = Some(state.metadata.command.clone());
658            entry.completeness = state.completeness;
659            entry.backfilled = state.metadata.backfilled;
660            started_at = entry.started_at.clone();
661        } else {
662            started_at = Utc::now().to_rfc3339();
663            index.sessions.push(SessionLogIndexEntry {
664                wrapper_session_id: state.metadata.wrapper_session_id.clone(),
665                provider: state.metadata.provider.clone(),
666                provider_session_id: state.metadata.provider_session_id.clone(),
667                log_path: state.log_path.to_string_lossy().to_string(),
668                completeness: state.completeness,
669                started_at: started_at.clone(),
670                ended_at: None,
671                workspace_path: state.metadata.workspace_path.clone(),
672                command: Some(state.metadata.command.clone()),
673                source_paths: Vec::new(),
674                backfilled: state.metadata.backfilled,
675            });
676        }
677        save_index(&state.index_path, &index)?;
678
679        // Also upsert into global session index if configured
680        if let Some(ref global_dir) = state.global_index_dir {
681            // Derive project name from the index_path (parent of logs/index.json is the project dir)
682            let project = state
683                .index_path
684                .parent()
685                .and_then(|logs| logs.parent())
686                .and_then(|proj| proj.file_name())
687                .map(|n| n.to_string_lossy().to_string())
688                .unwrap_or_default();
689            let _ = upsert_global_entry(
690                global_dir,
691                GlobalSessionEntry {
692                    session_id: state.metadata.wrapper_session_id.clone(),
693                    project,
694                    log_path: state.log_path.to_string_lossy().to_string(),
695                    provider: state.metadata.provider.clone(),
696                    started_at,
697                },
698            );
699        }
700
701        Ok(())
702    }
703}
704
705impl SessionLogCoordinator {
706    /// Start a new session log coordinator.
707    ///
708    /// `logs_dir` is the base directory for session logs (e.g. `~/.zag/projects/<path>/logs`).
709    pub fn start(
710        logs_dir: &Path,
711        metadata: SessionLogMetadata,
712        live_adapter: Option<Box<dyn LiveLogAdapter>>,
713    ) -> Result<Self> {
714        Self::start_with_callback(logs_dir, metadata, live_adapter, None)
715    }
716
717    /// Like [`start`], but registers an event callback on the writer before
718    /// emitting the initial `SessionStarted` event, so subscribers see the
719    /// full lifecycle from the very first event.
720    pub fn start_with_callback(
721        logs_dir: &Path,
722        metadata: SessionLogMetadata,
723        live_adapter: Option<Box<dyn LiveLogAdapter>>,
724        event_callback: Option<LogEventCallback>,
725    ) -> Result<Self> {
726        let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
727        if let Some(cb) = event_callback {
728            writer.set_event_callback(cb)?;
729        }
730        writer.emit(
731            if metadata.backfilled {
732                LogSourceKind::Backfill
733            } else {
734                LogSourceKind::Wrapper
735            },
736            LogEventKind::SessionStarted {
737                command: metadata.command.clone(),
738                model: metadata.model.clone(),
739                cwd: metadata.workspace_path.clone(),
740                resumed: metadata.resumed,
741                backfilled: metadata.backfilled,
742            },
743        )?;
744
745        if let Some(adapter) = live_adapter {
746            let (stop_tx, stop_rx) = watch::channel(false);
747            let writer_clone = writer.clone();
748            let task =
749                tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
750            Ok(Self {
751                writer,
752                stop_tx: Some(stop_tx),
753                task: Some(task),
754            })
755        } else {
756            // No live adapter — start a standalone heartbeat loop
757            let (stop_tx, stop_rx) = watch::channel(false);
758            let writer_clone = writer.clone();
759            let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
760            Ok(Self {
761                writer,
762                stop_tx: Some(stop_tx),
763                task: Some(task),
764            })
765        }
766    }
767
768    pub fn writer(&self) -> &SessionLogWriter {
769        &self.writer
770    }
771
772    pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
773        if let Some(stop_tx) = self.stop_tx.take() {
774            let _ = stop_tx.send(true);
775        }
776        if let Some(task) = self.task.take() {
777            task.await??;
778        }
779        self.writer.finish(success, error)
780    }
781}
782
783pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
784    if let Some(prompt) = prompt
785        && !prompt.trim().is_empty()
786    {
787        writer.emit(
788            LogSourceKind::Wrapper,
789            LogEventKind::UserMessage {
790                role: "user".to_string(),
791                content: prompt.to_string(),
792                message_id: None,
793            },
794        )?;
795    }
796    Ok(())
797}
798
799/// Append a single event to an existing session log file without going
800/// through the [`SessionLogWriter`] machinery (no index updates, no
801/// callbacks). Used by out-of-process tools like `zag ps kill` that
802/// need to record a `SessionResult` event into a session log owned by
803/// another running zag process.
804pub fn append_event_to_log(
805    log_path: &Path,
806    provider: &str,
807    wrapper_session_id: &str,
808    provider_session_id: Option<&str>,
809    kind: LogEventKind,
810) -> Result<()> {
811    if let Some(parent) = log_path.parent() {
812        std::fs::create_dir_all(parent)
813            .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
814    }
815    let next_seq = if log_path.exists() {
816        next_sequence(log_path)?
817    } else {
818        File::create(log_path)
819            .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
820        0
821    };
822    let event = AgentLogEvent {
823        seq: next_seq,
824        ts: Utc::now().to_rfc3339(),
825        provider: provider.to_string(),
826        wrapper_session_id: wrapper_session_id.to_string(),
827        provider_session_id: provider_session_id.map(str::to_string),
828        source_kind: LogSourceKind::Wrapper,
829        completeness: LogCompleteness::Full,
830        kind,
831    };
832    let mut file = OpenOptions::new()
833        .append(true)
834        .open(log_path)
835        .with_context(|| format!("Failed to open {}", log_path.display()))?;
836    writeln!(file, "{}", serde_json::to_string(&event)?)
837        .with_context(|| format!("Failed to write {}", log_path.display()))?;
838    Ok(())
839}
840
841pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
842    if !output.session_id.is_empty() && output.session_id != "unknown" {
843        writer.set_provider_session_id(Some(output.session_id.clone()))?;
844    }
845    for event in &output.events {
846        match event {
847            Event::AssistantMessage { content, .. } => {
848                for block in content {
849                    match block {
850                        ContentBlock::Text { text } => {
851                            writer.emit(
852                                LogSourceKind::Wrapper,
853                                LogEventKind::AssistantMessage {
854                                    content: text.clone(),
855                                    message_id: None,
856                                },
857                            )?;
858                        }
859                        ContentBlock::ToolUse { id, name, input } => {
860                            writer.emit(
861                                LogSourceKind::Wrapper,
862                                LogEventKind::ToolCall {
863                                    tool_kind: Some(ToolKind::infer(name)),
864                                    tool_name: name.clone(),
865                                    tool_id: Some(id.clone()),
866                                    input: Some(input.clone()),
867                                },
868                            )?;
869                        }
870                    }
871                }
872            }
873            Event::ToolExecution {
874                tool_name,
875                tool_id,
876                result,
877                ..
878            } => {
879                writer.emit(
880                    LogSourceKind::Wrapper,
881                    LogEventKind::ToolResult {
882                        tool_kind: Some(ToolKind::infer(tool_name)),
883                        tool_name: Some(tool_name.clone()),
884                        tool_id: Some(tool_id.clone()),
885                        success: Some(result.success),
886                        output: result.output.clone(),
887                        error: result.error.clone(),
888                        data: result.data.clone(),
889                    },
890                )?;
891            }
892            Event::PermissionRequest {
893                tool_name,
894                description,
895                granted,
896            } => {
897                writer.emit(
898                    LogSourceKind::Wrapper,
899                    LogEventKind::Permission {
900                        tool_name: tool_name.clone(),
901                        description: description.clone(),
902                        granted: *granted,
903                    },
904                )?;
905            }
906            Event::Error { message, details } => {
907                writer.emit(
908                    LogSourceKind::Wrapper,
909                    LogEventKind::ProviderStatus {
910                        message: message.clone(),
911                        data: details.clone(),
912                    },
913                )?;
914            }
915            Event::Init {
916                model,
917                working_directory,
918                metadata,
919                ..
920            } => {
921                writer.emit(
922                    LogSourceKind::Wrapper,
923                    LogEventKind::ProviderStatus {
924                        message: format!("Initialized {model}"),
925                        data: Some(serde_json::json!({
926                            "working_directory": working_directory,
927                            "metadata": metadata,
928                        })),
929                    },
930                )?;
931            }
932            Event::UserMessage { content } => {
933                for block in content {
934                    if let ContentBlock::Text { text } = block {
935                        writer.emit(
936                            LogSourceKind::Wrapper,
937                            LogEventKind::UserMessage {
938                                role: "user".to_string(),
939                                content: text.clone(),
940                                message_id: None,
941                            },
942                        )?;
943                    }
944                }
945            }
946            Event::Result {
947                success,
948                message,
949                duration_ms,
950                num_turns,
951            } => {
952                writer.emit(
953                    LogSourceKind::Wrapper,
954                    LogEventKind::ProviderStatus {
955                        message: message
956                            .clone()
957                            .unwrap_or_else(|| "Result emitted".to_string()),
958                        data: Some(serde_json::json!({
959                            "success": success,
960                            "duration_ms": duration_ms,
961                            "num_turns": num_turns,
962                        })),
963                    },
964                )?;
965            }
966            Event::TurnComplete {
967                stop_reason,
968                turn_index,
969                usage,
970            } => {
971                writer.emit(
972                    LogSourceKind::Wrapper,
973                    LogEventKind::ProviderStatus {
974                        message: format!("Turn {turn_index} complete"),
975                        data: Some(serde_json::json!({
976                            "stop_reason": stop_reason,
977                            "turn_index": turn_index,
978                            "usage": usage,
979                        })),
980                    },
981                )?;
982            }
983            Event::UsageLimitDetected {
984                provider,
985                scope,
986                reset_at,
987                raw,
988            } => {
989                // No relay context here, so no scheduled_resume_at / incident
990                // joining; this is the wrapper-level backfill path.
991                writer.emit(
992                    LogSourceKind::Wrapper,
993                    LogEventKind::UsageLimitHit {
994                        provider: provider.clone(),
995                        scope: scope.clone(),
996                        reset_at: reset_at.clone(),
997                        scheduled_resume_at: None,
998                        fallback_used: false,
999                        incident_id: uuid::Uuid::new_v4().to_string(),
1000                        raw: raw.clone(),
1001                    },
1002                )?;
1003            }
1004        }
1005    }
1006
1007    // Emit usage/cost event if available
1008    if let Some(ref usage) = output.usage {
1009        writer.emit(
1010            LogSourceKind::Wrapper,
1011            LogEventKind::Usage {
1012                input_tokens: usage.input_tokens,
1013                output_tokens: usage.output_tokens,
1014                cache_read_tokens: usage.cache_read_tokens,
1015                cache_creation_tokens: usage.cache_creation_tokens,
1016                total_cost_usd: output.total_cost_usd,
1017            },
1018        )?;
1019    } else if let Some(cost) = output.total_cost_usd {
1020        // Cost without detailed usage breakdown
1021        writer.emit(
1022            LogSourceKind::Wrapper,
1023            LogEventKind::Usage {
1024                input_tokens: 0,
1025                output_tokens: 0,
1026                cache_read_tokens: None,
1027                cache_creation_tokens: None,
1028                total_cost_usd: Some(cost),
1029            },
1030        )?;
1031    }
1032
1033    Ok(())
1034}
1035
1036/// Run historical log backfill from the given provider adapters.
1037///
1038/// `logs_dir` is the base directory for session logs.
1039pub fn run_backfill(
1040    logs_dir: &Path,
1041    root: Option<&str>,
1042    providers: &[&dyn HistoricalLogAdapter],
1043) -> Result<usize> {
1044    let state_path = logs_dir.join("backfill_state.json");
1045    let mut state = load_backfill_state(&state_path)?;
1046    let current_version = 1;
1047    if state.version == current_version {
1048        info!("Historical log import already completed for version {current_version}");
1049        return Ok(0);
1050    }
1051
1052    info!("Starting historical log import");
1053    let mut imported = 0;
1054    for provider in providers {
1055        for session in provider.backfill(root)? {
1056            let key = session_key(&session.metadata);
1057            if state.imported_session_keys.contains(&key) {
1058                info!(
1059                    "Skipping already imported historical session: {} {}",
1060                    session.metadata.provider,
1061                    session
1062                        .metadata
1063                        .provider_session_id
1064                        .as_deref()
1065                        .unwrap_or(&session.metadata.wrapper_session_id)
1066                );
1067                continue;
1068            }
1069
1070            info!(
1071                "Importing historical session: {} {}",
1072                session.metadata.provider,
1073                session
1074                    .metadata
1075                    .provider_session_id
1076                    .as_deref()
1077                    .unwrap_or(&session.metadata.wrapper_session_id)
1078            );
1079
1080            let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
1081            writer.set_completeness(session.completeness)?;
1082            for source_path in session.source_paths {
1083                info!("  source: {source_path}");
1084                let _ = writer.add_source_path(source_path);
1085            }
1086            for (source_kind, event) in session.events {
1087                writer.emit(source_kind, event)?;
1088            }
1089            writer.finish(true, None)?;
1090            state.imported_session_keys.push(key);
1091            imported += 1;
1092        }
1093    }
1094
1095    state.version = current_version;
1096    save_backfill_state(&state_path, &state)?;
1097    info!("Historical log import finished: {imported} session(s) imported");
1098    Ok(imported)
1099}
1100
/// Heartbeat interval for session liveness detection, in seconds.
///
/// Used as the sleep period of `run_heartbeat_loop` and as the minimum spacing
/// between heartbeats in `run_live_adapter`. It is also reported as
/// `interval_secs` in every `Heartbeat` event those loops emit.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
1103
1104async fn run_live_adapter(
1105    mut adapter: Box<dyn LiveLogAdapter>,
1106    writer: SessionLogWriter,
1107    mut stop_rx: watch::Receiver<bool>,
1108) -> Result<()> {
1109    let mut last_heartbeat = tokio::time::Instant::now();
1110    loop {
1111        adapter.poll(&writer).await?;
1112
1113        // Emit periodic heartbeats for liveness detection
1114        if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
1115            let _ = writer.emit(
1116                LogSourceKind::Wrapper,
1117                LogEventKind::Heartbeat {
1118                    interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1119                },
1120            );
1121            last_heartbeat = tokio::time::Instant::now();
1122        }
1123
1124        tokio::select! {
1125            changed = stop_rx.changed() => {
1126                if changed.is_ok() && *stop_rx.borrow() {
1127                    break;
1128                }
1129            }
1130            _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
1131        }
1132    }
1133    adapter.finalize(&writer).await
1134}
1135
1136/// Run a standalone heartbeat loop for sessions without a live adapter.
1137async fn run_heartbeat_loop(
1138    writer: SessionLogWriter,
1139    mut stop_rx: watch::Receiver<bool>,
1140) -> Result<()> {
1141    loop {
1142        tokio::select! {
1143            changed = stop_rx.changed() => {
1144                if changed.is_ok() && *stop_rx.borrow() {
1145                    break;
1146                }
1147            }
1148            _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
1149                let _ = writer.emit(
1150                    LogSourceKind::Wrapper,
1151                    LogEventKind::Heartbeat {
1152                        interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
1153                    },
1154                );
1155            }
1156        }
1157    }
1158    Ok(())
1159}
1160
1161fn next_sequence(path: &Path) -> Result<u64> {
1162    if !path.exists() {
1163        return Ok(1);
1164    }
1165    let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
1166    let reader = BufReader::new(file);
1167    let mut last_seq = 0;
1168    for line in reader.lines() {
1169        let line = line?;
1170        if line.trim().is_empty() {
1171            continue;
1172        }
1173        if let Ok(value) = serde_json::from_str::<Value>(&line)
1174            && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
1175        {
1176            last_seq = seq;
1177        }
1178    }
1179    Ok(last_seq + 1)
1180}
1181
1182fn load_index(path: &Path) -> Result<SessionLogIndex> {
1183    if !path.exists() {
1184        return Ok(SessionLogIndex::default());
1185    }
1186    let content = std::fs::read_to_string(path)
1187        .with_context(|| format!("Failed to read {}", path.display()))?;
1188    Ok(serde_json::from_str(&content).unwrap_or_default())
1189}
1190
1191fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
1192    let content = serde_json::to_string_pretty(index)?;
1193    crate::file_util::atomic_write_str(path, &content)
1194        .with_context(|| format!("Failed to write {}", path.display()))
1195}
1196
1197fn load_backfill_state(path: &Path) -> Result<BackfillState> {
1198    if !path.exists() {
1199        return Ok(BackfillState::default());
1200    }
1201    let content = std::fs::read_to_string(path)
1202        .with_context(|| format!("Failed to read {}", path.display()))?;
1203    Ok(serde_json::from_str(&content).unwrap_or_default())
1204}
1205
1206fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
1207    let content = serde_json::to_string_pretty(state)?;
1208    crate::file_util::atomic_write_str(path, &content)
1209        .with_context(|| format!("Failed to write {}", path.display()))
1210}
1211
1212fn rank_completeness(completeness: LogCompleteness) -> u8 {
1213    match completeness {
1214        LogCompleteness::Full => 3,
1215        LogCompleteness::Partial => 2,
1216        LogCompleteness::MetadataOnly => 1,
1217    }
1218}
1219
1220fn session_key(metadata: &SessionLogMetadata) -> String {
1221    format!(
1222        "{}:{}",
1223        metadata.provider,
1224        metadata
1225            .provider_session_id
1226            .as_deref()
1227            .unwrap_or(&metadata.wrapper_session_id)
1228    )
1229}
1230
1231#[cfg(test)]
1232#[path = "session_log_tests.rs"]
1233mod tests;