use super::ids::ExecutionId;
use crate::streaming::{EventEmitter, StreamEvent};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
/// Reasons an interrupt can carry, each with its own payload.
///
/// The type is serializable and deserializable via serde. It is not referenced
/// by the other items visible alongside it, so how it is consumed is not shown
/// here.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum InterruptReason {
    /// Carries a `plan` as an arbitrary JSON value.
    PlanApproval { plan: Value },
    /// Carries a textual `prompt`.
    UserInput { prompt: String },
    /// Carries a free-form `reason` string plus arbitrary JSON `data`.
    Custom { reason: String, data: Value },
}
/// A decision delivered over [`InterruptableRunner`]'s decision channel.
///
/// [`InterruptableRunner::request_approval`] maps a received decision to its
/// boolean result.
#[derive(Clone, Debug)]
pub enum InterruptDecision {
    /// Approval; `request_approval` returns `Ok(true)`.
    Approve,
    /// Rejection with an optional explanation; `request_approval` returns
    /// `Ok(false)`.
    Reject { reason: Option<String> },
    /// An arbitrary JSON value supplied as input. `request_approval` ignores the
    /// value and returns `Ok(true)`.
    Input { value: Value },
}
/// Runner state that pairs an execution id and an event emitter with a channel
/// over which [`InterruptDecision`]s are delivered.
pub struct InterruptableRunner {
    /// Identifier passed along with every event this runner emits.
    execution_id: ExecutionId,
    /// Emitter used to publish stream events.
    emitter: EventEmitter,
    /// Sending half of the decision channel; cloned out by `decision_sender`.
    interrupt_tx: mpsc::Sender<InterruptDecision>,
    /// Receiving half of the decision channel.
    ///
    /// NOTE(review): wrapped in `Option`, presumably so it can be moved out
    /// elsewhere; nothing visible here ever sets it to `None` — confirm.
    interrupt_rx: Option<mpsc::Receiver<InterruptDecision>>,
}
impl InterruptableRunner {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
execution_id: ExecutionId::new(),
emitter: EventEmitter::new(),
interrupt_tx: tx,
interrupt_rx: Some(rx),
}
}
pub fn execution_id(&self) -> &ExecutionId {
&self.execution_id
}
#[deprecated(note = "Use execution_id() instead")]
pub fn run_id(&self) -> &ExecutionId {
&self.execution_id
}
pub fn emitter(&self) -> &EventEmitter {
&self.emitter
}
pub fn decision_sender(&self) -> mpsc::Sender<InterruptDecision> {
self.interrupt_tx.clone()
}
pub async fn approve(&self) {
let _ = self.interrupt_tx.send(InterruptDecision::Approve).await;
}
pub async fn reject(&self, reason: Option<String>) {
let _ = self.interrupt_tx.send(InterruptDecision::Reject { reason }).await;
}
pub async fn request_approval(&mut self, plan: Value) -> anyhow::Result<bool> {
self.emitter.emit(StreamEvent::text_delta(
self.execution_id.as_str(),
format!("Waiting for approval: {}", serde_json::to_string_pretty(&plan)?),
));
if let Some(ref mut rx) = self.interrupt_rx {
if let Some(decision) = rx.recv().await {
match decision {
InterruptDecision::Approve => {
self.emitter.emit(StreamEvent::text_delta(
self.execution_id.as_str(),
"Plan approved, continuing execution",
));
Ok(true)
}
InterruptDecision::Reject { reason } => {
self.emitter.emit(StreamEvent::text_delta(
self.execution_id.as_str(),
format!("Plan rejected: {:?}", reason),
));
Ok(false)
}
InterruptDecision::Input { .. } => {
Ok(true)
}
}
} else {
anyhow::bail!("Interrupt channel closed")
}
} else {
anyhow::bail!("Interrupt receiver not available")
}
}
}
impl Default for InterruptableRunner {
fn default() -> Self {
Self::new()
}
}