1use std::fs::{File, OpenOptions};
27use std::io::Write;
28use std::path::{Path, PathBuf};
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use chrono::Utc;
35use serde::{Deserialize, Serialize};
36use uuid::Uuid;
37
38use crate::error::ZigError;
39use crate::paths;
40
/// Number of seconds between two consecutive `Heartbeat` events in a session log.
///
/// The same value is recorded in every heartbeat event's `interval_secs` field.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
44
45#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum OutputStream {
49 Stdout,
50 Stderr,
51}
52
53#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
55#[serde(rename_all = "snake_case")]
56pub enum SessionStatus {
57 Success,
58 Failure,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(tag = "type", rename_all = "snake_case")]
68pub enum SessionEventKind {
69 ZigSessionStarted {
70 workflow_name: String,
71 workflow_path: String,
72 workspace_path: Option<String>,
73 cwd: Option<String>,
74 prompt: Option<String>,
75 tier_count: usize,
76 },
77 TierStarted {
78 tier_index: usize,
79 step_names: Vec<String>,
80 },
81 StepStarted {
82 step_name: String,
83 tier_index: usize,
84 zag_session_id: String,
85 zag_command: String,
86 model: Option<String>,
87 prompt_preview: String,
88 },
89 StepOutput {
90 step_name: String,
91 stream: OutputStream,
92 line: String,
93 },
94 StepCompleted {
95 step_name: String,
96 exit_code: i32,
97 duration_ms: u64,
98 saved_vars: Vec<String>,
99 },
100 StepFailed {
101 step_name: String,
102 exit_code: Option<i32>,
103 attempt: u32,
104 error: String,
105 },
106 StepSkipped {
107 step_name: String,
108 reason: String,
109 },
110 Heartbeat {
113 interval_secs: u64,
114 },
115 ZigSessionEnded {
116 status: SessionStatus,
117 duration_ms: u64,
118 },
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SessionLogEvent {
126 pub seq: u64,
127 pub ts: String,
128 pub zig_session_id: String,
129 #[serde(flatten)]
130 pub kind: SessionEventKind,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct SessionLogIndexEntry {
138 pub zig_session_id: String,
139 pub workflow_name: String,
140 pub workflow_path: String,
141 pub log_path: String,
142 pub started_at: String,
143 #[serde(default)]
144 pub ended_at: Option<String>,
145 #[serde(default)]
146 pub status: Option<SessionStatus>,
147 #[serde(default)]
148 pub workspace_path: Option<String>,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize, Default)]
152pub struct SessionLogIndex {
153 pub sessions: Vec<SessionLogIndexEntry>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct GlobalSessionEntry {
161 pub zig_session_id: String,
162 pub workflow_name: String,
163 pub project: String,
164 pub log_path: String,
165 pub started_at: String,
166 #[serde(default)]
167 pub ended_at: Option<String>,
168 #[serde(default)]
169 pub status: Option<SessionStatus>,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize, Default)]
173pub struct GlobalSessionIndex {
174 pub sessions: Vec<GlobalSessionEntry>,
175}
176
177pub fn load_project_index(path: &Path) -> SessionLogIndex {
182 if !path.exists() {
183 return SessionLogIndex::default();
184 }
185 std::fs::read_to_string(path)
186 .ok()
187 .and_then(|s| serde_json::from_str(&s).ok())
188 .unwrap_or_default()
189}
190
191pub fn save_project_index(path: &Path, index: &SessionLogIndex) -> Result<(), ZigError> {
192 if let Some(parent) = path.parent() {
193 std::fs::create_dir_all(parent)
194 .map_err(|e| ZigError::Io(format!("failed to create {}: {e}", parent.display())))?;
195 }
196 let json = serde_json::to_string_pretty(index)
197 .map_err(|e| ZigError::Io(format!("failed to serialize project index: {e}")))?;
198 std::fs::write(path, json)
199 .map_err(|e| ZigError::Io(format!("failed to write {}: {e}", path.display())))?;
200 Ok(())
201}
202
203pub fn load_global_index(path: &Path) -> GlobalSessionIndex {
204 if !path.exists() {
205 return GlobalSessionIndex::default();
206 }
207 std::fs::read_to_string(path)
208 .ok()
209 .and_then(|s| serde_json::from_str(&s).ok())
210 .unwrap_or_default()
211}
212
213pub fn save_global_index(path: &Path, index: &GlobalSessionIndex) -> Result<(), ZigError> {
214 if let Some(parent) = path.parent() {
215 std::fs::create_dir_all(parent)
216 .map_err(|e| ZigError::Io(format!("failed to create {}: {e}", parent.display())))?;
217 }
218 let json = serde_json::to_string_pretty(index)
219 .map_err(|e| ZigError::Io(format!("failed to serialize global index: {e}")))?;
220 std::fs::write(path, json)
221 .map_err(|e| ZigError::Io(format!("failed to write {}: {e}", path.display())))?;
222 Ok(())
223}
224
225pub fn list_sessions() -> Result<Vec<SessionLogIndexEntry>, ZigError> {
231 let index_path = paths::project_index_path(None)
232 .ok_or_else(|| ZigError::Io("HOME environment variable not set".into()))?;
233 let index = load_project_index(&index_path);
234 Ok(index.sessions)
235}
236
237pub fn read_session_events(log_path: &Path) -> Result<Vec<SessionLogEvent>, ZigError> {
239 let content = std::fs::read_to_string(log_path)
240 .map_err(|e| ZigError::Io(format!("failed to read {}: {e}", log_path.display())))?;
241
242 let mut events = Vec::new();
243 for line in content.lines() {
244 let line = line.trim();
245 if line.is_empty() {
246 continue;
247 }
248 let event: SessionLogEvent = serde_json::from_str(line)
249 .map_err(|e| ZigError::Parse(format!("failed to parse session event: {e}")))?;
250 events.push(event);
251 }
252 Ok(events)
253}
254
255pub fn find_session(session_id: &str) -> Result<SessionLogIndexEntry, ZigError> {
257 let sessions = list_sessions()?;
258 let matches: Vec<_> = sessions
259 .into_iter()
260 .filter(|s| s.zig_session_id.starts_with(session_id))
261 .collect();
262
263 match matches.len() {
264 0 => Err(ZigError::Io(format!("session not found: {session_id}"))),
265 1 => Ok(matches.into_iter().next().unwrap()),
266 _ => Err(ZigError::Io(format!(
267 "ambiguous session prefix '{session_id}' matches {} sessions",
268 matches.len()
269 ))),
270 }
271}
272
273pub struct SessionWriter {
284 zig_session_id: String,
285 log_path: PathBuf,
286 project_index_path: Option<PathBuf>,
287 global_index_path: Option<PathBuf>,
288 inner: Mutex<WriterInner>,
289}
290
/// Mutable state of a `SessionWriter`, kept behind its mutex.
struct WriterInner {
    /// Open handle to the session log file.
    file: File,
    /// Sequence number of the most recently emitted event (0 before the first).
    seq: u64,
}
295
296impl SessionWriter {
297 pub fn create(
301 workflow_name: &str,
302 workflow_path: &str,
303 prompt: Option<&str>,
304 tier_count: usize,
305 ) -> Result<Self, ZigError> {
306 let zig_session_id = Uuid::new_v4().to_string();
307
308 let sessions_dir = paths::ensure_project_sessions_dir(None)?;
309 let log_path = sessions_dir.join(format!("{zig_session_id}.jsonl"));
310
311 let file = OpenOptions::new()
312 .create(true)
313 .append(true)
314 .open(&log_path)
315 .map_err(|e| ZigError::Io(format!("failed to open {}: {e}", log_path.display())))?;
316
317 let writer = Self {
318 zig_session_id: zig_session_id.clone(),
319 log_path: log_path.clone(),
320 project_index_path: paths::project_index_path(None),
321 global_index_path: paths::global_sessions_index_path(),
322 inner: Mutex::new(WriterInner { file, seq: 0 }),
323 };
324
325 let cwd = std::env::current_dir()
326 .ok()
327 .map(|p| p.to_string_lossy().into_owned());
328 let workspace_path = paths::project_dir(None).map(|p| p.to_string_lossy().into_owned());
329 let started_at = now_rfc3339();
330
331 writer.emit(SessionEventKind::ZigSessionStarted {
332 workflow_name: workflow_name.to_string(),
333 workflow_path: workflow_path.to_string(),
334 workspace_path: workspace_path.clone(),
335 cwd,
336 prompt: prompt.map(str::to_string),
337 tier_count,
338 })?;
339
340 writer.upsert_indexes(
341 workflow_name,
342 workflow_path,
343 &workspace_path,
344 &started_at,
345 None,
346 )?;
347
348 Ok(writer)
349 }
350
351 pub fn session_id(&self) -> &str {
353 &self.zig_session_id
354 }
355
356 pub fn log_path(&self) -> &Path {
358 &self.log_path
359 }
360
361 pub fn tier_started(&self, tier_index: usize, step_names: Vec<String>) -> Result<(), ZigError> {
362 self.emit(SessionEventKind::TierStarted {
363 tier_index,
364 step_names,
365 })
366 }
367
368 #[allow(clippy::too_many_arguments)]
369 pub fn step_started(
370 &self,
371 step_name: &str,
372 tier_index: usize,
373 zag_session_id: &str,
374 zag_command: &str,
375 model: Option<&str>,
376 prompt_preview: &str,
377 ) -> Result<(), ZigError> {
378 self.emit(SessionEventKind::StepStarted {
379 step_name: step_name.to_string(),
380 tier_index,
381 zag_session_id: zag_session_id.to_string(),
382 zag_command: zag_command.to_string(),
383 model: model.map(str::to_string),
384 prompt_preview: prompt_preview.to_string(),
385 })
386 }
387
388 pub fn step_output(
389 &self,
390 step_name: &str,
391 stream: OutputStream,
392 line: &str,
393 ) -> Result<(), ZigError> {
394 self.emit(SessionEventKind::StepOutput {
395 step_name: step_name.to_string(),
396 stream,
397 line: line.to_string(),
398 })
399 }
400
401 pub fn step_completed(
402 &self,
403 step_name: &str,
404 exit_code: i32,
405 duration_ms: u64,
406 saved_vars: Vec<String>,
407 ) -> Result<(), ZigError> {
408 self.emit(SessionEventKind::StepCompleted {
409 step_name: step_name.to_string(),
410 exit_code,
411 duration_ms,
412 saved_vars,
413 })
414 }
415
416 pub fn step_failed(
417 &self,
418 step_name: &str,
419 exit_code: Option<i32>,
420 attempt: u32,
421 error: &str,
422 ) -> Result<(), ZigError> {
423 self.emit(SessionEventKind::StepFailed {
424 step_name: step_name.to_string(),
425 exit_code,
426 attempt,
427 error: error.to_string(),
428 })
429 }
430
431 pub fn step_skipped(&self, step_name: &str, reason: &str) -> Result<(), ZigError> {
432 self.emit(SessionEventKind::StepSkipped {
433 step_name: step_name.to_string(),
434 reason: reason.to_string(),
435 })
436 }
437
438 pub fn heartbeat(&self) -> Result<(), ZigError> {
439 self.emit(SessionEventKind::Heartbeat {
440 interval_secs: HEARTBEAT_INTERVAL_SECS,
441 })
442 }
443
444 pub fn ended(&self, status: SessionStatus, duration_ms: u64) -> Result<(), ZigError> {
446 self.emit(SessionEventKind::ZigSessionEnded {
447 status,
448 duration_ms,
449 })?;
450 self.stamp_ended(status)?;
451 Ok(())
452 }
453
454 fn emit(&self, kind: SessionEventKind) -> Result<(), ZigError> {
455 let mut inner = self
456 .inner
457 .lock()
458 .map_err(|_| ZigError::Io("session writer mutex poisoned".into()))?;
459 inner.seq += 1;
460 let event = SessionLogEvent {
461 seq: inner.seq,
462 ts: now_rfc3339(),
463 zig_session_id: self.zig_session_id.clone(),
464 kind,
465 };
466 let line = serde_json::to_string(&event)
467 .map_err(|e| ZigError::Io(format!("failed to serialize session event: {e}")))?;
468 writeln!(inner.file, "{line}")
469 .map_err(|e| ZigError::Io(format!("failed to write session event: {e}")))?;
470 inner
471 .file
472 .flush()
473 .map_err(|e| ZigError::Io(format!("failed to flush session log: {e}")))?;
474 Ok(())
475 }
476
477 fn upsert_indexes(
478 &self,
479 workflow_name: &str,
480 workflow_path: &str,
481 workspace_path: &Option<String>,
482 started_at: &str,
483 ended_at: Option<String>,
484 ) -> Result<(), ZigError> {
485 let log_path_str = self.log_path.to_string_lossy().into_owned();
486
487 if let Some(idx_path) = &self.project_index_path {
488 let mut index = load_project_index(idx_path);
489 index
490 .sessions
491 .retain(|e| e.zig_session_id != self.zig_session_id);
492 index.sessions.push(SessionLogIndexEntry {
493 zig_session_id: self.zig_session_id.clone(),
494 workflow_name: workflow_name.to_string(),
495 workflow_path: workflow_path.to_string(),
496 log_path: log_path_str.clone(),
497 started_at: started_at.to_string(),
498 ended_at,
499 status: None,
500 workspace_path: workspace_path.clone(),
501 });
502 save_project_index(idx_path, &index)?;
503 }
504
505 if let Some(idx_path) = &self.global_index_path {
506 let mut index = load_global_index(idx_path);
507 index
508 .sessions
509 .retain(|e| e.zig_session_id != self.zig_session_id);
510 index.sessions.push(GlobalSessionEntry {
511 zig_session_id: self.zig_session_id.clone(),
512 workflow_name: workflow_name.to_string(),
513 project: workspace_path.clone().unwrap_or_default(),
514 log_path: log_path_str,
515 started_at: started_at.to_string(),
516 ended_at: None,
517 status: None,
518 });
519 save_global_index(idx_path, &index)?;
520 }
521
522 Ok(())
523 }
524
525 fn stamp_ended(&self, status: SessionStatus) -> Result<(), ZigError> {
526 let ended_at = now_rfc3339();
527
528 if let Some(idx_path) = &self.project_index_path {
529 let mut index = load_project_index(idx_path);
530 for entry in &mut index.sessions {
531 if entry.zig_session_id == self.zig_session_id {
532 entry.ended_at = Some(ended_at.clone());
533 entry.status = Some(status);
534 }
535 }
536 save_project_index(idx_path, &index)?;
537 }
538
539 if let Some(idx_path) = &self.global_index_path {
540 let mut index = load_global_index(idx_path);
541 for entry in &mut index.sessions {
542 if entry.zig_session_id == self.zig_session_id {
543 entry.ended_at = Some(ended_at.clone());
544 entry.status = Some(status);
545 }
546 }
547 save_global_index(idx_path, &index)?;
548 }
549
550 Ok(())
551 }
552}
553
554pub struct SessionCoordinator {
566 writer: Arc<SessionWriter>,
567 started: Instant,
568 stop_flag: Arc<AtomicBool>,
569 heartbeat: Option<JoinHandle<()>>,
570 finished: bool,
571}
572
573impl SessionCoordinator {
574 pub fn start(writer: SessionWriter) -> Self {
575 let writer = Arc::new(writer);
576 let stop_flag = Arc::new(AtomicBool::new(false));
577
578 let hb_writer = Arc::clone(&writer);
579 let hb_stop = Arc::clone(&stop_flag);
580 let heartbeat = thread::spawn(move || {
581 let interval = Duration::from_secs(HEARTBEAT_INTERVAL_SECS);
582 let tick = Duration::from_millis(200);
584 let mut elapsed = Duration::ZERO;
585 while !hb_stop.load(Ordering::Relaxed) {
586 thread::sleep(tick);
587 elapsed += tick;
588 if elapsed >= interval {
589 elapsed = Duration::ZERO;
590 let _ = hb_writer.heartbeat();
591 }
592 }
593 });
594
595 Self {
596 writer,
597 started: Instant::now(),
598 stop_flag,
599 heartbeat: Some(heartbeat),
600 finished: false,
601 }
602 }
603
604 pub fn writer(&self) -> Arc<SessionWriter> {
605 Arc::clone(&self.writer)
606 }
607
608 pub fn finish(mut self, status: SessionStatus) -> Result<(), ZigError> {
611 self.stop_flag.store(true, Ordering::Relaxed);
612 if let Some(h) = self.heartbeat.take() {
613 let _ = h.join();
614 }
615 let duration_ms = self.started.elapsed().as_millis() as u64;
616 self.writer.ended(status, duration_ms)?;
617 self.finished = true;
618 Ok(())
619 }
620}
621
622impl Drop for SessionCoordinator {
623 fn drop(&mut self) {
624 if self.finished {
625 return;
626 }
627 self.stop_flag.store(true, Ordering::Relaxed);
630 if let Some(h) = self.heartbeat.take() {
631 let _ = h.join();
632 }
633 let duration_ms = self.started.elapsed().as_millis() as u64;
634 let _ = self.writer.ended(SessionStatus::Failure, duration_ms);
635 }
636}
637
638fn now_rfc3339() -> String {
639 Utc::now().to_rfc3339()
640}
641
// Unit tests live in the sibling file `session_tests.rs` and are only
// compiled for test builds.
#[cfg(test)]
#[path = "session_tests.rs"]
mod tests;