use tokio::sync::oneshot;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueClosedError;
impl std::fmt::Display for QueueClosedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "thread queue is closed — no further commands accepted")
}
}
impl std::error::Error for QueueClosedError {}
#[derive(Debug, PartialEq, Eq)]
pub enum CloseResult {
ClosedIdle,
ClosedWithSummary,
Interrupted,
}
pub enum ThreadCommand {
ClaudeReply {
request_id: serde_json::Value,
args: serde_json::Value,
respond_tx: oneshot::Sender<serde_json::Value>,
},
AutoMailInject {
content: String,
},
Close {
respond_tx: oneshot::Sender<CloseResult>,
},
}
impl std::fmt::Debug for ThreadCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ClaudeReply { request_id, .. } => {
write!(f, "ClaudeReply {{ request_id: {request_id} }}")
}
Self::AutoMailInject { .. } => write!(f, "AutoMailInject"),
Self::Close { .. } => write!(f, "Close"),
}
}
}
#[derive(Debug)]
pub struct ThreadCommandQueue {
agent_id: String,
queue: std::collections::VecDeque<ThreadCommand>,
close_requested: bool,
}
impl ThreadCommandQueue {
pub fn new(agent_id: String) -> Self {
Self {
agent_id,
queue: std::collections::VecDeque::new(),
close_requested: false,
}
}
pub fn agent_id(&self) -> &str {
&self.agent_id
}
pub fn is_close_requested(&self) -> bool {
self.close_requested
}
pub fn push_claude_reply(
&mut self,
request_id: serde_json::Value,
args: serde_json::Value,
respond_tx: oneshot::Sender<serde_json::Value>,
) -> Result<(), QueueClosedError> {
if self.close_requested {
return Err(QueueClosedError);
}
self.queue
.push_back(ThreadCommand::ClaudeReply { request_id, args, respond_tx });
Ok(())
}
pub fn push_auto_mail(&mut self, content: String) -> bool {
if self.close_requested {
return false;
}
let has_pending_reply = self
.queue
.iter()
.any(|c| matches!(c, ThreadCommand::ClaudeReply { .. }));
if has_pending_reply {
return false;
}
self.queue
.push_back(ThreadCommand::AutoMailInject { content });
true
}
pub fn push_close(&mut self, respond_tx: oneshot::Sender<CloseResult>) -> bool {
if self.close_requested {
return false;
}
self.close_requested = true;
self.queue
.push_front(ThreadCommand::Close { respond_tx });
true
}
pub fn pop_next(&mut self) -> Option<ThreadCommand> {
self.queue.pop_front()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::oneshot;
fn make_queue() -> ThreadCommandQueue {
ThreadCommandQueue::new("codex:test-agent".to_string())
}
#[test]
fn close_is_idempotent_second_push_returns_false() {
let mut q = make_queue();
let (tx1, _rx1) = oneshot::channel::<CloseResult>();
let (tx2, _rx2) = oneshot::channel::<CloseResult>();
assert!(q.push_close(tx1), "first close should be accepted");
assert!(!q.push_close(tx2), "second close must return false (idempotent)");
assert!(q.is_close_requested());
}
#[test]
fn claude_reply_rejected_after_close() {
let mut q = make_queue();
let (tx, _rx) = oneshot::channel::<CloseResult>();
q.push_close(tx);
let (reply_tx, _reply_rx) = oneshot::channel();
let result = q.push_claude_reply(serde_json::json!(1), serde_json::json!({}), reply_tx);
assert!(result.is_err(), "ClaudeReply must be rejected when close is pending");
}
#[test]
fn auto_mail_rejected_after_close() {
let mut q = make_queue();
let (tx, _rx) = oneshot::channel::<CloseResult>();
q.push_close(tx);
let queued = q.push_auto_mail("inject me".to_string());
assert!(!queued, "AutoMailInject must be rejected when close is pending");
}
#[test]
fn auto_mail_rejected_when_claude_reply_queued() {
let mut q = make_queue();
let (reply_tx, _reply_rx) = oneshot::channel();
q.push_claude_reply(serde_json::json!(1), serde_json::json!({}), reply_tx)
.unwrap();
let queued = q.push_auto_mail("inject me".to_string());
assert!(!queued, "AutoMailInject must be rejected when a ClaudeReply is pending (FR-8.10)");
}
#[test]
fn close_jumps_to_front_of_non_empty_queue() {
let mut q = make_queue();
let (reply_tx, _reply_rx) = oneshot::channel();
q.push_claude_reply(serde_json::json!(42), serde_json::json!({"prompt": "hello"}), reply_tx)
.unwrap();
let (tx, _rx) = oneshot::channel::<CloseResult>();
assert!(q.push_close(tx));
let first = q.pop_next().expect("queue must not be empty");
assert!(
matches!(first, ThreadCommand::Close { .. }),
"Close must be the first command popped"
);
let second = q.pop_next().expect("ClaudeReply must still be present");
assert!(
matches!(second, ThreadCommand::ClaudeReply { .. }),
"ClaudeReply must follow Close"
);
}
#[test]
fn pop_next_on_empty_queue_returns_none() {
let mut q = make_queue();
assert!(q.pop_next().is_none());
}
#[test]
fn push_and_pop_claude_reply_round_trip() {
let mut q = make_queue();
let (reply_tx, _reply_rx) = oneshot::channel();
q.push_claude_reply(serde_json::json!(99), serde_json::json!({"x": 1}), reply_tx)
.unwrap();
let cmd = q.pop_next().unwrap();
match cmd {
ThreadCommand::ClaudeReply { request_id, args, .. } => {
assert_eq!(request_id, serde_json::json!(99));
assert_eq!(args["x"], 1);
}
_ => panic!("expected ClaudeReply"),
}
assert!(q.pop_next().is_none());
}
#[test]
fn push_and_pop_auto_mail_round_trip() {
let mut q = make_queue();
let queued = q.push_auto_mail("hello world".to_string());
assert!(queued);
let cmd = q.pop_next().unwrap();
match cmd {
ThreadCommand::AutoMailInject { content } => {
assert_eq!(content, "hello world");
}
_ => panic!("expected AutoMailInject"),
}
}
}