Skip to main content

zig_core/
session.rs

1//! Zig session log: a recorded execution of `zig run`.
2//!
3//! A *zig session* is the parent layer above the zag session log: one zig
4//! session orchestrates many child zag sessions (one per workflow step).
5//! This module owns the writer, event schema, and on-disk index.
6//!
7//! Mirrors zag's session logging architecture so the two stay structurally
8//! aligned and conceptually transferable. Key analogs:
9//!
10//! - `SessionWriter`        ↔ `zag-agent/src/session_log.rs:344` `SessionLogWriter`
11//! - `SessionCoordinator`   ↔ `zag-agent/src/session_log.rs:565` `SessionLogCoordinator`
12//! - `SessionLogEvent`      ↔ `zag-agent/src/session_log.rs:182` `AgentLogEvent`
13//! - `SessionLogIndex`      ↔ `zag-agent/src/session_log.rs:197` `SessionLogIndex`
14//! - `GlobalSessionIndex`   ↔ `zag-agent/src/session_log.rs:225` `GlobalSessionIndex`
15//!
16//! On-disk layout (mirrors `~/.zag/...` byte-for-byte):
17//!
18//! ```text
19//! ~/.zig/
20//!   projects/<sanitized-project-path>/logs/
21//!     index.json
22//!     sessions/<zig_session_id>.jsonl
23//!   sessions_index.json
24//! ```
25
26use std::fs::{File, OpenOptions};
27use std::io::Write;
28use std::path::{Path, PathBuf};
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use chrono::Utc;
35use serde::{Deserialize, Serialize};
36use uuid::Uuid;
37
38use crate::error::ZigError;
39use crate::paths;
40
/// Seconds between `Heartbeat` events. Matches zag's 10-second default
/// (`zag-agent/src/session_log.rs:872`) and is also recorded in each
/// `Heartbeat` payload as `interval_secs`.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
44
45/// Stream identifier for `step_output` events.
46#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum OutputStream {
49    Stdout,
50    Stderr,
51}
52
53/// Final status of a zig session.
54#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
55#[serde(rename_all = "snake_case")]
56pub enum SessionStatus {
57    Success,
58    Failure,
59}
60
61/// Event payload variants. The envelope (`SessionLogEvent`) carries `seq`,
62/// `ts`, and `zig_session_id`; this enum carries the type-specific fields.
63///
64/// Mirrors zag's `LogEventKind` (`zag-agent/src/session_log.rs:99`) in
65/// shape: `#[serde(tag = "type", rename_all = "snake_case")]`.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(tag = "type", rename_all = "snake_case")]
68pub enum SessionEventKind {
69    ZigSessionStarted {
70        workflow_name: String,
71        workflow_path: String,
72        workspace_path: Option<String>,
73        cwd: Option<String>,
74        prompt: Option<String>,
75        tier_count: usize,
76    },
77    TierStarted {
78        tier_index: usize,
79        step_names: Vec<String>,
80    },
81    StepStarted {
82        step_name: String,
83        tier_index: usize,
84        zag_session_id: String,
85        zag_command: String,
86        model: Option<String>,
87        prompt_preview: String,
88    },
89    StepOutput {
90        step_name: String,
91        stream: OutputStream,
92        line: String,
93    },
94    StepCompleted {
95        step_name: String,
96        exit_code: i32,
97        duration_ms: u64,
98        saved_vars: Vec<String>,
99    },
100    StepFailed {
101        step_name: String,
102        exit_code: Option<i32>,
103        attempt: u32,
104        error: String,
105    },
106    StepSkipped {
107        step_name: String,
108        reason: String,
109    },
110    /// Periodic liveness indicator. Mirrors zag's `Heartbeat`
111    /// (`zag-agent/src/session_log.rs:161`).
112    Heartbeat {
113        interval_secs: u64,
114    },
115    ZigSessionEnded {
116        status: SessionStatus,
117        duration_ms: u64,
118    },
119}
120
121/// Event envelope written to the JSONL log. Field naming mirrors zag's
122/// `AgentLogEvent` (`zag-agent/src/session_log.rs:182`): `seq`, `ts`, plus
123/// a session id and a flattened kind discriminator.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SessionLogEvent {
126    pub seq: u64,
127    pub ts: String,
128    pub zig_session_id: String,
129    #[serde(flatten)]
130    pub kind: SessionEventKind,
131}
132
133/// Per-project session index entry (`<project>/logs/index.json`).
134///
135/// Mirrors zag's `SessionLogIndexEntry` (`zag-agent/src/session_log.rs:201`).
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct SessionLogIndexEntry {
138    pub zig_session_id: String,
139    pub workflow_name: String,
140    pub workflow_path: String,
141    pub log_path: String,
142    pub started_at: String,
143    #[serde(default)]
144    pub ended_at: Option<String>,
145    #[serde(default)]
146    pub status: Option<SessionStatus>,
147    #[serde(default)]
148    pub workspace_path: Option<String>,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize, Default)]
152pub struct SessionLogIndex {
153    pub sessions: Vec<SessionLogIndexEntry>,
154}
155
156/// Global cross-project index entry (`~/.zig/sessions_index.json`).
157///
158/// Mirrors zag's `GlobalSessionEntry` (`zag-agent/src/session_log.rs:229`).
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct GlobalSessionEntry {
161    pub zig_session_id: String,
162    pub workflow_name: String,
163    pub project: String,
164    pub log_path: String,
165    pub started_at: String,
166    #[serde(default)]
167    pub ended_at: Option<String>,
168    #[serde(default)]
169    pub status: Option<SessionStatus>,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize, Default)]
173pub struct GlobalSessionIndex {
174    pub sessions: Vec<GlobalSessionEntry>,
175}
176
177// ---------------------------------------------------------------------
178// Index I/O
179// ---------------------------------------------------------------------
180
181pub fn load_project_index(path: &Path) -> SessionLogIndex {
182    if !path.exists() {
183        return SessionLogIndex::default();
184    }
185    std::fs::read_to_string(path)
186        .ok()
187        .and_then(|s| serde_json::from_str(&s).ok())
188        .unwrap_or_default()
189}
190
191pub fn save_project_index(path: &Path, index: &SessionLogIndex) -> Result<(), ZigError> {
192    if let Some(parent) = path.parent() {
193        std::fs::create_dir_all(parent)
194            .map_err(|e| ZigError::Io(format!("failed to create {}: {e}", parent.display())))?;
195    }
196    let json = serde_json::to_string_pretty(index)
197        .map_err(|e| ZigError::Io(format!("failed to serialize project index: {e}")))?;
198    std::fs::write(path, json)
199        .map_err(|e| ZigError::Io(format!("failed to write {}: {e}", path.display())))?;
200    Ok(())
201}
202
203pub fn load_global_index(path: &Path) -> GlobalSessionIndex {
204    if !path.exists() {
205        return GlobalSessionIndex::default();
206    }
207    std::fs::read_to_string(path)
208        .ok()
209        .and_then(|s| serde_json::from_str(&s).ok())
210        .unwrap_or_default()
211}
212
213pub fn save_global_index(path: &Path, index: &GlobalSessionIndex) -> Result<(), ZigError> {
214    if let Some(parent) = path.parent() {
215        std::fs::create_dir_all(parent)
216            .map_err(|e| ZigError::Io(format!("failed to create {}: {e}", parent.display())))?;
217    }
218    let json = serde_json::to_string_pretty(index)
219        .map_err(|e| ZigError::Io(format!("failed to serialize global index: {e}")))?;
220    std::fs::write(path, json)
221        .map_err(|e| ZigError::Io(format!("failed to write {}: {e}", path.display())))?;
222    Ok(())
223}
224
225// ---------------------------------------------------------------------
226// Query helpers
227// ---------------------------------------------------------------------
228
229/// List all sessions from the project index.
230pub fn list_sessions() -> Result<Vec<SessionLogIndexEntry>, ZigError> {
231    let index_path = paths::project_index_path(None)
232        .ok_or_else(|| ZigError::Io("HOME environment variable not set".into()))?;
233    let index = load_project_index(&index_path);
234    Ok(index.sessions)
235}
236
237/// Read all events from a session JSONL log file.
238pub fn read_session_events(log_path: &Path) -> Result<Vec<SessionLogEvent>, ZigError> {
239    let content = std::fs::read_to_string(log_path)
240        .map_err(|e| ZigError::Io(format!("failed to read {}: {e}", log_path.display())))?;
241
242    let mut events = Vec::new();
243    for line in content.lines() {
244        let line = line.trim();
245        if line.is_empty() {
246            continue;
247        }
248        let event: SessionLogEvent = serde_json::from_str(line)
249            .map_err(|e| ZigError::Parse(format!("failed to parse session event: {e}")))?;
250        events.push(event);
251    }
252    Ok(events)
253}
254
255/// Find a session by ID (or unique prefix) and return its index entry.
256pub fn find_session(session_id: &str) -> Result<SessionLogIndexEntry, ZigError> {
257    let sessions = list_sessions()?;
258    let matches: Vec<_> = sessions
259        .into_iter()
260        .filter(|s| s.zig_session_id.starts_with(session_id))
261        .collect();
262
263    match matches.len() {
264        0 => Err(ZigError::Io(format!("session not found: {session_id}"))),
265        1 => Ok(matches.into_iter().next().unwrap()),
266        _ => Err(ZigError::Io(format!(
267            "ambiguous session prefix '{session_id}' matches {} sessions",
268            matches.len()
269        ))),
270    }
271}
272
273// ---------------------------------------------------------------------
274// Writer
275// ---------------------------------------------------------------------
276
/// Append-only writer for one zig session's JSONL log.
///
/// Counterpart of zag's `SessionLogWriter` (`zag-agent/src/session_log.rs:344`).
/// Each emitted event gets the next `seq` and a fresh `ts`, is serialized as
/// a single JSON line, and is flushed immediately. A tailer polling the file
/// therefore sees it within one poll cycle.
pub struct SessionWriter {
    /// UUID identifying this session.
    zig_session_id: String,
    /// Location of the `.jsonl` log being appended to.
    log_path: PathBuf,
    /// Per-project index to keep in sync, if it could be located.
    project_index_path: Option<PathBuf>,
    /// Global index to keep in sync, if it could be located.
    global_index_path: Option<PathBuf>,
    /// Mutable write state; the lock serializes concurrent emitters.
    inner: Mutex<WriterInner>,
}

