enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! Interruptable execution for plan approval flows
//!
//! Kernel-owned interrupt handling logic.

use super::ids::ExecutionId;
use crate::runner::approval_policy::ApprovalPolicy;
use crate::streaming::{EventEmitter, StreamEvent};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::mpsc;

/// Interrupt reason
///
/// Describes why an execution paused. The type derives `Serialize` and
/// `Deserialize`, so values can be written to and read from any serde format.
// NOTE(review): nothing in this file constructs or matches on this enum; it is
// presumably consumed by other modules — confirm before changing variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum InterruptReason {
    /// Waiting for plan approval; `plan` carries the plan as a JSON value
    PlanApproval { plan: Value },
    /// Waiting for user input; `prompt` is presumably the text shown to the user
    UserInput { prompt: String },
    /// Custom interrupt with a free-form `reason` and an arbitrary JSON payload
    Custom { reason: String, data: Value },
}

/// User decision for an interrupt
///
/// Values of this type travel over the runner's decision channel and are
/// consumed by `InterruptableRunner::request_approval`.
#[derive(Debug, Clone)]
pub enum InterruptDecision {
    /// Approve and continue (`request_approval` returns `Ok(true)`)
    Approve,
    /// Reject and abort/replan (`request_approval` returns `Ok(false)`);
    /// `reason` is an optional explanation
    Reject { reason: Option<String> },
    /// Provide input and continue; `request_approval` treats this like an
    /// approval and does not read `value`
    Input { value: Value },
}

/// Interruptable execution handler
///
/// Owns both halves of a bounded decision channel: decisions are pushed
/// through the sender (`approve`, `reject`, or a clone obtained from
/// `decision_sender`) and read back by `request_approval`.
pub struct InterruptableRunner {
    /// Identifier attached to emitted events and log records
    execution_id: ExecutionId,
    /// Emitter used to publish progress events while waiting for a decision
    emitter: EventEmitter,
    /// Sending half of the decision channel
    interrupt_tx: mpsc::Sender<InterruptDecision>,
    /// Receiving half of the decision channel; `request_approval` returns an
    /// error when this is `None`
    interrupt_rx: Option<mpsc::Receiver<InterruptDecision>>,
    /// Optional approval policy for plan approval
    approval_policy: Option<Arc<dyn ApprovalPolicy>>,
}

impl InterruptableRunner {
    /// Create a runner with a new execution ID, a new event emitter, and no
    /// approval policy.
    ///
    /// The decision channel has a capacity of one, so a single decision may be
    /// queued before `request_approval` starts waiting for it.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            execution_id: ExecutionId::new(),
            emitter: EventEmitter::new(),
            interrupt_tx: tx,
            interrupt_rx: Some(rx),
            approval_policy: None,
        }
    }

    /// Set the approval policy for this runner
    ///
    /// Consumes the runner and returns it with `policy` installed, replacing
    /// any policy that was set before.
    #[must_use]
    pub fn with_approval_policy<P: ApprovalPolicy + 'static>(mut self, policy: P) -> Self {
        self.approval_policy = Some(Arc::new(policy));
        self
    }

    /// Set the approval policy from an Arc
    ///
    /// Same as `with_approval_policy`, but reuses an already shared policy
    /// instead of wrapping a new one.
    #[must_use]
    pub fn with_approval_policy_arc(mut self, policy: Arc<dyn ApprovalPolicy>) -> Self {
        self.approval_policy = Some(policy);
        self
    }

    /// Check if a plan requires approval based on the configured policy
    ///
    /// Returns the policy's approval reason if it provides one, and `None`
    /// when no policy is configured or the policy gives no reason.
    pub fn check_plan_approval(&self, plan: &Value) -> Option<String> {
        self.approval_policy
            .as_ref()
            .and_then(|p| p.approval_reason(plan))
    }

    /// Check if a plan requires approval (boolean)
    ///
    /// Returns `false` when no policy is configured.
    pub fn requires_approval(&self, plan: &Value) -> bool {
        self.approval_policy
            .as_ref()
            .map(|p| p.requires_approval(plan))
            .unwrap_or(false)
    }

    /// Get the approval policy name (for logging)
    ///
    /// Returns `None` when no policy is configured.
    pub fn policy_name(&self) -> Option<&str> {
        self.approval_policy.as_ref().map(|p| p.name())
    }

    /// Get the execution ID
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    /// Get the event emitter this runner publishes its progress events to
    pub fn emitter(&self) -> &EventEmitter {
        &self.emitter
    }

    /// Get the decision sender (for external use to send decisions)
    ///
    /// The returned sender is a clone, so it can be moved to another task
    /// while the runner itself is waiting inside `request_approval`.
    pub fn decision_sender(&self) -> mpsc::Sender<InterruptDecision> {
        self.interrupt_tx.clone()
    }

    /// Send approval decision
    ///
    /// Waits until the decision fits into the channel; because the channel
    /// holds a single decision, this does not complete while an earlier
    /// decision is still unread. A delivery failure is logged, not returned.
    pub async fn approve(&self) {
        self.send_decision(InterruptDecision::Approve).await;
    }

    /// Send rejection decision
    ///
    /// `reason` is forwarded to the receiver unchanged. The waiting and
    /// failure behavior is the same as for `approve`.
    pub async fn reject(&self, reason: Option<String>) {
        self.send_decision(InterruptDecision::Reject { reason }).await;
    }

    /// Push a decision into the channel and log (rather than silently drop)
    /// a failure to deliver it.
    async fn send_decision(&self, decision: InterruptDecision) {
        if let Err(err) = self.interrupt_tx.send(decision).await {
            // `send` only fails once the receiving half is closed, in which
            // case nobody will ever observe this decision.
            tracing::warn!(
                execution_id = %self.execution_id,
                error = %err,
                "Failed to deliver interrupt decision"
            );
        }
    }

    /// Request approval if the policy requires it
    ///
    /// This combines policy check and approval request into a single method.
    /// Returns Ok(true) if:
    /// - No approval policy is set
    /// - Policy doesn't require approval for this plan
    /// - Approval is requested and granted
    ///
    /// Returns Ok(false) if approval is rejected.
    /// Returns Err if there's a communication error.
    pub async fn request_approval_if_needed(&mut self, plan: &Value) -> anyhow::Result<bool> {
        // No policy or policy doesn't require approval
        if !self.requires_approval(plan) {
            return Ok(true);
        }

        // Log the approval reason when the policy supplies one
        if let Some(reason) = self.check_plan_approval(plan) {
            tracing::info!(
                execution_id = %self.execution_id,
                policy = ?self.policy_name(),
                reason = %reason,
                "Plan requires approval"
            );
        }

        // Request approval
        self.request_approval(plan.clone()).await
    }

    /// Request approval and wait for decision
    ///
    /// Emits a "waiting" text event containing the pretty-printed plan, then
    /// waits for the next decision on the channel.
    ///
    /// Returns `Ok(true)` for `Approve` and `Input`, and `Ok(false)` for
    /// `Reject`.
    ///
    /// # Errors
    ///
    /// Fails if the plan cannot be serialized, if the runner has no receiver,
    /// or if the decision channel is closed before a decision arrives.
    pub async fn request_approval(&mut self, plan: Value) -> anyhow::Result<bool> {
        // Serialize before emitting so a formatting failure produces no event.
        let rendered_plan = serde_json::to_string_pretty(&plan)?;
        self.emitter.emit(StreamEvent::text_delta(
            self.execution_id.as_str(),
            format!("Waiting for approval: {}", rendered_plan),
        ));

        // Wait for decision
        let rx = self
            .interrupt_rx
            .as_mut()
            .ok_or_else(|| anyhow::anyhow!("Interrupt receiver not available"))?;
        let decision = rx
            .recv()
            .await
            .ok_or_else(|| anyhow::anyhow!("Interrupt channel closed"))?;

        match decision {
            InterruptDecision::Approve => {
                self.emitter.emit(StreamEvent::text_delta(
                    self.execution_id.as_str(),
                    "Plan approved, continuing execution",
                ));
                Ok(true)
            }
            InterruptDecision::Reject { reason } => {
                // Show the reason text itself, not the `Option` debug form
                // (`Some("...")` / `None`), in the user-facing stream.
                let message = match reason {
                    Some(text) => format!("Plan rejected: {}", text),
                    None => String::from("Plan rejected"),
                };
                self.emitter
                    .emit(StreamEvent::text_delta(self.execution_id.as_str(), message));
                Ok(false)
            }
            // NOTE(review): an `Input` decision counts as approval here and its
            // value is dropped without an event — confirm this is intended.
            InterruptDecision::Input { .. } => Ok(true),
        }
    }
}

