use super::*;
use crate::usage_resume_store::PendingResume;
use chrono::Duration;
use std::sync::Mutex;
use tempfile::TempDir;
use zag_agent::session_log::{LogCompleteness, SessionLogMetadata, SessionLogWriter};
struct CapturingStrategy {
captured: Arc<Mutex<Vec<(String, String, u32)>>>,
should_fail: bool,
}
impl ResumeStrategy for CapturingStrategy {
fn resume<'a>(
&'a self,
session_id: &'a str,
message: &'a str,
attempt: u32,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
let captured = Arc::clone(&self.captured);
let session_id = session_id.to_string();
let message = message.to_string();
let should_fail = self.should_fail;
Box::pin(async move {
captured
.lock()
.unwrap()
.push((session_id, message, attempt));
if should_fail {
Err(anyhow::anyhow!("simulated resume failure"))
} else {
Ok(())
}
})
}
}
fn make_writer(dir: &TempDir, provider: &str) -> Arc<SessionLogWriter> {
let writer = SessionLogWriter::create(
dir.path(),
SessionLogMetadata {
provider: provider.to_string(),
wrapper_session_id: "test-session".to_string(),
provider_session_id: None,
workspace_path: None,
command: "test".to_string(),
model: None,
resumed: false,
backfilled: false,
},
)
.expect("create writer");
writer.set_completeness(LogCompleteness::Full).unwrap();
Arc::new(writer)
}
fn find_jsonl(dir: &std::path::Path) -> Option<std::path::PathBuf> {
for entry in std::fs::read_dir(dir).ok()?.flatten() {
let path = entry.path();
if path.is_dir() {
if let Some(found) = find_jsonl(&path) {
return Some(found);
}
} else if path.extension().is_some_and(|x| x == "jsonl") {
return Some(path);
}
}
None
}
fn read_events(dir: &TempDir) -> Vec<serde_json::Value> {
use std::io::BufRead;
let path = find_jsonl(dir.path()).expect("a jsonl log exists under dir");
let file = std::fs::File::open(&path).unwrap();
std::io::BufReader::new(file)
.lines()
.map_while(|l| l.ok())
.filter(|l| !l.trim().is_empty())
.filter_map(|l| serde_json::from_str(&l).ok())
.collect()
}
fn make_pending(
session_id: &str,
when: DateTime<Utc>,
incident_id: &str,
attempt: u32,
) -> PendingResume {
PendingResume {
incident_id: incident_id.to_string(),
session_id: session_id.to_string(),
provider: "claude".to_string(),
model: None,
root: None,
when,
message: "Continue".to_string(),
attempt,
log_path: std::path::PathBuf::from("/tmp/test.jsonl"),
}
}
#[tokio::test]
async fn schedule_resume_fires_strategy_after_wait() {
let dir = TempDir::new().unwrap();
let writer = make_writer(&dir, "claude");
let captured = Arc::new(Mutex::new(Vec::new()));
let strategy: Arc<dyn ResumeStrategy> = Arc::new(CapturingStrategy {
captured: Arc::clone(&captured),
should_fail: false,
});
let when = Utc::now() + Duration::milliseconds(150);
let handle = schedule_resume(
make_pending("test-session", when, "incident-1", 1),
Arc::clone(&writer),
strategy,
);
handle.await.unwrap();
let calls = captured.lock().unwrap();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].0, "test-session");
assert_eq!(calls[0].1, "Continue");
assert_eq!(calls[0].2, 1);
let events = read_events(&dir);
let resumed_count = events
.iter()
.filter(|e| e.get("type").and_then(|v| v.as_str()) == Some("usage_limit_resumed"))
.count();
assert_eq!(resumed_count, 1, "expected one UsageLimitResumed event");
}
#[tokio::test]
async fn schedule_resume_emits_failed_event_on_error() {
let dir = TempDir::new().unwrap();
let writer = make_writer(&dir, "codex");
let captured = Arc::new(Mutex::new(Vec::new()));
let strategy: Arc<dyn ResumeStrategy> = Arc::new(CapturingStrategy {
captured: Arc::clone(&captured),
should_fail: true,
});
let when = Utc::now() + Duration::milliseconds(50);
schedule_resume(
make_pending("test-session", when, "incident-2", 3),
Arc::clone(&writer),
strategy,
)
.await
.unwrap();
let events = read_events(&dir);
let failed = events
.iter()
.find(|e| e.get("type").and_then(|v| v.as_str()) == Some("usage_limit_resume_failed"))
.expect("expected a UsageLimitResumeFailed event");
assert_eq!(
failed.get("incident_id").and_then(|v| v.as_str()),
Some("incident-2")
);
assert_eq!(failed.get("attempt").and_then(|v| v.as_u64()), Some(3));
assert!(
failed
.get("error")
.and_then(|v| v.as_str())
.unwrap_or("")
.contains("simulated resume failure")
);
}
#[tokio::test]
async fn strategy_for_claude_uses_fifo_path() {
let s = strategy_for("claude", None, None);
let err = s
.resume("nonexistent-session", "Continue", 1)
.await
.unwrap_err()
.to_string();
let lower = err.to_lowercase();
assert!(
lower.contains("fifo") || lower.contains("relay") || lower.contains("interactive"),
"expected FIFO-path error, got: {err}"
);
}
#[tokio::test]
async fn strategy_for_codex_uses_respawn_path() {
let s = strategy_for("codex", None, None);
let err = s
.resume("nonexistent-session", "Continue", 1)
.await
.unwrap_err()
.to_string();
let lower = err.to_lowercase();
assert!(
lower.contains("session") && (lower.contains("store") || lower.contains("not found")),
"expected respawn-path error, got: {err}"
);
}
use std::collections::HashMap;
use zag_agent::output::{AgentOutput, ContentBlock, Event};
use zag_agent::usage_limits::UsageLimitConfig;
fn empty_output(provider: &str) -> AgentOutput {
AgentOutput {
agent: provider.to_string(),
session_id: String::new(),
events: Vec::new(),
result: None,
is_error: false,
exit_code: None,
error_message: None,
total_cost_usd: None,
usage: None,
model: None,
provider: Some(provider.to_string()),
log_path: None,
}
}
#[test]
fn find_usage_limit_recognizes_explicit_detected_event() {
let cfg = UsageLimitConfig::default();
let mut out = empty_output("claude");
out.events.push(Event::UsageLimitDetected {
provider: "claude".to_string(),
scope: "weekly".to_string(),
reset_at: Some("2030-01-01T00:00:00Z".to_string()),
raw: Some("Claude AI weekly usage limit reached|1893456000".to_string()),
});
let hit = find_usage_limit_in_output(&out, "claude", &cfg).expect("should detect");
assert_eq!(hit.provider, "claude");
assert_eq!(hit.scope.as_str(), "weekly");
assert!(hit.reset_at.is_some());
}
#[test]
fn find_usage_limit_falls_back_to_text_scan_for_codex() {
let cfg = UsageLimitConfig::default();
let mut out = empty_output("codex");
out.result = Some(
"You've hit your usage limit. Please try again at Mar 20th, 2030 3:36 PM.".to_string(),
);
out.events.push(Event::Result {
success: false,
message: out.result.clone(),
duration_ms: None,
num_turns: None,
});
let hit = find_usage_limit_in_output(&out, "codex", &cfg).expect("should detect via text scan");
assert_eq!(hit.provider, "codex");
assert!(hit.reset_at.is_some());
}
#[test]
fn find_usage_limit_returns_none_when_no_signal() {
let cfg = UsageLimitConfig::default();
let mut out = empty_output("codex");
out.result = Some("All good, no limits hit".to_string());
out.events.push(Event::AssistantMessage {
content: vec![ContentBlock::Text {
text: "hello world".to_string(),
}],
usage: None,
parent_tool_use_id: None,
});
assert!(find_usage_limit_in_output(&out, "codex", &cfg).is_none());
}
#[test]
fn find_usage_limit_skips_unknown_providers() {
let cfg = UsageLimitConfig::default();
let mut out = empty_output("ollama");
out.result = Some(
"Claude AI usage limit reached|1893456000 — but this is an ollama session".to_string(),
);
out.events.push(Event::Result {
success: false,
message: out.result.clone(),
duration_ms: None,
num_turns: None,
});
assert!(find_usage_limit_in_output(&out, "ollama", &cfg).is_none());
}
#[test]
fn extract_provider_session_id_prefers_output_field() {
let mut out = empty_output("codex");
out.session_id = "thread-abc".to_string();
out.events.push(Event::Init {
model: "x".to_string(),
tools: vec![],
working_directory: None,
metadata: HashMap::new(),
});
assert_eq!(
extract_provider_session_id(&out),
Some("thread-abc".to_string())
);
}
#[test]
fn extract_provider_session_id_falls_back_to_init_metadata() {
let mut out = empty_output("claude");
out.session_id = "unknown".to_string(); let mut meta = HashMap::new();
meta.insert(
"session_id".to_string(),
serde_json::Value::String("claude-sid-1".to_string()),
);
out.events.push(Event::Init {
model: "claude-sonnet".to_string(),
tools: vec![],
working_directory: None,
metadata: meta,
});
assert_eq!(
extract_provider_session_id(&out),
Some("claude-sid-1".to_string())
);
}
#[test]
fn extract_provider_session_id_returns_none_when_absent() {
let out = empty_output("codex");
assert_eq!(extract_provider_session_id(&out), None);
}
#[test]
fn find_usage_limit_pulls_text_from_assistant_messages_too() {
let cfg = UsageLimitConfig::default();
let mut out = empty_output("claude");
out.events.push(Event::AssistantMessage {
content: vec![ContentBlock::Text {
text: "Claude AI usage limit reached|1893456000".to_string(),
}],
usage: None,
parent_tool_use_id: None,
});
let hit = find_usage_limit_in_output(&out, "claude", &cfg).expect("should detect");
assert_eq!(hit.reset_at.unwrap().timestamp(), 1_893_456_000);
}