/// State guarded by `SessionWriter::inner`.
struct WriterInner {
    /// The log file, opened in append mode.
    file: File,
    /// `seq` of the most recently emitted event (0 before the first).
    seq: u64,
}
295
296impl SessionWriter {
297    /// Create a new session: generate a UUID, ensure the project sessions
298    /// dir, open the log file for append, emit `ZigSessionStarted`, and
299    /// upsert both indexes.
300    pub fn create(
301        workflow_name: &str,
302        workflow_path: &str,
303        prompt: Option<&str>,
304        tier_count: usize,
305    ) -> Result<Self, ZigError> {
306        let zig_session_id = Uuid::new_v4().to_string();
307
308        let sessions_dir = paths::ensure_project_sessions_dir(None)?;
309        let log_path = sessions_dir.join(format!("{zig_session_id}.jsonl"));
310
311        let file = OpenOptions::new()
312            .create(true)
313            .append(true)
314            .open(&log_path)
315            .map_err(|e| ZigError::Io(format!("failed to open {}: {e}", log_path.display())))?;
316
317        let writer = Self {
318            zig_session_id: zig_session_id.clone(),
319            log_path: log_path.clone(),
320            project_index_path: paths::project_index_path(None),
321            global_index_path: paths::global_sessions_index_path(),
322            inner: Mutex::new(WriterInner { file, seq: 0 }),
323        };
324
325        let cwd = std::env::current_dir()
326            .ok()
327            .map(|p| p.to_string_lossy().into_owned());
328        let workspace_path = paths::project_dir(None).map(|p| p.to_string_lossy().into_owned());
329        let started_at = now_rfc3339();
330
331        writer.emit(SessionEventKind::ZigSessionStarted {
332            workflow_name: workflow_name.to_string(),
333            workflow_path: workflow_path.to_string(),
334            workspace_path: workspace_path.clone(),
335            cwd,
336            prompt: prompt.map(str::to_string),
337            tier_count,
338        })?;
339
340        writer.upsert_indexes(
341            workflow_name,
342            workflow_path,
343            &workspace_path,
344            &started_at,
345            None,
346        )?;
347
348        Ok(writer)
349    }
350
351    /// The session id (UUID) for this writer.
352    pub fn session_id(&self) -> &str {
353        &self.zig_session_id
354    }
355
356    /// The on-disk log path.
357    pub fn log_path(&self) -> &Path {
358        &self.log_path
359    }
360
361    pub fn tier_started(&self, tier_index: usize, step_names: Vec<String>) -> Result<(), ZigError> {
362        self.emit(SessionEventKind::TierStarted {
363            tier_index,
364            step_names,
365        })
366    }
367
368    #[allow(clippy::too_many_arguments)]
369    pub fn step_started(
370        &self,
371        step_name: &str,
372        tier_index: usize,
373        zag_session_id: &str,
374        zag_command: &str,
375        model: Option<&str>,
376        prompt_preview: &str,
377    ) -> Result<(), ZigError> {
378        self.emit(SessionEventKind::StepStarted {
379            step_name: step_name.to_string(),
380            tier_index,
381            zag_session_id: zag_session_id.to_string(),
382            zag_command: zag_command.to_string(),
383            model: model.map(str::to_string),
384            prompt_preview: prompt_preview.to_string(),
385        })
386    }
387
388    pub fn step_output(
389        &self,
390        step_name: &str,
391        stream: OutputStream,
392        line: &str,
393    ) -> Result<(), ZigError> {
394        self.emit(SessionEventKind::StepOutput {
395            step_name: step_name.to_string(),
396            stream,
397            line: line.to_string(),
398        })
399    }
400
401    pub fn step_completed(
402        &self,
403        step_name: &str,
404        exit_code: i32,
405        duration_ms: u64,
406        saved_vars: Vec<String>,
407    ) -> Result<(), ZigError> {
408        self.emit(SessionEventKind::StepCompleted {
409            step_name: step_name.to_string(),
410            exit_code,
411            duration_ms,
412            saved_vars,
413        })
414    }
415
416    pub fn step_failed(
417        &self,
418        step_name: &str,
419        exit_code: Option<i32>,
420        attempt: u32,
421        error: &str,
422    ) -> Result<(), ZigError> {
423        self.emit(SessionEventKind::StepFailed {
424            step_name: step_name.to_string(),
425            exit_code,
426            attempt,
427            error: error.to_string(),
428        })
429    }
430
431    pub fn step_skipped(&self, step_name: &str, reason: &str) -> Result<(), ZigError> {
432        self.emit(SessionEventKind::StepSkipped {
433            step_name: step_name.to_string(),
434            reason: reason.to_string(),
435        })
436    }
437
438    pub fn heartbeat(&self) -> Result<(), ZigError> {
439        self.emit(SessionEventKind::Heartbeat {
440            interval_secs: HEARTBEAT_INTERVAL_SECS,
441        })
442    }
443
444    /// Emit `ZigSessionEnded` and stamp the indexes with `ended_at`/`status`.
445    pub fn ended(&self, status: SessionStatus, duration_ms: u64) -> Result<(), ZigError> {
446        self.emit(SessionEventKind::ZigSessionEnded {
447            status,
448            duration_ms,
449        })?;
450        self.stamp_ended(status)?;
451        Ok(())
452    }
453
454    fn emit(&self, kind: SessionEventKind) -> Result<(), ZigError> {
455        let mut inner = self
456            .inner
457            .lock()
458            .map_err(|_| ZigError::Io("session writer mutex poisoned".into()))?;
459        inner.seq += 1;
460        let event = SessionLogEvent {
461            seq: inner.seq,
462            ts: now_rfc3339(),
463            zig_session_id: self.zig_session_id.clone(),
464            kind,
465        };
466        let line = serde_json::to_string(&event)
467            .map_err(|e| ZigError::Io(format!("failed to serialize session event: {e}")))?;
468        writeln!(inner.file, "{line}")
469            .map_err(|e| ZigError::Io(format!("failed to write session event: {e}")))?;
470        inner
471            .file
472            .flush()
473            .map_err(|e| ZigError::Io(format!("failed to flush session log: {e}")))?;
474        Ok(())
475    }
476
477    fn upsert_indexes(
478        &self,
479        workflow_name: &str,
480        workflow_path: &str,
481        workspace_path: &Option<String>,
482        started_at: &str,
483        ended_at: Option<String>,
484    ) -> Result<(), ZigError> {
485        let log_path_str = self.log_path.to_string_lossy().into_owned();
486
487        if let Some(idx_path) = &self.project_index_path {
488            let mut index = load_project_index(idx_path);
489            index
490                .sessions
491                .retain(|e| e.zig_session_id != self.zig_session_id);
492            index.sessions.push(SessionLogIndexEntry {
493                zig_session_id: self.zig_session_id.clone(),
494                workflow_name: workflow_name.to_string(),
495                workflow_path: workflow_path.to_string(),
496                log_path: log_path_str.clone(),
497                started_at: started_at.to_string(),
498                ended_at,
499                status: None,
500                workspace_path: workspace_path.clone(),
501            });
502            save_project_index(idx_path, &index)?;
503        }
504
505        if let Some(idx_path) = &self.global_index_path {
506            let mut index = load_global_index(idx_path);
507            index
508                .sessions
509                .retain(|e| e.zig_session_id != self.zig_session_id);
510            index.sessions.push(GlobalSessionEntry {
511                zig_session_id: self.zig_session_id.clone(),
512                workflow_name: workflow_name.to_string(),
513                project: workspace_path.clone().unwrap_or_default(),
514                log_path: log_path_str,
515                started_at: started_at.to_string(),
516                ended_at: None,
517                status: None,
518            });
519            save_global_index(idx_path, &index)?;
520        }
521
522        Ok(())
523    }
524
525    fn stamp_ended(&self, status: SessionStatus) -> Result<(), ZigError> {
526        let ended_at = now_rfc3339();
527
528        if let Some(idx_path) = &self.project_index_path {
529            let mut index = load_project_index(idx_path);
530            for entry in &mut index.sessions {
531                if entry.zig_session_id == self.zig_session_id {
532                    entry.ended_at = Some(ended_at.clone());
533                    entry.status = Some(status);
534                }
535            }
536            save_project_index(idx_path, &index)?;
537        }
538
539        if let Some(idx_path) = &self.global_index_path {
540            let mut index = load_global_index(idx_path);
541            for entry in &mut index.sessions {
542                if entry.zig_session_id == self.zig_session_id {
543                    entry.ended_at = Some(ended_at.clone());
544                    entry.status = Some(status);
545                }
546            }
547            save_global_index(idx_path, &index)?;
548        }
549
550        Ok(())
551    }
552}
553
554// ---------------------------------------------------------------------
555// Coordinator
556// ---------------------------------------------------------------------
557
558/// Wraps a `SessionWriter` in an `Arc` and runs a background thread that
559/// emits a `Heartbeat` event every 10 seconds. The handle's `Drop` impl
560/// stops the heartbeat thread and stamps `ended_at` defensively if
561/// `finish()` was never called (crash/panic safety).
562///
563/// Mirrors zag's `SessionLogCoordinator`
564/// (`zag-agent/src/session_log.rs:565`).
565pub struct SessionCoordinator {
566    writer: Arc<SessionWriter>,
567    started: Instant,
568    stop_flag: Arc<AtomicBool>,
569    heartbeat: Option<JoinHandle<()>>,
570    finished: bool,
571}
572
573impl SessionCoordinator {
574    pub fn start(writer: SessionWriter) -> Self {
575        let writer = Arc::new(writer);
576        let stop_flag = Arc::new(AtomicBool::new(false));
577
578        let hb_writer = Arc::clone(&writer);
579        let hb_stop = Arc::clone(&stop_flag);
580        let heartbeat = thread::spawn(move || {
581            let interval = Duration::from_secs(HEARTBEAT_INTERVAL_SECS);
582            // Sleep in short ticks so shutdown is responsive.
583            let tick = Duration::from_millis(200);
584            let mut elapsed = Duration::ZERO;
585            while !hb_stop.load(Ordering::Relaxed) {
586                thread::sleep(tick);
587                elapsed += tick;
588                if elapsed >= interval {
589                    elapsed = Duration::ZERO;
590                    let _ = hb_writer.heartbeat();
591                }
592            }
593        });
594
595        Self {
596            writer,
597            started: Instant::now(),
598            stop_flag,
599            heartbeat: Some(heartbeat),
600            finished: false,
601        }
602    }
603
604    pub fn writer(&self) -> Arc<SessionWriter> {
605        Arc::clone(&self.writer)
606    }
607
608    /// Mark the session ended cleanly. Stops the heartbeat thread and
609    /// emits `ZigSessionEnded`.
610    pub fn finish(mut self, status: SessionStatus) -> Result<(), ZigError> {
611        self.stop_flag.store(true, Ordering::Relaxed);
612        if let Some(h) = self.heartbeat.take() {
613            let _ = h.join();
614        }
615        let duration_ms = self.started.elapsed().as_millis() as u64;
616        self.writer.ended(status, duration_ms)?;
617        self.finished = true;
618        Ok(())
619    }
620}
621
622impl Drop for SessionCoordinator {
623    fn drop(&mut self) {
624        if self.finished {
625            return;
626        }
627        // Crash/panic path: stop heartbeat and best-effort stamp the
628        // indexes so `--latest`/`--active` resolution stays consistent.
629        self.stop_flag.store(true, Ordering::Relaxed);
630        if let Some(h) = self.heartbeat.take() {
631            let _ = h.join();
632        }
633        let duration_ms = self.started.elapsed().as_millis() as u64;
634        let _ = self.writer.ended(SessionStatus::Failure, duration_ms);
635    }
636}
637
638fn now_rfc3339() -> String {
639    Utc::now().to_rfc3339()
640}
641
// Unit tests live in the sibling `session_tests.rs` (loaded via `#[path]`)
// and are compiled only for `cfg(test)` builds.
#[cfg(test)]
#[path = "session_tests.rs"]
mod tests;