use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PendingResume {
pub incident_id: String,
pub session_id: String,
pub provider: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub root: Option<String>,
pub when: DateTime<Utc>,
pub message: String,
pub attempt: u32,
pub log_path: PathBuf,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompleteStatus {
Resumed,
Failed,
}
impl CompleteStatus {
fn as_str(self) -> &'static str {
match self {
Self::Resumed => "resumed",
Self::Failed => "failed",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum Record {
Schedule(PendingResume),
Complete {
incident_id: String,
status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
error: Option<String>,
},
Cancel {
incident_id: String,
},
}
pub fn store_path(root: Option<&str>) -> PathBuf {
zag_agent::config::Config::agent_dir(root).join("scheduled_resumes.jsonl")
}
fn append_record(root: Option<&str>, rec: &Record) -> Result<()> {
let path = store_path(root);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating parent directory for {}", path.display()))?;
}
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.with_context(|| format!("opening {} for append", path.display()))?;
let mut line = serde_json::to_string(rec).context("serializing resume record")?;
line.push('\n');
file.write_all(line.as_bytes())
.with_context(|| format!("writing to {}", path.display()))?;
file.sync_data().ok();
Ok(())
}
pub fn record_pending(root: Option<&str>, pending: &PendingResume) -> Result<()> {
append_record(root, &Record::Schedule(pending.clone()))
}
pub fn record_complete(
root: Option<&str>,
incident_id: &str,
status: CompleteStatus,
error: Option<&str>,
) -> Result<()> {
append_record(
root,
&Record::Complete {
incident_id: incident_id.to_string(),
status: status.as_str().to_string(),
error: error.map(str::to_string),
},
)
}
pub fn record_cancel(root: Option<&str>, incident_id: &str) -> Result<()> {
append_record(
root,
&Record::Cancel {
incident_id: incident_id.to_string(),
},
)
}
pub fn list_pending(root: Option<&str>) -> Result<Vec<PendingResume>> {
let path = store_path(root);
list_pending_at(&path)
}
pub fn list_pending_at(path: &Path) -> Result<Vec<PendingResume>> {
if !path.exists() {
return Ok(Vec::new());
}
let file = File::open(path).with_context(|| format!("opening {}", path.display()))?;
let reader = BufReader::new(&file);
let mut scheduled: HashMap<String, PendingResume> = HashMap::new();
let mut completed: HashSet<String> = HashSet::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let Ok(rec) = serde_json::from_str::<Record>(&line) else {
log::warn!("skipping malformed resume record: {line}");
continue;
};
match rec {
Record::Schedule(p) => {
scheduled.insert(p.incident_id.clone(), p);
}
Record::Complete { incident_id, .. } | Record::Cancel { incident_id } => {
completed.insert(incident_id);
}
}
}
let mut pending: Vec<PendingResume> = scheduled
.into_iter()
.filter(|(id, _)| !completed.contains(id))
.map(|(_, p)| p)
.collect();
pending.sort_by_key(|p| p.when);
Ok(pending)
}
#[cfg(test)]
#[path = "usage_resume_store_tests.rs"]
mod tests;