impl Default for InterruptableRunner {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::approval_policy::{
        AlwaysApprovePolicy, AlwaysRequireApprovalPolicy, ThresholdApprovalPolicy,
    };
    use serde_json::json;

    /// Plan fixture with three steps.
    fn three_step_plan() -> Value {
        json!({"steps": ["s1", "s2", "s3"]})
    }

    /// Plan fixture with two steps.
    fn two_step_plan() -> Value {
        json!({"steps": ["s1", "s2"]})
    }

    #[test]
    fn test_runner_without_policy() {
        let plan = three_step_plan();
        let bare = InterruptableRunner::new();

        // A runner with no policy never asks for approval and has no name to report.
        assert!(bare.policy_name().is_none());
        assert!(bare.check_plan_approval(&plan).is_none());
        assert!(!bare.requires_approval(&plan));
    }

    #[test]
    fn test_runner_with_always_approve_policy() {
        let plan = three_step_plan();
        let permissive = InterruptableRunner::new().with_approval_policy(AlwaysApprovePolicy);

        assert_eq!(permissive.policy_name(), Some("always_approve"));
        assert!(!permissive.requires_approval(&plan));
        assert!(permissive.check_plan_approval(&plan).is_none());
    }

    #[test]
    fn test_runner_with_always_require_policy() {
        let plan = json!({"steps": ["s1"]});
        let strict = InterruptableRunner::new().with_approval_policy(AlwaysRequireApprovalPolicy);

        assert_eq!(strict.policy_name(), Some("always_require"));
        assert!(strict.requires_approval(&plan));
        assert!(strict.check_plan_approval(&plan).is_some());
    }

    #[test]
    fn test_runner_with_threshold_policy() {
        let policy = ThresholdApprovalPolicy::new(2);
        let gated = InterruptableRunner::new().with_approval_policy(policy);

        // Two steps with a threshold of two: no approval needed.
        let within_limit = two_step_plan();
        assert!(!gated.requires_approval(&within_limit));

        // Three steps exceed the threshold and the reason names the step count.
        let over_limit = three_step_plan();
        assert!(gated.requires_approval(&over_limit));

        let why = gated.check_plan_approval(&over_limit).unwrap();
        assert!(why.contains("3 steps"));
    }

    #[test]
    fn test_runner_with_policy_arc() {
        let shared: Arc<dyn ApprovalPolicy> = Arc::new(AlwaysRequireApprovalPolicy);
        let strict = InterruptableRunner::new().with_approval_policy_arc(shared);

        let empty_plan = json!({});
        assert!(strict.requires_approval(&empty_plan));
    }
}