#![allow(dead_code)]
use rusqlite::Connection;
use crate::session::{ClaudeSession, SessionStatus};
use crate::terminals;
use super::store;
use super::types::*;
/// Attempts to deliver every currently deliverable interrupt to its target session.
///
/// The function first calls `store::expire_stale_interrupts` and
/// `store::expire_exhausted_interrupts`, then loads the deliverable interrupts
/// from the store. Each interrupt is matched to the first entry in `sessions`
/// whose `session_id` equals the interrupt's `target_session_id`. Interrupts
/// with no matching session, or whose delivery mode does not permit delivery
/// right now (see `can_deliver`), are skipped without being modified.
///
/// For the remaining interrupts the formatted message is sent through
/// `terminals::send_input`:
///
/// * on success the interrupt is marked delivered and an
///   `EventType::InterruptDelivered` event is appended;
/// * on failure the attempt is recorded via
///   `store::record_interrupt_delivery_failure`.
///
/// Store bookkeeping failures never abort the loop; they are written to the
/// log under the `INTERRUPT_BUS` tag.
///
/// # Returns
///
/// One `(interrupt_id, summary)` pair per successful delivery, per failed
/// attempt whose retry was recorded, and per interrupt that expired after its
/// final attempt. An empty vector is returned if the interrupt list cannot be
/// loaded.
pub fn deliver_pending(conn: &Connection, sessions: &[ClaudeSession]) -> Vec<(String, String)> {
    const LOG_TAG: &str = "INTERRUPT_BUS";

    // Expiry is best-effort housekeeping: a failure must not block delivery,
    // but it is logged instead of being silently discarded.
    if let Err(e) = store::expire_stale_interrupts(conn) {
        crate::logger::log(LOG_TAG, &format!("Failed to expire stale interrupts: {e}"));
    }
    if let Err(e) = store::expire_exhausted_interrupts(conn) {
        crate::logger::log(
            LOG_TAG,
            &format!("Failed to expire exhausted interrupts: {e}"),
        );
    }

    let interrupts = match store::list_deliverable_interrupts(conn) {
        Ok(list) => list,
        Err(e) => {
            crate::logger::log(LOG_TAG, &format!("Failed to list interrupts: {e}"));
            return Vec::new();
        }
    };

    let mut results = Vec::new();
    for interrupt in &interrupts {
        // Interrupts whose target session is not in this snapshot stay queued.
        let Some(session) = sessions
            .iter()
            .find(|s| s.session_id == interrupt.target_session_id)
        else {
            continue;
        };
        if !can_deliver(interrupt, session) {
            continue;
        }

        let message = format_interrupt_message(interrupt);
        match terminals::send_input(session, &message) {
            Ok(()) => {
                // The message has already reached the terminal. If the state
                // change cannot be persisted, the store still lists this
                // interrupt as undelivered, so make that visible in the log.
                if let Err(e) = store::mark_interrupt_delivered(conn, &interrupt.id) {
                    crate::logger::log(
                        LOG_TAG,
                        &format!("Failed to mark {} as delivered: {e}", interrupt.id),
                    );
                }
                let event = CoordEvent {
                    id: None,
                    event_type: EventType::InterruptDelivered,
                    timestamp: crate::logger::timestamp_now(),
                    session_id: Some(interrupt.target_session_id.clone()),
                    payload: serde_json::json!({
                        "interrupt_id": interrupt.id,
                        "type": interrupt.interrupt_type.as_str(),
                    }),
                };
                if let Err(e) = store::append_event(conn, &event) {
                    crate::logger::log(
                        LOG_TAG,
                        &format!(
                            "Failed to append delivery event for {}: {e}",
                            interrupt.id
                        ),
                    );
                }
                results.push((
                    interrupt.id.clone(),
                    format!(
                        "Interrupt delivered: {} ({}) -> {}",
                        interrupt.interrupt_type,
                        interrupt.priority,
                        session.display_name()
                    ),
                ));
            }
            Err(e) => {
                crate::logger::log(
                    LOG_TAG,
                    &format!("Delivery failed for {}: {e}", interrupt.id),
                );
                match store::record_interrupt_delivery_failure(conn, &interrupt.id) {
                    // The failed attempt pushed the interrupt into the expired state.
                    Ok(Some(updated)) if updated.state == InterruptState::Expired => {
                        results.push((
                            interrupt.id.clone(),
                            format!(
                                "Interrupt expired after {}/{} delivery attempts: {}",
                                updated.retry_count, updated.max_retries, interrupt.id
                            ),
                        ));
                    }
                    // A retry was recorded; report when it will be attempted.
                    Ok(Some(updated)) => {
                        let next = updated.next_retry_at.as_deref().unwrap_or("next tick");
                        results.push((
                            interrupt.id.clone(),
                            format!(
                                "Interrupt delivery failed ({}/{}); retry after {next}",
                                updated.retry_count, updated.max_retries
                            ),
                        ));
                    }
                    // The store returned no updated row; nothing to report.
                    Ok(None) => {}
                    Err(err) => crate::logger::log(
                        LOG_TAG,
                        &format!("Failed to record retry for {}: {err}", interrupt.id),
                    ),
                }
            }
        }
    }
    results
}
/// Decides whether `interrupt` may be sent to `session` right now, based on
/// the interrupt's `delivery_mode` string and the session's current state.
///
/// * `"immediate"` — always deliverable.
/// * `"manual_review"` — never deliverable through this path.
/// * `"waiting_only"` — deliverable only while the status is `WaitingInput`.
/// * `"safe_boundary"` and every unrecognised mode — deliverable unless the
///   session is `Processing` and has a pending tool name.
fn can_deliver(interrupt: &Interrupt, session: &ClaudeSession) -> bool {
    let mode = interrupt.delivery_mode.as_str();

    if mode == "immediate" {
        return true;
    }
    if mode == "manual_review" {
        return false;
    }
    if mode == "waiting_only" {
        return session.status == SessionStatus::WaitingInput;
    }

    // "safe_boundary" and any unknown mode share the same rule: hold back only
    // when the session is processing with a tool still pending.
    let processing_with_pending_tool =
        session.status == SessionStatus::Processing && session.pending_tool_name.is_some();
    !processing_with_pending_tool
}
fn format_interrupt_message(interrupt: &Interrupt) -> String {
let mut msg = format!(
"[Interrupt: {}] {}\nPriority: {}",
interrupt.interrupt_type, interrupt.reason, interrupt.priority
);
if let Some(ref payload) = interrupt.payload {
if let Some(obj) = payload.as_object() {
for (key, value) in obj {
let val_str = match value {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
msg.push_str(&format!("\n{key}: {val_str}"));
}
}
}
msg
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::RawSession;

    /// Builds a session with id `"s1"` and the given status.
    fn test_session(status: SessionStatus) -> ClaudeSession {
        let mut s = ClaudeSession::from_raw(RawSession {
            pid: 1,
            session_id: "s1".into(),
            cwd: "/tmp".into(),
            started_at: 0,
        });
        s.status = status;
        s
    }

    /// Builds a pending, medium-priority interrupt aimed at session `"s1"`
    /// with no payload. Tests override individual fields with struct-update
    /// syntax.
    fn test_interrupt(id: &str, delivery_mode: &str, itype: InterruptType) -> Interrupt {
        Interrupt {
            id: id.into(),
            interrupt_type: itype,
            priority: "medium".into(),
            target_session_id: "s1".into(),
            reason: "test".into(),
            payload: None,
            delivery_mode: delivery_mode.into(),
            max_retries: 3,
            retry_count: 0,
            next_retry_at: None,
            expires_at: None,
            dedupe_key: None,
            state: InterruptState::Pending,
            created_at: "2026-04-20T10:00:00Z".into(),
            delivered_at: None,
            acknowledged_at: None,
        }
    }

    #[test]
    fn can_deliver_immediate_always() {
        let interrupt = test_interrupt("i1", "immediate", InterruptType::Stop);
        let session = test_session(SessionStatus::Processing);
        assert!(can_deliver(&interrupt, &session));
        let session = test_session(SessionStatus::WaitingInput);
        assert!(can_deliver(&interrupt, &session));
    }

    #[test]
    fn can_deliver_waiting_only_checks_status() {
        let interrupt = test_interrupt("i2", "waiting_only", InterruptType::Nudge);
        let session = test_session(SessionStatus::Processing);
        assert!(!can_deliver(&interrupt, &session));
        let session = test_session(SessionStatus::WaitingInput);
        assert!(can_deliver(&interrupt, &session));
        let session = test_session(SessionStatus::NeedsInput);
        assert!(!can_deliver(&interrupt, &session));
    }

    #[test]
    fn can_deliver_manual_review_never() {
        let interrupt = test_interrupt("i3", "manual_review", InterruptType::Reroute);
        let session = test_session(SessionStatus::WaitingInput);
        assert!(!can_deliver(&interrupt, &session));
    }

    #[test]
    fn can_deliver_safe_boundary_when_not_processing() {
        // A session that is not processing is always at a safe boundary,
        // whatever its pending tool state is.
        let interrupt = test_interrupt("i4", "safe_boundary", InterruptType::Pause);
        let session = test_session(SessionStatus::WaitingInput);
        assert!(can_deliver(&interrupt, &session));
    }

    #[test]
    fn can_deliver_unknown_mode_uses_safe_boundary_rule() {
        // Unrecognised delivery modes fall through to the safe-boundary rule.
        let interrupt = test_interrupt("i5", "not_a_real_mode", InterruptType::Nudge);
        let session = test_session(SessionStatus::NeedsInput);
        assert!(can_deliver(&interrupt, &session));
    }

    #[test]
    fn format_interrupt_message_basic() {
        let interrupt = Interrupt {
            priority: "high".into(),
            reason: "Lease conflict on src/app.rs".into(),
            ..test_interrupt("i1", "safe_boundary", InterruptType::Pause)
        };
        let msg = format_interrupt_message(&interrupt);
        assert!(msg.contains("[Interrupt: pause]"));
        assert!(msg.contains("Lease conflict on src/app.rs"));
        assert!(msg.contains("Priority: high"));
    }

    #[test]
    fn format_interrupt_message_with_payload() {
        let interrupt = Interrupt {
            priority: "high".into(),
            reason: "Another agent needs src/app.rs".into(),
            payload: Some(serde_json::json!({"resource": "src/app.rs", "owner": "sess_9"})),
            ..test_interrupt("i1", "safe_boundary", InterruptType::ReleaseOwnership)
        };
        let msg = format_interrupt_message(&interrupt);
        assert!(msg.contains("resource: src/app.rs"));
        assert!(msg.contains("owner: sess_9"));
    }

    #[test]
    fn format_interrupt_message_renders_non_string_payload_values() {
        // Non-string values are written in their JSON text form.
        let interrupt = Interrupt {
            payload: Some(serde_json::json!({"attempt": 2, "forced": true})),
            ..test_interrupt("i6", "immediate", InterruptType::Stop)
        };
        let msg = format_interrupt_message(&interrupt);
        assert!(msg.contains("attempt: 2"));
        assert!(msg.contains("forced: true"));
    }
}