use std::collections::HashSet;
use crate::error::AgentError;
use crate::lifecycle::RunId;
use crate::ops::{AsyncOpRef, OperationId};
/// Phase of the turn-execution state machine.
///
/// `Completed`, `Failed` and `Cancelled` are the terminal phases (see
/// [`TurnPhase::is_terminal`]); every other phase is non-terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnPhase {
    /// No run is in progress; the initial phase of a new authority.
    Ready,
    /// A run has been started and its primitive is being applied.
    ApplyingPrimitive,
    /// Waiting on the LLM call of a conversation turn.
    CallingLlm,
    /// The LLM returned tool calls that still have to be resolved.
    WaitingForOps,
    /// A boundary was applied and the run is deciding whether to continue.
    DrainingBoundary,
    /// Structured-output extraction is being validated.
    Extracting,
    /// A recoverable failure occurred; a retry may be requested.
    ErrorRecovery,
    /// Immediate cancellation was requested but not yet observed.
    Cancelling,
    /// Terminal: the run finished.
    Completed,
    /// Terminal: the run failed.
    Failed,
    /// Terminal: the run was cancelled.
    Cancelled,
}

impl TurnPhase {
    /// Returns `true` for `Completed`, `Failed` and `Cancelled`.
    pub fn is_terminal(self) -> bool {
        match self {
            Self::Completed | Self::Failed | Self::Cancelled => true,
            Self::Ready
            | Self::ApplyingPrimitive
            | Self::CallingLlm
            | Self::WaitingForOps
            | Self::DrainingBoundary
            | Self::Extracting
            | Self::ErrorRecovery
            | Self::Cancelling => false,
        }
    }

    /// Returns `true` only for the `Extracting` phase.
    pub fn is_extracting(self) -> bool {
        self == Self::Extracting
    }
}

impl std::fmt::Display for TurnPhase {
    /// Writes the variant name verbatim; width and padding flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match *self {
            Self::Ready => "Ready",
            Self::ApplyingPrimitive => "ApplyingPrimitive",
            Self::CallingLlm => "CallingLlm",
            Self::WaitingForOps => "WaitingForOps",
            Self::DrainingBoundary => "DrainingBoundary",
            Self::Extracting => "Extracting",
            Self::ErrorRecovery => "ErrorRecovery",
            Self::Cancelling => "Cancelling",
            Self::Completed => "Completed",
            Self::Failed => "Failed",
            Self::Cancelled => "Cancelled",
        })
    }
}
/// Which kind of primitive the current run was started with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnPrimitiveKind {
/// No run has been started (the value used by freshly initialised state).
None,
/// Recorded by `StartConversationRun`; once the primitive is applied the run moves on to the LLM call.
ConversationTurn,
/// Recorded by `StartImmediateAppend`; the run ends as soon as the primitive is applied.
ImmediateAppend,
/// Recorded by `StartImmediateContext`; the state machine treats it like `ImmediateAppend`.
ImmediateContextAppend,
}
/// Why a run ended; finer-grained than the terminal [`TurnPhase`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnTerminalOutcome {
/// No terminal outcome has been recorded (initial value, and the value after a run starts).
None,
/// The run completed normally or because the turn limit was reached.
Completed,
/// The run ended through `FatalFailure`.
Failed,
/// The run was cancelled.
Cancelled,
/// Recorded for the `BudgetExhausted` input; the phase still becomes `Completed`.
BudgetExhausted,
/// Recorded for the `TimeBudgetExceeded` input; the phase still becomes `Completed`.
TimeBudgetExceeded,
/// Extraction validation failed and no retries remained; the phase becomes `Failed`.
StructuredOutputValidationFailed,
}
/// Newtype around a `String` describing admitted content.
///
/// The state machine only stores and returns this value; it never inspects the string.
// NOTE(review): the vocabulary of valid shape strings is defined elsewhere — confirm with callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContentShape(pub String);
/// Inputs accepted by the turn-execution state machine.
///
/// Every variant except the three `Start*` inputs and `ForceCancelNoRun` is rejected
/// unless its `run_id` equals the currently active run. An input that is not listed
/// for the current phase is rejected with `AgentError::InvalidStateTransition`.
#[derive(Debug, Clone)]
pub enum TurnExecutionInput {
/// `Ready` only: starts a run with kind `ConversationTurn` and moves to `ApplyingPrimitive`.
StartConversationRun {
run_id: RunId,
},
/// `Ready` only: starts a run with kind `ImmediateAppend` and moves to `ApplyingPrimitive`.
StartImmediateAppend {
run_id: RunId,
},
/// `Ready` only: starts a run with kind `ImmediateContextAppend` and moves to `ApplyingPrimitive`.
StartImmediateContext {
run_id: RunId,
},
/// `ApplyingPrimitive` only: stores the admitted shape and feature flags. Conversation
/// turns continue to `CallingLlm`; immediate primitives apply a boundary and end the run.
PrimitiveApplied {
run_id: RunId,
admitted_content_shape: ContentShape,
vision_enabled: bool,
image_tool_results_enabled: bool,
},
/// `CallingLlm` only: records the pending tool-call count and moves to `WaitingForOps`.
/// A `tool_count` of zero is rejected.
LlmReturnedToolCalls {
run_id: RunId,
tool_count: u32,
},
/// `CallingLlm` only: applies a boundary and moves to `DrainingBoundary`.
LlmReturnedTerminal {
run_id: RunId,
},
/// `WaitingForOps` with tool calls pending: records the wait set for this turn. The
/// barrier starts out satisfied exactly when `has_barrier_ops` is `false`.
RegisterPendingOps {
run_id: RunId,
op_refs: Vec<AsyncOpRef>,
barrier_operation_ids: Vec<OperationId>,
has_barrier_ops: bool,
},
/// `WaitingForOps` only: requires a registered wait set and a satisfied barrier; clears
/// the wait set, applies a boundary and moves to `DrainingBoundary`.
ToolCallsResolved {
run_id: RunId,
},
/// `WaitingForOps` with an unsatisfied barrier: marks the barrier satisfied when
/// `operation_ids` matches the registered barrier ids as a set.
OpsBarrierSatisfied {
run_id: RunId,
operation_ids: Vec<OperationId>,
},
/// `DrainingBoundary`, conversation turns only: returns to `CallingLlm`, or ends in
/// `Cancelled` when cancel-after-boundary was requested.
BoundaryContinue {
run_id: RunId,
},
/// `DrainingBoundary` only: ends in `Completed`, or in `Cancelled` when
/// cancel-after-boundary was requested.
BoundaryComplete {
run_id: RunId,
},
/// `CallingLlm`, `WaitingForOps` or `DrainingBoundary`: moves to `ErrorRecovery`.
RecoverableFailure {
run_id: RunId,
},
/// Any in-run phase other than `Cancelling`: ends the run in `Failed`.
FatalFailure {
run_id: RunId,
},
/// `ErrorRecovery` only: returns to `CallingLlm`.
RetryRequested {
run_id: RunId,
},
/// Any in-run phase other than `Cancelling`: moves to `Cancelling`.
CancelNow {
run_id: RunId,
},
/// Any in-run phase other than `Cancelling`: sets the cancel-after-boundary flag and
/// leaves the phase unchanged.
CancelAfterBoundary {
run_id: RunId,
},
/// `Cancelling` only: ends the run in `Cancelled`.
CancellationObserved {
run_id: RunId,
},
/// Any terminal phase: resets all bookkeeping and returns to `Ready`.
AcknowledgeTerminal {
run_id: RunId,
},
/// Any in-run phase other than `Cancelling`: applies a boundary and ends in `Completed`
/// with outcome `Completed`.
TurnLimitReached {
run_id: RunId,
},
/// Any in-run phase other than `Cancelling`: applies a boundary and ends in `Completed`
/// with outcome `BudgetExhausted`.
BudgetExhausted {
run_id: RunId,
},
/// Any in-run phase other than `Cancelling`: applies a boundary and ends in `Completed`
/// with outcome `TimeBudgetExceeded`.
TimeBudgetExceeded {
run_id: RunId,
},
/// `DrainingBoundary` only: records `max_retries` and moves to `Extracting`.
EnterExtraction {
run_id: RunId,
max_retries: u32,
},
/// `Extracting` only: ends the run in `Completed`.
ExtractionValidationPassed {
run_id: RunId,
},
/// `Extracting` only: counts one attempt, then returns to `CallingLlm` while attempts
/// remain below the retry limit, otherwise ends in `Failed`. The state machine does not
/// read `error`.
ExtractionValidationFailed {
run_id: RunId,
error: String,
},
/// `Extracting` only: moves to `CallingLlm`.
ExtractionStart {
run_id: RunId,
},
/// Any non-terminal phase, including `Ready`: ends in `Cancelled` without a run-id check.
ForceCancelNoRun,
}
/// Side effects reported alongside a transition, in the order they were produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TurnExecutionEffect {
/// Emitted when one of the `Start*` inputs is accepted.
RunStarted {
run_id: RunId,
},
/// Emitted each time the boundary counter is incremented; `boundary_sequence` is the
/// counter value after the increment.
BoundaryApplied {
run_id: RunId,
boundary_sequence: u64,
},
/// Emitted on every transition into `TurnPhase::Completed`.
RunCompleted {
run_id: RunId,
},
/// Emitted on every transition into `TurnPhase::Failed`.
RunFailed {
run_id: RunId,
},
/// Emitted on transitions into `TurnPhase::Cancelled`. `ForceCancelNoRun` emits it only
/// when a run is active.
RunCancelled {
run_id: RunId,
},
/// Emitted on every transition into `TurnPhase::CallingLlm`.
CheckCompaction,
}
/// Result of a successfully applied input.
#[derive(Debug)]
pub struct TurnExecutionTransition {
/// Phase before the input was applied.
pub prev_phase: TurnPhase,
/// Phase after the input was applied; may equal `prev_phase`.
pub next_phase: TurnPhase,
/// Effects produced by the transition, in emission order.
pub effects: Vec<TurnExecutionEffect>,
}
/// Bookkeeping carried alongside the phase of a `TurnExecutionAuthority`.
#[derive(Debug, Clone)]
struct TurnExecutionFields {
/// Run currently owned by the state machine; `None` until a `Start*` input is accepted.
active_run: Option<RunId>,
/// Primitive kind chosen by the `Start*` input.
primitive_kind: TurnPrimitiveKind,
/// Shape reported by `PrimitiveApplied`.
admitted_content_shape: Option<ContentShape>,
/// Flag reported by `PrimitiveApplied`.
vision_enabled: bool,
/// Flag reported by `PrimitiveApplied`.
image_tool_results_enabled: bool,
/// Tool-call count from `LlmReturnedToolCalls`; zeroed by `ToolCallsResolved`.
tool_calls_pending: u32,
/// Wait set from `RegisterPendingOps`; `None` until one is registered.
pending_op_refs: Option<Vec<AsyncOpRef>>,
/// Operation ids that `OpsBarrierSatisfied` must match.
barrier_operation_ids: Vec<OperationId>,
/// Whether the registered wait set contains barrier operations.
has_barrier_ops: bool,
/// `true` when there is no outstanding barrier to wait for.
barrier_satisfied: bool,
/// Number of boundaries applied during the current run.
boundary_count: u32,
/// Set by `CancelAfterBoundary`; consumed at the next boundary decision.
cancel_after_boundary: bool,
/// Outcome recorded when the run reaches a terminal phase.
terminal_outcome: TurnTerminalOutcome,
/// Number of failed extraction validations so far.
extraction_attempts: u32,
/// Retry limit supplied by `EnterExtraction`.
max_extraction_retries: u32,
}
impl TurnExecutionFields {
    /// Builds the bookkeeping of an idle authority: no run, no wait set, all counters at
    /// zero, and the barrier considered satisfied.
    fn init() -> Self {
        Self {
            // Run identity and primitive.
            active_run: None,
            primitive_kind: TurnPrimitiveKind::None,
            terminal_outcome: TurnTerminalOutcome::None,
            // Values reported when the primitive is applied.
            admitted_content_shape: None,
            vision_enabled: false,
            image_tool_results_enabled: false,
            // Tool-call wait set.
            tool_calls_pending: 0,
            pending_op_refs: None,
            barrier_operation_ids: Vec::new(),
            has_barrier_ops: false,
            barrier_satisfied: true,
            // Boundary and cancellation tracking.
            boundary_count: 0,
            cancel_after_boundary: false,
            // Extraction retries.
            extraction_attempts: 0,
            max_extraction_retries: 0,
        }
    }

    /// Overwrites every field with the value produced by [`Self::init`].
    fn reset(&mut self) {
        drop(std::mem::replace(self, Self::init()));
    }
}
// Private module: `Sealed` is a supertrait of `TurnExecutionMutator`, and because this
// module is not `pub`, code outside this module tree cannot name the trait to implement it.
mod sealed {
pub trait Sealed {}
}
/// Mutation entry point of the turn-execution state machine.
///
/// Requires the private `sealed::Sealed` supertrait.
pub trait TurnExecutionMutator: sealed::Sealed {
/// Applies `input`, returning the resulting transition.
///
/// # Errors
///
/// Returns an `AgentError` when the input is not accepted.
fn apply(&mut self, input: TurnExecutionInput) -> Result<TurnExecutionTransition, AgentError>;
}
/// Owner of the turn-execution state: the current phase plus its bookkeeping fields.
///
/// Both fields are private; state changes go through `TurnExecutionMutator::apply`.
#[derive(Debug, Clone)]
pub struct TurnExecutionAuthority {
// Current phase of the state machine.
phase: TurnPhase,
// Bookkeeping that accompanies the phase.
fields: TurnExecutionFields,
}
impl Default for TurnExecutionAuthority {
/// Equivalent to [`TurnExecutionAuthority::new`].
fn default() -> Self {
Self::new()
}
}
// Satisfies the `Sealed` supertrait bound so the authority can implement `TurnExecutionMutator`.
impl sealed::Sealed for TurnExecutionAuthority {}
impl TurnExecutionAuthority {
    /// Creates an authority in [`TurnPhase::Ready`] with no active run.
    pub fn new() -> Self {
        Self {
            phase: TurnPhase::Ready,
            fields: TurnExecutionFields::init(),
        }
    }

    /// Current phase of the state machine.
    pub fn phase(&self) -> TurnPhase {
        self.phase
    }

    /// Run currently owned by the state machine, if any.
    pub fn active_run(&self) -> Option<&RunId> {
        self.fields.active_run.as_ref()
    }

    /// Primitive kind chosen when the run was started.
    pub fn primitive_kind(&self) -> TurnPrimitiveKind {
        self.fields.primitive_kind
    }

    /// Number of boundaries applied during the current run.
    pub fn boundary_count(&self) -> u32 {
        self.fields.boundary_count
    }

    /// Whether cancellation has been requested for the next boundary.
    pub fn cancel_after_boundary(&self) -> bool {
        self.fields.cancel_after_boundary
    }

    /// Outcome recorded when the run reached a terminal phase.
    pub fn terminal_outcome(&self) -> TurnTerminalOutcome {
        self.fields.terminal_outcome
    }

    /// Tool-call count reported by the LLM and not yet resolved.
    pub fn tool_calls_pending(&self) -> u32 {
        self.fields.tool_calls_pending
    }

    /// Registered wait set, or `None` when nothing has been registered.
    pub fn pending_op_refs(&self) -> Option<&[AsyncOpRef]> {
        self.fields.pending_op_refs.as_deref()
    }

    /// Whether the registered wait set contains barrier operations.
    pub fn has_barrier_ops(&self) -> bool {
        self.fields.has_barrier_ops
    }

    /// Whether there is no outstanding barrier to wait for.
    pub fn barrier_satisfied(&self) -> bool {
        self.fields.barrier_satisfied
    }

    /// Registered barrier operation ids, in registration order.
    pub fn barrier_op_ids(&self) -> Vec<&OperationId> {
        self.fields.barrier_operation_ids.iter().collect()
    }

    /// Operation ids of the registered wait set, or `None` when nothing is registered.
    pub fn pending_op_ids(&self) -> Option<Vec<&OperationId>> {
        self.fields
            .pending_op_refs
            .as_ref()
            .map(|refs| refs.iter().map(|r| &r.operation_id).collect())
    }

    /// Returns `true` when `operation_ids` names exactly the registered barrier ids.
    ///
    /// The comparison is order-insensitive. Comparing the set size with the slice length
    /// additionally rejects a slice that repeats an id.
    fn barrier_operation_ids_match(&self, operation_ids: &[OperationId]) -> bool {
        // Borrowed sets are enough for an equality check; no id needs to be cloned.
        let expected: HashSet<&OperationId> = self.fields.barrier_operation_ids.iter().collect();
        let actual: HashSet<&OperationId> = operation_ids.iter().collect();
        expected.len() == operation_ids.len() && expected == actual
    }

    /// Vision flag reported when the primitive was applied.
    pub fn vision_enabled(&self) -> bool {
        self.fields.vision_enabled
    }

    /// Image-tool-results flag reported when the primitive was applied.
    pub fn image_tool_results_enabled(&self) -> bool {
        self.fields.image_tool_results_enabled
    }

    /// Content shape reported when the primitive was applied, if any.
    pub fn admitted_content_shape(&self) -> Option<&ContentShape> {
        self.fields.admitted_content_shape.as_ref()
    }

    /// Number of failed extraction validations so far.
    pub fn extraction_attempts(&self) -> u32 {
        self.fields.extraction_attempts
    }

    /// Retry limit supplied when extraction was entered.
    pub fn max_extraction_retries(&self) -> u32 {
        self.fields.max_extraction_retries
    }

    /// Returns `true` when a non-zero extraction retry limit has been recorded.
    pub fn in_extraction_flow(&self) -> bool {
        self.fields.max_extraction_retries > 0
    }

    /// Reports whether `input` would be accepted in the current state, without mutating it.
    pub fn can_accept(&self, input: &TurnExecutionInput) -> bool {
        self.evaluate(input).is_ok()
    }

    /// Builds the error returned for every rejected input: the phase rendered with
    /// `Display` and the input rendered with `Debug`.
    fn invalid(from: TurnPhase, input: &TurnExecutionInput) -> AgentError {
        AgentError::InvalidStateTransition {
            from: from.to_string(),
            to: format!("{input:?}"),
        }
    }

    /// Returns `true` when `run_id` is the active run.
    fn guard_run_matches(&self, run_id: &RunId) -> bool {
        self.fields.active_run.as_ref() == Some(run_id)
    }

    /// Rejects `input` unless `run_id` is the active run.
    fn ensure_run_matches(
        &self,
        run_id: &RunId,
        input: &TurnExecutionInput,
    ) -> Result<(), AgentError> {
        if self.guard_run_matches(run_id) {
            Ok(())
        } else {
            Err(Self::invalid(self.phase, input))
        }
    }

    /// Initialises the per-run bookkeeping for a new run of `kind` and emits `RunStarted`.
    ///
    /// `barrier_satisfied` and the extraction counters are intentionally left untouched,
    /// matching the fields the start transitions have always reset.
    fn begin_run(
        fields: &mut TurnExecutionFields,
        effects: &mut Vec<TurnExecutionEffect>,
        run_id: &RunId,
        kind: TurnPrimitiveKind,
    ) -> TurnPhase {
        fields.active_run = Some(run_id.clone());
        fields.primitive_kind = kind;
        fields.tool_calls_pending = 0;
        fields.admitted_content_shape = None;
        fields.vision_enabled = false;
        fields.image_tool_results_enabled = false;
        fields.boundary_count = 0;
        fields.cancel_after_boundary = false;
        fields.terminal_outcome = TurnTerminalOutcome::None;
        fields.pending_op_refs = None;
        fields.barrier_operation_ids = Vec::new();
        fields.has_barrier_ops = false;
        effects.push(TurnExecutionEffect::RunStarted {
            run_id: run_id.clone(),
        });
        TurnPhase::ApplyingPrimitive
    }

    /// Drops the registered wait set and marks the barrier as satisfied.
    fn clear_wait_set(fields: &mut TurnExecutionFields) {
        fields.pending_op_refs = None;
        fields.barrier_operation_ids = Vec::new();
        fields.has_barrier_ops = false;
        fields.barrier_satisfied = true;
    }

    /// Clears the wait set only when leaving `WaitingForOps`.
    fn clear_wait_set_if_waiting(phase: TurnPhase, fields: &mut TurnExecutionFields) {
        if phase == TurnPhase::WaitingForOps {
            Self::clear_wait_set(fields);
        }
    }

    /// Increments the boundary counter and emits `BoundaryApplied` with the new value.
    fn apply_boundary(
        fields: &mut TurnExecutionFields,
        effects: &mut Vec<TurnExecutionEffect>,
        run_id: &RunId,
    ) {
        fields.boundary_count += 1;
        effects.push(TurnExecutionEffect::BoundaryApplied {
            run_id: run_id.clone(),
            boundary_sequence: u64::from(fields.boundary_count),
        });
    }

    /// Emits `CheckCompaction` and yields `CallingLlm`.
    fn resume_llm(effects: &mut Vec<TurnExecutionEffect>) -> TurnPhase {
        effects.push(TurnExecutionEffect::CheckCompaction);
        TurnPhase::CallingLlm
    }

    /// Records `outcome`, emits `RunCompleted` and yields `Completed`.
    fn complete_run(
        fields: &mut TurnExecutionFields,
        effects: &mut Vec<TurnExecutionEffect>,
        run_id: &RunId,
        outcome: TurnTerminalOutcome,
    ) -> TurnPhase {
        fields.terminal_outcome = outcome;
        effects.push(TurnExecutionEffect::RunCompleted {
            run_id: run_id.clone(),
        });
        TurnPhase::Completed
    }

    /// Records `outcome`, emits `RunFailed` and yields `Failed`.
    fn fail_run(
        fields: &mut TurnExecutionFields,
        effects: &mut Vec<TurnExecutionEffect>,
        run_id: &RunId,
        outcome: TurnTerminalOutcome,
    ) -> TurnPhase {
        fields.terminal_outcome = outcome;
        effects.push(TurnExecutionEffect::RunFailed {
            run_id: run_id.clone(),
        });
        TurnPhase::Failed
    }

    /// Clears the cancel-after-boundary flag, records the `Cancelled` outcome, emits
    /// `RunCancelled` and yields `Cancelled`.
    fn cancel_run(
        fields: &mut TurnExecutionFields,
        effects: &mut Vec<TurnExecutionEffect>,
        run_id: &RunId,
    ) -> TurnPhase {
        fields.cancel_after_boundary = false;
        fields.terminal_outcome = TurnTerminalOutcome::Cancelled;
        effects.push(TurnExecutionEffect::RunCancelled {
            run_id: run_id.clone(),
        });
        TurnPhase::Cancelled
    }

    /// Ends the run at a boundary: cancelled when cancel-after-boundary was requested,
    /// completed otherwise.
    fn complete_or_cancel(
        fields: &mut TurnExecutionFields,
        effects: &mut Vec<TurnExecutionEffect>,
        run_id: &RunId,
    ) -> TurnPhase {
        if fields.cancel_after_boundary {
            Self::cancel_run(fields, effects, run_id)
        } else {
            Self::complete_run(fields, effects, run_id, TurnTerminalOutcome::Completed)
        }
    }

    /// Shared tail of the limit/budget inputs: drops any wait set, applies one more
    /// boundary and completes the run with `outcome`.
    fn force_complete(
        phase: TurnPhase,
        fields: &mut TurnExecutionFields,
        effects: &mut Vec<TurnExecutionEffect>,
        run_id: &RunId,
        outcome: TurnTerminalOutcome,
    ) -> TurnPhase {
        Self::clear_wait_set_if_waiting(phase, fields);
        Self::apply_boundary(fields, effects, run_id);
        Self::complete_run(fields, effects, run_id, outcome)
    }

    /// Computes the result of applying `input` to the current state without mutating it.
    ///
    /// Returns the next phase, the next bookkeeping fields and the effects to report.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::InvalidStateTransition` when the input is not accepted in the
    /// current phase, names a run other than the active one, or fails a per-input
    /// precondition.
    fn evaluate(
        &self,
        input: &TurnExecutionInput,
    ) -> Result<(TurnPhase, TurnExecutionFields, Vec<TurnExecutionEffect>), AgentError> {
        use TurnExecutionInput::{
            AcknowledgeTerminal, BoundaryComplete, BoundaryContinue, BudgetExhausted,
            CancelAfterBoundary, CancelNow, CancellationObserved, EnterExtraction, ExtractionStart,
            ExtractionValidationFailed, ExtractionValidationPassed, FatalFailure, ForceCancelNoRun,
            LlmReturnedTerminal, LlmReturnedToolCalls, OpsBarrierSatisfied, PrimitiveApplied,
            RecoverableFailure, RegisterPendingOps, RetryRequested, StartConversationRun,
            StartImmediateAppend, StartImmediateContext, TimeBudgetExceeded, ToolCallsResolved,
            TurnLimitReached,
        };
        use TurnPhase::{
            ApplyingPrimitive, CallingLlm, Cancelled, Cancelling, Completed, DrainingBoundary,
            ErrorRecovery, Extracting, Failed, Ready, WaitingForOps,
        };

        let phase = self.phase;
        // All work happens on a copy; the caller commits it only on success.
        let mut fields = self.fields.clone();
        let mut effects = Vec::new();

        let next_phase = match (phase, input) {
            // ---- Starting a run -------------------------------------------------------
            (Ready, StartConversationRun { run_id }) => Self::begin_run(
                &mut fields,
                &mut effects,
                run_id,
                TurnPrimitiveKind::ConversationTurn,
            ),
            (Ready, StartImmediateAppend { run_id }) => Self::begin_run(
                &mut fields,
                &mut effects,
                run_id,
                TurnPrimitiveKind::ImmediateAppend,
            ),
            (Ready, StartImmediateContext { run_id }) => Self::begin_run(
                &mut fields,
                &mut effects,
                run_id,
                TurnPrimitiveKind::ImmediateContextAppend,
            ),

            // ---- Primitive application ------------------------------------------------
            (
                ApplyingPrimitive,
                PrimitiveApplied {
                    run_id,
                    admitted_content_shape,
                    vision_enabled,
                    image_tool_results_enabled,
                },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                fields.admitted_content_shape = Some(admitted_content_shape.clone());
                fields.vision_enabled = *vision_enabled;
                fields.image_tool_results_enabled = *image_tool_results_enabled;
                match fields.primitive_kind {
                    TurnPrimitiveKind::ConversationTurn => Self::resume_llm(&mut effects),
                    // Immediate primitives finish at their first boundary.
                    TurnPrimitiveKind::ImmediateAppend
                    | TurnPrimitiveKind::ImmediateContextAppend => {
                        Self::apply_boundary(&mut fields, &mut effects, run_id);
                        Self::complete_or_cancel(&mut fields, &mut effects, run_id)
                    }
                    TurnPrimitiveKind::None => return Err(Self::invalid(phase, input)),
                }
            }

            // ---- LLM results ----------------------------------------------------------
            (CallingLlm, LlmReturnedToolCalls { run_id, tool_count }) => {
                self.ensure_run_matches(run_id, input)?;
                if *tool_count == 0 {
                    return Err(Self::invalid(phase, input));
                }
                fields.tool_calls_pending = *tool_count;
                Self::clear_wait_set(&mut fields);
                WaitingForOps
            }
            (CallingLlm, LlmReturnedTerminal { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                Self::apply_boundary(&mut fields, &mut effects, run_id);
                DrainingBoundary
            }

            // ---- Waiting for tool operations -----------------------------------------
            (
                WaitingForOps,
                RegisterPendingOps {
                    run_id,
                    op_refs,
                    barrier_operation_ids,
                    has_barrier_ops,
                },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                if fields.tool_calls_pending == 0 {
                    return Err(Self::invalid(phase, input));
                }
                fields.pending_op_refs = Some(op_refs.clone());
                fields.barrier_operation_ids = barrier_operation_ids.clone();
                fields.has_barrier_ops = *has_barrier_ops;
                // With no barrier ops there is nothing to wait for.
                fields.barrier_satisfied = !*has_barrier_ops;
                WaitingForOps
            }
            (
                WaitingForOps,
                OpsBarrierSatisfied {
                    run_id,
                    operation_ids,
                },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                if fields.barrier_satisfied || !self.barrier_operation_ids_match(operation_ids) {
                    return Err(Self::invalid(phase, input));
                }
                fields.barrier_satisfied = true;
                WaitingForOps
            }
            (WaitingForOps, ToolCallsResolved { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                let resolvable = fields.tool_calls_pending != 0
                    && fields.pending_op_refs.is_some()
                    && fields.barrier_satisfied;
                if !resolvable {
                    return Err(Self::invalid(phase, input));
                }
                fields.tool_calls_pending = 0;
                Self::clear_wait_set(&mut fields);
                Self::apply_boundary(&mut fields, &mut effects, run_id);
                DrainingBoundary
            }

            // ---- Boundary decisions ---------------------------------------------------
            (DrainingBoundary, BoundaryContinue { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                if fields.primitive_kind != TurnPrimitiveKind::ConversationTurn {
                    return Err(Self::invalid(phase, input));
                }
                if fields.cancel_after_boundary {
                    Self::cancel_run(&mut fields, &mut effects, run_id)
                } else {
                    Self::resume_llm(&mut effects)
                }
            }
            (DrainingBoundary, BoundaryComplete { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                Self::complete_or_cancel(&mut fields, &mut effects, run_id)
            }
            (
                DrainingBoundary,
                EnterExtraction {
                    run_id,
                    max_retries,
                },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                fields.max_extraction_retries = *max_retries;
                Extracting
            }

            // ---- Extraction -----------------------------------------------------------
            (Extracting, ExtractionValidationPassed { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                Self::complete_run(
                    &mut fields,
                    &mut effects,
                    run_id,
                    TurnTerminalOutcome::Completed,
                )
            }
            (Extracting, ExtractionStart { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                Self::resume_llm(&mut effects)
            }
            (Extracting, ExtractionValidationFailed { run_id, .. }) => {
                self.ensure_run_matches(run_id, input)?;
                fields.extraction_attempts += 1;
                if fields.extraction_attempts < fields.max_extraction_retries {
                    Self::resume_llm(&mut effects)
                } else {
                    Self::fail_run(
                        &mut fields,
                        &mut effects,
                        run_id,
                        TurnTerminalOutcome::StructuredOutputValidationFailed,
                    )
                }
            }

            // ---- Failures -------------------------------------------------------------
            (CallingLlm | WaitingForOps | DrainingBoundary, RecoverableFailure { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                Self::clear_wait_set_if_waiting(phase, &mut fields);
                ErrorRecovery
            }
            (ErrorRecovery, RetryRequested { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                Self::resume_llm(&mut effects)
            }
            (
                ApplyingPrimitive | CallingLlm | WaitingForOps | DrainingBoundary | Extracting
                | ErrorRecovery,
                FatalFailure { run_id },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                Self::clear_wait_set_if_waiting(phase, &mut fields);
                Self::fail_run(
                    &mut fields,
                    &mut effects,
                    run_id,
                    TurnTerminalOutcome::Failed,
                )
            }

            // ---- Cancellation ---------------------------------------------------------
            (
                ApplyingPrimitive | CallingLlm | WaitingForOps | DrainingBoundary | Extracting
                | ErrorRecovery,
                CancelNow { run_id },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                Self::clear_wait_set_if_waiting(phase, &mut fields);
                Cancelling
            }
            (
                ApplyingPrimitive | CallingLlm | WaitingForOps | DrainingBoundary | Extracting
                | ErrorRecovery,
                CancelAfterBoundary { run_id },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                fields.cancel_after_boundary = true;
                // Only the flag changes; the phase stays where it is.
                phase
            }
            (Cancelling, CancellationObserved { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                Self::cancel_run(&mut fields, &mut effects, run_id)
            }

            // ---- Limits and budgets: all finish in `Completed` with distinct outcomes --
            (
                ApplyingPrimitive | CallingLlm | WaitingForOps | DrainingBoundary | Extracting
                | ErrorRecovery,
                TurnLimitReached { run_id },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                Self::force_complete(
                    phase,
                    &mut fields,
                    &mut effects,
                    run_id,
                    TurnTerminalOutcome::Completed,
                )
            }
            (
                ApplyingPrimitive | CallingLlm | WaitingForOps | DrainingBoundary | Extracting
                | ErrorRecovery,
                BudgetExhausted { run_id },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                Self::force_complete(
                    phase,
                    &mut fields,
                    &mut effects,
                    run_id,
                    TurnTerminalOutcome::BudgetExhausted,
                )
            }
            (
                ApplyingPrimitive | CallingLlm | WaitingForOps | DrainingBoundary | Extracting
                | ErrorRecovery,
                TimeBudgetExceeded { run_id },
            ) => {
                self.ensure_run_matches(run_id, input)?;
                Self::force_complete(
                    phase,
                    &mut fields,
                    &mut effects,
                    run_id,
                    TurnTerminalOutcome::TimeBudgetExceeded,
                )
            }

            // ---- Forced cancellation (no run-id check) ---------------------------------
            // NOTE(review): from `Ready` this yields `Cancelled` with no active run, and
            // `AcknowledgeTerminal` requires a matching run id — confirm callers never
            // need to acknowledge that state.
            (
                Ready | ApplyingPrimitive | CallingLlm | WaitingForOps | DrainingBoundary
                | Extracting | ErrorRecovery | Cancelling,
                ForceCancelNoRun,
            ) => {
                Self::clear_wait_set_if_waiting(phase, &mut fields);
                fields.terminal_outcome = TurnTerminalOutcome::Cancelled;
                if let Some(run_id) = &fields.active_run {
                    effects.push(TurnExecutionEffect::RunCancelled {
                        run_id: run_id.clone(),
                    });
                }
                Cancelled
            }

            // ---- Leaving a terminal phase ---------------------------------------------
            (Completed | Failed | Cancelled, AcknowledgeTerminal { run_id }) => {
                self.ensure_run_matches(run_id, input)?;
                fields.reset();
                Ready
            }

            // Every other (phase, input) pair is an invalid transition.
            _ => return Err(Self::invalid(phase, input)),
        };

        Ok((next_phase, fields, effects))
    }
}
impl TurnExecutionMutator for TurnExecutionAuthority {
    /// Evaluates `input` and, on success, commits the new phase and fields.
    ///
    /// When evaluation fails the error is returned and the authority is left unchanged.
    fn apply(&mut self, input: TurnExecutionInput) -> Result<TurnExecutionTransition, AgentError> {
        let (next_phase, next_fields, effects) = self.evaluate(&input)?;
        // Swap in the new phase and keep the old one for the transition report.
        let prev_phase = std::mem::replace(&mut self.phase, next_phase);
        self.fields = next_fields;
        Ok(TurnExecutionTransition {
            prev_phase,
            next_phase,
            effects,
        })
    }
}
impl TurnPhase {
    /// Maps this phase onto the coarser `LoopState`.
    ///
    /// All terminal phases map to `LoopState::Completed`, `DrainingBoundary` and
    /// `Extracting` both map to `LoopState::DrainingEvents`, and the phases up to and
    /// including the LLM call map to `LoopState::CallingLlm`.
    pub fn to_loop_state(self) -> crate::state::LoopState {
        use crate::state::LoopState;
        match self {
            Self::Completed | Self::Failed | Self::Cancelled => LoopState::Completed,
            Self::Cancelling => LoopState::Cancelling,
            Self::ErrorRecovery => LoopState::ErrorRecovery,
            Self::DrainingBoundary | Self::Extracting => LoopState::DrainingEvents,
            Self::WaitingForOps => LoopState::WaitingForOps,
            Self::Ready | Self::ApplyingPrimitive | Self::CallingLlm => LoopState::CallingLlm,
        }
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::ops::WaitPolicy;
use uuid::Uuid;
fn test_run_id() -> RunId {
RunId(Uuid::from_u128(1))
}
fn other_run_id() -> RunId {
RunId(Uuid::from_u128(2))
}
fn make_authority() -> TurnExecutionAuthority {
TurnExecutionAuthority::new()
}
fn authority_at_calling_llm() -> TurnExecutionAuthority {
let mut auth = make_authority();
auth.apply(TurnExecutionInput::StartConversationRun {
run_id: test_run_id(),
})
.expect("start");
auth.apply(TurnExecutionInput::PrimitiveApplied {
run_id: test_run_id(),
admitted_content_shape: ContentShape("text".into()),
vision_enabled: false,
image_tool_results_enabled: false,
})
.expect("primitive applied");
assert_eq!(auth.phase(), TurnPhase::CallingLlm);
auth
}
fn authority_at_waiting_for_ops() -> TurnExecutionAuthority {
let mut auth = authority_at_calling_llm();
auth.apply(TurnExecutionInput::LlmReturnedToolCalls {
run_id: test_run_id(),
tool_count: 3,
})
.expect("tool calls");
assert_eq!(auth.phase(), TurnPhase::WaitingForOps);
auth
}
fn authority_at_draining_boundary() -> TurnExecutionAuthority {
let mut auth = authority_at_waiting_for_ops();
let barrier_id = OperationId::new();
auth.apply(TurnExecutionInput::RegisterPendingOps {
run_id: test_run_id(),
op_refs: vec![AsyncOpRef::barrier(barrier_id.clone())],
barrier_operation_ids: vec![barrier_id.clone()],
has_barrier_ops: true,
})
.expect("register pending ops");
auth.apply(TurnExecutionInput::OpsBarrierSatisfied {
run_id: test_run_id(),
operation_ids: vec![barrier_id],
})
.expect("ops barrier satisfied");
auth.apply(TurnExecutionInput::ToolCallsResolved {
run_id: test_run_id(),
})
.expect("tool calls resolved");
assert_eq!(auth.phase(), TurnPhase::DrainingBoundary);
auth
}
#[test]
fn start_conversation_run_from_ready() {
let mut auth = make_authority();
let t = auth
.apply(TurnExecutionInput::StartConversationRun {
run_id: test_run_id(),
})
.expect("start conversation run");
assert_eq!(t.prev_phase, TurnPhase::Ready);
assert_eq!(t.next_phase, TurnPhase::ApplyingPrimitive);
assert_eq!(auth.phase(), TurnPhase::ApplyingPrimitive);
assert_eq!(auth.active_run(), Some(&test_run_id()));
assert_eq!(auth.primitive_kind(), TurnPrimitiveKind::ConversationTurn);
assert_eq!(t.effects.len(), 1);
assert!(matches!(
t.effects[0],
TurnExecutionEffect::RunStarted { .. }
));
}
#[test]
fn start_immediate_append_from_ready() {
let mut auth = make_authority();
let t = auth
.apply(TurnExecutionInput::StartImmediateAppend {
run_id: test_run_id(),
})
.expect("start immediate append");
assert_eq!(t.next_phase, TurnPhase::ApplyingPrimitive);
assert_eq!(auth.primitive_kind(), TurnPrimitiveKind::ImmediateAppend);
}
#[test]
fn start_immediate_context_from_ready() {
let mut auth = make_authority();
let t = auth
.apply(TurnExecutionInput::StartImmediateContext {
run_id: test_run_id(),
})
.expect("start immediate context");
assert_eq!(t.next_phase, TurnPhase::ApplyingPrimitive);
assert_eq!(
auth.primitive_kind(),
TurnPrimitiveKind::ImmediateContextAppend
);
}
#[test]
fn start_from_non_ready_is_rejected() {
let mut auth = authority_at_calling_llm();
assert!(
auth.apply(TurnExecutionInput::StartConversationRun {
run_id: test_run_id(),
})
.is_err()
);
}
#[test]
fn primitive_applied_conversation_turn_goes_to_calling_llm() {
let mut auth = make_authority();
auth.apply(TurnExecutionInput::StartConversationRun {
run_id: test_run_id(),
})
.expect("start");
let t = auth
.apply(TurnExecutionInput::PrimitiveApplied {
run_id: test_run_id(),
admitted_content_shape: ContentShape("text".into()),
vision_enabled: true,
image_tool_results_enabled: true,
})
.expect("primitive applied");
assert_eq!(t.next_phase, TurnPhase::CallingLlm);
assert!(auth.vision_enabled());
assert!(auth.image_tool_results_enabled());
assert_eq!(t.effects.len(), 1);
assert!(matches!(t.effects[0], TurnExecutionEffect::CheckCompaction));
}
#[test]
fn primitive_applied_immediate_append_completes() {
let mut auth = make_authority();
auth.apply(TurnExecutionInput::StartImmediateAppend {
run_id: test_run_id(),
})
.expect("start");
let t = auth
.apply(TurnExecutionInput::PrimitiveApplied {
run_id: test_run_id(),
admitted_content_shape: ContentShape("text".into()),
vision_enabled: false,
image_tool_results_enabled: false,
})
.expect("primitive applied");
assert_eq!(t.next_phase, TurnPhase::Completed);
assert_eq!(auth.boundary_count(), 1);
assert_eq!(auth.terminal_outcome(), TurnTerminalOutcome::Completed);
assert_eq!(t.effects.len(), 2);
assert!(matches!(
t.effects[0],
TurnExecutionEffect::BoundaryApplied { .. }
));
assert!(matches!(
t.effects[1],
TurnExecutionEffect::RunCompleted { .. }
));
}
#[test]
fn primitive_applied_immediate_append_with_cancel_after_boundary() {
let mut auth = make_authority();
auth.apply(TurnExecutionInput::StartImmediateAppend {
run_id: test_run_id(),
})
.expect("start");
auth.apply(TurnExecutionInput::CancelAfterBoundary {
run_id: test_run_id(),
})
.expect("cancel after boundary");
let t = auth
.apply(TurnExecutionInput::PrimitiveApplied {
run_id: test_run_id(),
admitted_content_shape: ContentShape("text".into()),
vision_enabled: false,
image_tool_results_enabled: false,
})
.expect("primitive applied");
assert_eq!(t.next_phase, TurnPhase::Cancelled);
assert_eq!(auth.terminal_outcome(), TurnTerminalOutcome::Cancelled);
assert!(!auth.cancel_after_boundary());
}
#[test]
fn primitive_applied_wrong_run_id_rejected() {
let mut auth = make_authority();
auth.apply(TurnExecutionInput::StartConversationRun {
run_id: test_run_id(),
})
.expect("start");
assert!(
auth.apply(TurnExecutionInput::PrimitiveApplied {
run_id: other_run_id(),
admitted_content_shape: ContentShape("text".into()),
vision_enabled: false,
image_tool_results_enabled: false,
})
.is_err()
);
}
#[test]
fn llm_returned_tool_calls() {
let mut auth = authority_at_calling_llm();
let t = auth
.apply(TurnExecutionInput::LlmReturnedToolCalls {
run_id: test_run_id(),
tool_count: 5,
})
.expect("tool calls");
assert_eq!(t.next_phase, TurnPhase::WaitingForOps);
assert_eq!(auth.tool_calls_pending(), 5);
assert_eq!(auth.pending_op_refs(), None);
}
#[test]
fn llm_returned_tool_calls_zero_count_rejected() {
let mut auth = authority_at_calling_llm();
assert!(
auth.apply(TurnExecutionInput::LlmReturnedToolCalls {
run_id: test_run_id(),
tool_count: 0,
})
.is_err()
);
}
#[test]
fn llm_returned_terminal() {
let mut auth = authority_at_calling_llm();
let t = auth
.apply(TurnExecutionInput::LlmReturnedTerminal {
run_id: test_run_id(),
})
.expect("terminal");
assert_eq!(t.next_phase, TurnPhase::DrainingBoundary);
assert_eq!(auth.boundary_count(), 1);
assert_eq!(t.effects.len(), 1);
assert!(matches!(
t.effects[0],
TurnExecutionEffect::BoundaryApplied { .. }
));
}
#[test]
fn register_pending_ops_records_turn_local_wait_set() {
let mut auth = authority_at_waiting_for_ops();
let op_a = OperationId::new();
let op_b = OperationId::new();
let t = auth
.apply(TurnExecutionInput::RegisterPendingOps {
run_id: test_run_id(),
op_refs: vec![
AsyncOpRef::barrier(op_a.clone()),
AsyncOpRef::detached(op_b.clone()),
],
barrier_operation_ids: vec![op_a.clone()],
has_barrier_ops: true,
})
.expect("register pending ops");
assert_eq!(t.next_phase, TurnPhase::WaitingForOps);
assert!(auth.has_barrier_ops());
let refs = auth.pending_op_refs().expect("should have refs");
assert_eq!(refs.len(), 2);
assert_eq!(refs[0].operation_id, op_a);
assert_eq!(refs[0].wait_policy, WaitPolicy::Barrier);
assert_eq!(refs[1].operation_id, op_b);
assert_eq!(refs[1].wait_policy, WaitPolicy::Detached);
let barrier_ids = auth.barrier_op_ids();
assert_eq!(barrier_ids.len(), 1);
assert_eq!(*barrier_ids[0], op_a);
}
#[test]
fn tool_calls_resolved() {
let mut auth = authority_at_waiting_for_ops();
auth.apply(TurnExecutionInput::RegisterPendingOps {
run_id: test_run_id(),
op_refs: vec![],
barrier_operation_ids: vec![],
has_barrier_ops: false,
})
.expect("register empty pending ops");
let t = auth
.apply(TurnExecutionInput::ToolCallsResolved {
run_id: test_run_id(),
})
.expect("resolved");
assert_eq!(t.next_phase, TurnPhase::DrainingBoundary);
assert_eq!(auth.tool_calls_pending(), 0);
assert_eq!(auth.pending_op_refs(), None);
assert_eq!(auth.boundary_count(), 1);
}
#[test]
fn tool_calls_resolved_without_registered_wait_set_rejected() {
let mut auth = authority_at_waiting_for_ops();
assert!(
auth.apply(TurnExecutionInput::ToolCallsResolved {
run_id: test_run_id(),
})
.is_err()
);
}
#[test]
fn tool_calls_resolved_from_wrong_phase_rejected() {
let mut auth = authority_at_waiting_for_ops();
auth.apply(TurnExecutionInput::RegisterPendingOps {
run_id: test_run_id(),
op_refs: vec![],
barrier_operation_ids: vec![],
has_barrier_ops: false,
})
.expect("register empty pending ops");
auth.apply(TurnExecutionInput::ToolCallsResolved {
run_id: test_run_id(),
})
.expect("first resolve");
assert!(
auth.apply(TurnExecutionInput::ToolCallsResolved {
run_id: test_run_id(),
})
.is_err()
);
}
#[test]
fn boundary_continue_goes_to_calling_llm() {
let mut auth = authority_at_draining_boundary();
let t = auth
.apply(TurnExecutionInput::BoundaryContinue {
run_id: test_run_id(),
})
.expect("boundary continue");
assert_eq!(t.next_phase, TurnPhase::CallingLlm);
}
#[test]
fn boundary_continue_with_cancel_after_boundary_goes_to_cancelled() {
let mut auth = authority_at_draining_boundary();
auth.apply(TurnExecutionInput::CancelAfterBoundary {
run_id: test_run_id(),
})
.expect("cancel after boundary");
let t = auth
.apply(TurnExecutionInput::BoundaryContinue {
run_id: test_run_id(),
})
.expect("boundary continue");
assert_eq!(t.next_phase, TurnPhase::Cancelled);
assert_eq!(auth.terminal_outcome(), TurnTerminalOutcome::Cancelled);
}
#[test]
fn boundary_complete_goes_to_completed() {
let mut auth = authority_at_draining_boundary();
let t = auth
.apply(TurnExecutionInput::BoundaryComplete {
run_id: test_run_id(),
})
.expect("boundary complete");
assert_eq!(t.next_phase, TurnPhase::Completed);
assert_eq!(auth.terminal_outcome(), TurnTerminalOutcome::Completed);
assert_eq!(t.effects.len(), 1);
assert!(matches!(
t.effects[0],
TurnExecutionEffect::RunCompleted { .. }
));
}
/// When `CancelAfterBoundary` was applied earlier, `BoundaryComplete` reports
/// `Cancelled` rather than `Completed` as the next phase.
#[test]
fn boundary_complete_with_cancel_after_boundary_goes_to_cancelled() {
    let mut authority = authority_at_draining_boundary();

    let deferred_cancel = TurnExecutionInput::CancelAfterBoundary {
        run_id: test_run_id(),
    };
    authority
        .apply(deferred_cancel)
        .expect("cancel after boundary");

    let finish = TurnExecutionInput::BoundaryComplete {
        run_id: test_run_id(),
    };
    let transition = authority.apply(finish).expect("boundary complete");
    assert_eq!(transition.next_phase, TurnPhase::Cancelled);
}
/// `RecoverableFailure` on the `authority_at_calling_llm` fixture leads to
/// `ErrorRecovery`.
#[test]
fn recoverable_failure_from_calling_llm() {
    let mut authority = authority_at_calling_llm();
    let failure = TurnExecutionInput::RecoverableFailure {
        run_id: test_run_id(),
    };
    let transition = authority.apply(failure).expect("recoverable");
    assert_eq!(transition.next_phase, TurnPhase::ErrorRecovery);
}
/// `RecoverableFailure` on the `authority_at_waiting_for_ops` fixture leads to
/// `ErrorRecovery`.
#[test]
fn recoverable_failure_from_waiting_for_ops() {
    let mut authority = authority_at_waiting_for_ops();
    let failure = TurnExecutionInput::RecoverableFailure {
        run_id: test_run_id(),
    };
    let transition = authority.apply(failure).expect("recoverable");
    assert_eq!(transition.next_phase, TurnPhase::ErrorRecovery);
}
/// `RecoverableFailure` on the `authority_at_draining_boundary` fixture leads
/// to `ErrorRecovery`.
#[test]
fn recoverable_failure_from_draining_boundary() {
    let mut authority = authority_at_draining_boundary();
    let failure = TurnExecutionInput::RecoverableFailure {
        run_id: test_run_id(),
    };
    let transition = authority.apply(failure).expect("recoverable");
    assert_eq!(transition.next_phase, TurnPhase::ErrorRecovery);
}
/// After a barrier op has been registered, `RecoverableFailure` moves the
/// authority to `ErrorRecovery` and leaves `pending_op_refs()` as `None`.
#[test]
fn recoverable_failure_from_waiting_for_ops_clears_pending_op_refs() {
    let mut authority = authority_at_waiting_for_ops();
    let barrier_id = OperationId::new();

    let register = TurnExecutionInput::RegisterPendingOps {
        run_id: test_run_id(),
        op_refs: vec![AsyncOpRef::barrier(barrier_id.clone())],
        barrier_operation_ids: vec![barrier_id],
        has_barrier_ops: true,
    };
    authority.apply(register).expect("register pending ops");

    let failure = TurnExecutionInput::RecoverableFailure {
        run_id: test_run_id(),
    };
    authority.apply(failure).expect("recoverable");

    assert_eq!(authority.phase(), TurnPhase::ErrorRecovery);
    assert_eq!(authority.pending_op_refs(), None);
}
/// `RetryRequested` following a `RecoverableFailure` reports `CallingLlm` as
/// the next phase.
#[test]
fn retry_requested_from_error_recovery() {
    let mut authority = authority_at_calling_llm();

    let failure = TurnExecutionInput::RecoverableFailure {
        run_id: test_run_id(),
    };
    authority.apply(failure).expect("recoverable");

    let retry = TurnExecutionInput::RetryRequested {
        run_id: test_run_id(),
    };
    let transition = authority.apply(retry).expect("retry");
    assert_eq!(transition.next_phase, TurnPhase::CallingLlm);
}
/// For each of three fixtures (calling-LLM, waiting-for-ops,
/// draining-boundary), `FatalFailure` yields `Failed`, sets the `Failed`
/// terminal outcome, and emits a `RunFailed` effect.
#[test]
fn fatal_failure_from_various_phases() {
    let fixtures: [fn() -> TurnExecutionAuthority; 3] = [
        authority_at_calling_llm,
        authority_at_waiting_for_ops,
        authority_at_draining_boundary,
    ];
    for build_fixture in fixtures {
        let mut authority = build_fixture();
        let fatal = TurnExecutionInput::FatalFailure {
            run_id: test_run_id(),
        };
        let transition = authority
            .apply(fatal)
            .expect("fatal failure should work");

        assert_eq!(transition.next_phase, TurnPhase::Failed);
        assert_eq!(authority.terminal_outcome(), TurnTerminalOutcome::Failed);

        let emitted_run_failed = transition
            .effects
            .iter()
            .any(|effect| matches!(effect, TurnExecutionEffect::RunFailed { .. }));
        assert!(emitted_run_failed);
    }
}
/// `FatalFailure` following a `RecoverableFailure` reports `Failed` as the
/// next phase.
#[test]
fn fatal_failure_from_error_recovery() {
    let mut authority = authority_at_calling_llm();

    let recoverable = TurnExecutionInput::RecoverableFailure {
        run_id: test_run_id(),
    };
    authority.apply(recoverable).expect("recoverable");

    let fatal = TurnExecutionInput::FatalFailure {
        run_id: test_run_id(),
    };
    let transition = authority.apply(fatal).expect("fatal");
    assert_eq!(transition.next_phase, TurnPhase::Failed);
}
/// `FatalFailure` applied right after `StartConversationRun` reports `Failed`
/// as the next phase.
#[test]
fn fatal_failure_from_applying_primitive() {
    let mut authority = make_authority();

    let start = TurnExecutionInput::StartConversationRun {
        run_id: test_run_id(),
    };
    authority.apply(start).expect("start");

    let fatal = TurnExecutionInput::FatalFailure {
        run_id: test_run_id(),
    };
    let transition = authority.apply(fatal).expect("fatal");
    assert_eq!(transition.next_phase, TurnPhase::Failed);
}
/// For each of three fixtures (calling-LLM, waiting-for-ops,
/// draining-boundary), `CancelNow` reports `Cancelling` as the next phase.
#[test]
fn cancel_now_from_various_phases() {
    let fixtures: [fn() -> TurnExecutionAuthority; 3] = [
        authority_at_calling_llm,
        authority_at_waiting_for_ops,
        authority_at_draining_boundary,
    ];
    for build_fixture in fixtures {
        let mut authority = build_fixture();
        let cancel = TurnExecutionInput::CancelNow {
            run_id: test_run_id(),
        };
        let transition = authority.apply(cancel).expect("cancel now");
        assert_eq!(transition.next_phase, TurnPhase::Cancelling);
    }
}
/// `CancelNow` applied right after `StartConversationRun` reports `Cancelling`
/// as the next phase.
#[test]
fn cancel_now_from_applying_primitive() {
    let mut authority = make_authority();

    let start = TurnExecutionInput::StartConversationRun {
        run_id: test_run_id(),
    };
    authority.apply(start).expect("start");

    let cancel = TurnExecutionInput::CancelNow {
        run_id: test_run_id(),
    };
    let transition = authority.apply(cancel).expect("cancel now");
    assert_eq!(transition.next_phase, TurnPhase::Cancelling);
}
/// `CancellationObserved` after `CancelNow` ends in `Cancelled`, records the
/// `Cancelled` terminal outcome, and leaves `cancel_after_boundary()` false.
#[test]
fn cancellation_observed() {
    let mut authority = authority_at_calling_llm();

    let cancel = TurnExecutionInput::CancelNow {
        run_id: test_run_id(),
    };
    authority.apply(cancel).expect("cancel now");

    let observed = TurnExecutionInput::CancellationObserved {
        run_id: test_run_id(),
    };
    let transition = authority
        .apply(observed)
        .expect("cancellation observed");

    assert_eq!(transition.next_phase, TurnPhase::Cancelled);
    assert_eq!(authority.terminal_outcome(), TurnTerminalOutcome::Cancelled);
    assert!(!authority.cancel_after_boundary());
}
/// After a barrier op has been registered, `CancelNow` moves the authority to
/// `Cancelling` and leaves `pending_op_refs()` as `None`.
#[test]
fn cancel_now_from_waiting_for_ops_clears_pending_op_refs() {
    let mut authority = authority_at_waiting_for_ops();
    let barrier_id = OperationId::new();

    let register = TurnExecutionInput::RegisterPendingOps {
        run_id: test_run_id(),
        op_refs: vec![AsyncOpRef::barrier(barrier_id.clone())],
        barrier_operation_ids: vec![barrier_id],
        has_barrier_ops: true,
    };
    authority.apply(register).expect("register pending ops");

    let cancel = TurnExecutionInput::CancelNow {
        run_id: test_run_id(),
    };
    authority.apply(cancel).expect("cancel now");

    assert_eq!(authority.phase(), TurnPhase::Cancelling);
    assert_eq!(authority.pending_op_refs(), None);
}
/// `CancelAfterBoundary` on the calling-LLM fixture keeps `CallingLlm` as the
/// next phase and turns on `cancel_after_boundary()`.
#[test]
fn cancel_after_boundary_stays_in_same_phase() {
    let mut authority = authority_at_calling_llm();
    let deferred_cancel = TurnExecutionInput::CancelAfterBoundary {
        run_id: test_run_id(),
    };
    let transition = authority
        .apply(deferred_cancel)
        .expect("cancel after boundary");

    assert_eq!(transition.next_phase, TurnPhase::CallingLlm);
    assert!(authority.cancel_after_boundary());
}
/// `CancelAfterBoundary` on the waiting-for-ops fixture keeps `WaitingForOps`
/// as the next phase and turns on `cancel_after_boundary()`.
#[test]
fn cancel_after_boundary_from_waiting_for_ops() {
    let mut authority = authority_at_waiting_for_ops();
    let deferred_cancel = TurnExecutionInput::CancelAfterBoundary {
        run_id: test_run_id(),
    };
    let transition = authority
        .apply(deferred_cancel)
        .expect("cancel after boundary");

    assert_eq!(transition.next_phase, TurnPhase::WaitingForOps);
    assert!(authority.cancel_after_boundary());
}
/// `AcknowledgeTerminal` on a `Completed` run returns to `Ready` and the
/// accessors report no active run, no primitive kind, a zero boundary count,
/// and no terminal outcome.
#[test]
fn acknowledge_terminal_from_completed_resets_to_ready() {
    let mut authority = authority_at_draining_boundary();

    let finish = TurnExecutionInput::BoundaryComplete {
        run_id: test_run_id(),
    };
    authority.apply(finish).expect("complete");
    assert_eq!(authority.phase(), TurnPhase::Completed);

    let acknowledge = TurnExecutionInput::AcknowledgeTerminal {
        run_id: test_run_id(),
    };
    let transition = authority.apply(acknowledge).expect("acknowledge");

    assert_eq!(transition.next_phase, TurnPhase::Ready);
    // Per-run bookkeeping is expected to be back at its initial values.
    assert_eq!(authority.active_run(), None);
    assert_eq!(authority.primitive_kind(), TurnPrimitiveKind::None);
    assert_eq!(authority.boundary_count(), 0);
    assert_eq!(authority.terminal_outcome(), TurnTerminalOutcome::None);
}
/// `AcknowledgeTerminal` after a `FatalFailure` reports `Ready` as the next
/// phase.
#[test]
fn acknowledge_terminal_from_failed_resets_to_ready() {
    let mut authority = authority_at_calling_llm();

    let fatal = TurnExecutionInput::FatalFailure {
        run_id: test_run_id(),
    };
    authority.apply(fatal).expect("fatal");

    let acknowledge = TurnExecutionInput::AcknowledgeTerminal {
        run_id: test_run_id(),
    };
    let transition = authority.apply(acknowledge).expect("acknowledge");
    assert_eq!(transition.next_phase, TurnPhase::Ready);
}
/// `AcknowledgeTerminal` after `CancelNow` + `CancellationObserved` reports
/// `Ready` as the next phase.
#[test]
fn acknowledge_terminal_from_cancelled_resets_to_ready() {
    let mut authority = authority_at_calling_llm();

    let cancel = TurnExecutionInput::CancelNow {
        run_id: test_run_id(),
    };
    authority.apply(cancel).expect("cancel");

    let observed = TurnExecutionInput::CancellationObserved {
        run_id: test_run_id(),
    };
    authority.apply(observed).expect("observed");

    let acknowledge = TurnExecutionInput::AcknowledgeTerminal {
        run_id: test_run_id(),
    };
    let transition = authority.apply(acknowledge).expect("acknowledge");
    assert_eq!(transition.next_phase, TurnPhase::Ready);
}
/// `AcknowledgeTerminal` carrying a run id other than the active one is
/// refused, and the refused input leaves the authority in `Completed`.
#[test]
fn acknowledge_terminal_wrong_run_id_rejected() {
    let mut auth = authority_at_draining_boundary();
    auth.apply(TurnExecutionInput::BoundaryComplete {
        run_id: test_run_id(),
    })
    .expect("complete");

    let rejected = auth.apply(TurnExecutionInput::AcknowledgeTerminal {
        run_id: other_run_id(),
    });
    assert!(rejected.is_err());

    // A refused acknowledgement must not reset the finished run; this mirrors
    // the post-rejection phase check in `wrong_run_id_rejected_everywhere`.
    assert_eq!(auth.phase(), TurnPhase::Completed);
}
/// An input carrying a foreign run id is refused and the phase is unchanged.
///
/// Despite the name, only `LlmReturnedToolCalls` is exercised here.
#[test]
fn wrong_run_id_rejected_everywhere() {
    let mut authority = authority_at_calling_llm();
    let foreign_tool_calls = TurnExecutionInput::LlmReturnedToolCalls {
        run_id: other_run_id(),
        tool_count: 1,
    };
    let outcome = authority.apply(foreign_tool_calls);
    assert!(outcome.is_err());
    assert_eq!(authority.phase(), TurnPhase::CallingLlm);
}
/// Walks one conversation run end to end: start, primitive applied, one
/// tool-call round gated by a barrier op, a terminal LLM response, boundary
/// completion, and acknowledgement back to `Ready`.
#[test]
fn full_conversation_turn_happy_path() {
    let mut authority = make_authority();
    let rid = test_run_id();

    // Start the run and apply the primitive.
    let start = TurnExecutionInput::StartConversationRun {
        run_id: rid.clone(),
    };
    authority.apply(start).expect("start");
    let primitive_applied = TurnExecutionInput::PrimitiveApplied {
        run_id: rid.clone(),
        admitted_content_shape: ContentShape(String::from("text")),
        vision_enabled: false,
        image_tool_results_enabled: false,
    };
    authority.apply(primitive_applied).expect("primitive");
    assert_eq!(authority.phase(), TurnPhase::CallingLlm);

    // First LLM response asks for tools.
    let tool_calls = TurnExecutionInput::LlmReturnedToolCalls {
        run_id: rid.clone(),
        tool_count: 2,
    };
    authority.apply(tool_calls).expect("tool calls");
    assert_eq!(authority.phase(), TurnPhase::WaitingForOps);

    // Register one barrier op, satisfy it, then resolve the tool calls.
    let barrier_id = OperationId::new();
    let register = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: vec![AsyncOpRef::barrier(barrier_id.clone())],
        barrier_operation_ids: vec![barrier_id.clone()],
        has_barrier_ops: true,
    };
    authority.apply(register).expect("register pending ops");
    let barrier_done = TurnExecutionInput::OpsBarrierSatisfied {
        run_id: rid.clone(),
        operation_ids: vec![barrier_id],
    };
    authority.apply(barrier_done).expect("barrier satisfied");
    let resolved = TurnExecutionInput::ToolCallsResolved {
        run_id: rid.clone(),
    };
    authority.apply(resolved).expect("resolved");
    assert_eq!(authority.phase(), TurnPhase::DrainingBoundary);
    assert_eq!(authority.boundary_count(), 1);

    // Continue into a second LLM call that returns a terminal response.
    let proceed = TurnExecutionInput::BoundaryContinue {
        run_id: rid.clone(),
    };
    authority.apply(proceed).expect("continue");
    assert_eq!(authority.phase(), TurnPhase::CallingLlm);
    let llm_terminal = TurnExecutionInput::LlmReturnedTerminal {
        run_id: rid.clone(),
    };
    authority.apply(llm_terminal).expect("terminal");
    assert_eq!(authority.phase(), TurnPhase::DrainingBoundary);
    assert_eq!(authority.boundary_count(), 2);

    // Complete the run.
    let finish = TurnExecutionInput::BoundaryComplete {
        run_id: rid.clone(),
    };
    let transition = authority.apply(finish).expect("complete");
    assert_eq!(transition.next_phase, TurnPhase::Completed);
    let emitted_run_completed = transition
        .effects
        .iter()
        .any(|effect| matches!(effect, TurnExecutionEffect::RunCompleted { .. }));
    assert!(emitted_run_completed);

    // Acknowledge the terminal state.
    let acknowledge = TurnExecutionInput::AcknowledgeTerminal { run_id: rid };
    authority.apply(acknowledge).expect("ack");
    assert_eq!(authority.phase(), TurnPhase::Ready);
    assert_eq!(authority.active_run(), None);
}
/// Recoverable failure, retry, then fatal failure: the phase goes
/// `ErrorRecovery` -> `CallingLlm` -> `Failed`.
#[test]
fn full_error_recovery_path() {
    let mut authority = authority_at_calling_llm();
    let rid = test_run_id();

    let recoverable = TurnExecutionInput::RecoverableFailure {
        run_id: rid.clone(),
    };
    authority.apply(recoverable).expect("recoverable");
    assert_eq!(authority.phase(), TurnPhase::ErrorRecovery);

    let retry = TurnExecutionInput::RetryRequested {
        run_id: rid.clone(),
    };
    authority.apply(retry).expect("retry");
    assert_eq!(authority.phase(), TurnPhase::CallingLlm);

    let fatal = TurnExecutionInput::FatalFailure { run_id: rid };
    authority.apply(fatal).expect("fatal");
    assert_eq!(authority.phase(), TurnPhase::Failed);
}
/// A deferred cancel requested during the LLM call takes effect only at the
/// next boundary: after a tool round, `BoundaryContinue` ends in `Cancelled`.
#[test]
fn full_cancel_after_boundary_path() {
    let mut authority = authority_at_calling_llm();
    let rid = test_run_id();

    let deferred_cancel = TurnExecutionInput::CancelAfterBoundary {
        run_id: rid.clone(),
    };
    authority
        .apply(deferred_cancel)
        .expect("cancel after boundary");
    assert!(authority.cancel_after_boundary());
    assert_eq!(authority.phase(), TurnPhase::CallingLlm);

    // The run keeps going through a tool round with nothing to wait on.
    let tool_calls = TurnExecutionInput::LlmReturnedToolCalls {
        run_id: rid.clone(),
        tool_count: 1,
    };
    authority.apply(tool_calls).expect("tool calls");
    let register_empty = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: Vec::new(),
        barrier_operation_ids: Vec::new(),
        has_barrier_ops: false,
    };
    authority
        .apply(register_empty)
        .expect("register empty pending ops");
    let resolved = TurnExecutionInput::ToolCallsResolved {
        run_id: rid.clone(),
    };
    authority.apply(resolved).expect("resolved");
    assert_eq!(authority.phase(), TurnPhase::DrainingBoundary);

    let proceed = TurnExecutionInput::BoundaryContinue { run_id: rid };
    let transition = authority.apply(proceed).expect("boundary continue");
    assert_eq!(transition.next_phase, TurnPhase::Cancelled);
    assert_eq!(authority.terminal_outcome(), TurnTerminalOutcome::Cancelled);
}
/// `can_accept` is called on an immutable authority: it accepts
/// `StartConversationRun`, refuses `LlmReturnedTerminal`, and the phase is
/// still `Ready` afterwards.
#[test]
fn can_accept_probes_without_mutation() {
    let authority = make_authority();

    let start = TurnExecutionInput::StartConversationRun {
        run_id: test_run_id(),
    };
    assert!(authority.can_accept(&start));

    let premature_terminal = TurnExecutionInput::LlmReturnedTerminal {
        run_id: test_run_id(),
    };
    assert!(!authority.can_accept(&premature_terminal));

    assert_eq!(authority.phase(), TurnPhase::Ready);
}
/// Pins the `TurnPhase::to_loop_state` result for every `TurnPhase` variant.
#[test]
fn turn_phase_to_loop_state_mapping() {
    use crate::state::LoopState;

    // (phase, expected loop state), checked in the listed order.
    let expectations = [
        (TurnPhase::Ready, LoopState::CallingLlm),
        (TurnPhase::ApplyingPrimitive, LoopState::CallingLlm),
        (TurnPhase::CallingLlm, LoopState::CallingLlm),
        (TurnPhase::WaitingForOps, LoopState::WaitingForOps),
        (TurnPhase::DrainingBoundary, LoopState::DrainingEvents),
        (TurnPhase::Extracting, LoopState::DrainingEvents),
        (TurnPhase::ErrorRecovery, LoopState::ErrorRecovery),
        (TurnPhase::Cancelling, LoopState::Cancelling),
        (TurnPhase::Completed, LoopState::Completed),
        (TurnPhase::Failed, LoopState::Completed),
        (TurnPhase::Cancelled, LoopState::Completed),
    ];
    for (phase, expected) in expectations {
        assert_eq!(phase.to_loop_state(), expected);
    }
}
/// Test fixture: takes the `authority_at_calling_llm` fixture, applies
/// `LlmReturnedTerminal`, and checks the result is in `DrainingBoundary`
/// before handing it back.
fn authority_at_draining_boundary_terminal() -> TurnExecutionAuthority {
    let mut authority = authority_at_calling_llm();
    let llm_terminal = TurnExecutionInput::LlmReturnedTerminal {
        run_id: test_run_id(),
    };
    authority
        .apply(llm_terminal)
        .expect("llm returned terminal");
    assert_eq!(authority.phase(), TurnPhase::DrainingBoundary);
    authority
}
/// `EnterExtraction` moves to `Extracting`, records the supplied retry limit,
/// and leaves the attempt counter at zero.
#[test]
fn enter_extraction_from_draining_boundary() {
    let mut authority = authority_at_draining_boundary_terminal();
    let enter = TurnExecutionInput::EnterExtraction {
        run_id: test_run_id(),
        max_retries: 3,
    };
    let transition = authority.apply(enter).expect("enter extraction");

    assert_eq!(transition.next_phase, TurnPhase::Extracting);
    assert_eq!(authority.max_extraction_retries(), 3);
    assert_eq!(authority.extraction_attempts(), 0);
}
/// `ExtractionStart` after `EnterExtraction` reports `CallingLlm` and does not
/// count as an extraction attempt.
#[test]
fn extraction_start_goes_to_calling_llm() {
    let mut authority = authority_at_draining_boundary_terminal();

    let enter = TurnExecutionInput::EnterExtraction {
        run_id: test_run_id(),
        max_retries: 3,
    };
    authority.apply(enter).expect("enter extraction");

    let begin = TurnExecutionInput::ExtractionStart {
        run_id: test_run_id(),
    };
    let transition = authority.apply(begin).expect("extraction start");

    assert_eq!(transition.next_phase, TurnPhase::CallingLlm);
    assert_eq!(authority.extraction_attempts(), 0);
}
/// `ExtractionValidationPassed` completes the run: `Completed` phase,
/// `Completed` terminal outcome, and a `RunCompleted` effect.
#[test]
fn extraction_validation_passed_completes() {
    let mut authority = authority_at_draining_boundary_terminal();

    let enter = TurnExecutionInput::EnterExtraction {
        run_id: test_run_id(),
        max_retries: 3,
    };
    authority.apply(enter).expect("enter extraction");

    let passed = TurnExecutionInput::ExtractionValidationPassed {
        run_id: test_run_id(),
    };
    let transition = authority.apply(passed).expect("validation passed");

    assert_eq!(transition.next_phase, TurnPhase::Completed);
    assert_eq!(authority.terminal_outcome(), TurnTerminalOutcome::Completed);
    let emitted_run_completed = transition
        .effects
        .iter()
        .any(|effect| matches!(effect, TurnExecutionEffect::RunCompleted { .. }));
    assert!(emitted_run_completed);
}
/// With `max_retries: 2`, the first `ExtractionValidationFailed` goes back to
/// `CallingLlm` and bumps the attempt counter to 1.
#[test]
fn extraction_validation_failed_retries_when_attempts_remain() {
    let mut authority = authority_at_draining_boundary_terminal();

    let enter = TurnExecutionInput::EnterExtraction {
        run_id: test_run_id(),
        max_retries: 2,
    };
    authority.apply(enter).expect("enter extraction");

    let failed = TurnExecutionInput::ExtractionValidationFailed {
        run_id: test_run_id(),
        error: "bad json".into(),
    };
    let transition = authority
        .apply(failed)
        .expect("validation failed with retries remaining");

    assert_eq!(transition.next_phase, TurnPhase::CallingLlm);
    assert_eq!(authority.extraction_attempts(), 1);
}
/// With `max_retries: 1`, the first `ExtractionValidationFailed` ends the run
/// as `Failed` with `StructuredOutputValidationFailed` and a `RunFailed`
/// effect.
#[test]
fn extraction_validation_failed_exhausts_when_no_retries_remain() {
    let mut authority = authority_at_draining_boundary_terminal();

    let enter = TurnExecutionInput::EnterExtraction {
        run_id: test_run_id(),
        max_retries: 1,
    };
    authority.apply(enter).expect("enter extraction");

    let failed = TurnExecutionInput::ExtractionValidationFailed {
        run_id: test_run_id(),
        error: "bad json".into(),
    };
    let transition = authority.apply(failed).expect("exhausted");

    assert_eq!(transition.next_phase, TurnPhase::Failed);
    assert_eq!(
        authority.terminal_outcome(),
        TurnTerminalOutcome::StructuredOutputValidationFailed
    );
    let emitted_run_failed = transition
        .effects
        .iter()
        .any(|effect| matches!(effect, TurnExecutionEffect::RunFailed { .. }));
    assert!(emitted_run_failed);
}
/// Drives extraction through a start, one failed validation, and a final
/// passing validation, re-entering extraction after each terminal LLM reply.
#[test]
fn full_extraction_retry_cycle() {
    let mut authority = authority_at_draining_boundary_terminal();
    let rid = test_run_id();

    // Enter extraction and kick off the extraction LLM call.
    let enter = TurnExecutionInput::EnterExtraction {
        run_id: rid.clone(),
        max_retries: 2,
    };
    authority.apply(enter).expect("enter");
    assert_eq!(authority.phase(), TurnPhase::Extracting);
    let begin = TurnExecutionInput::ExtractionStart {
        run_id: rid.clone(),
    };
    authority.apply(begin).expect("start");
    assert_eq!(authority.phase(), TurnPhase::CallingLlm);
    assert_eq!(authority.extraction_attempts(), 0);

    // LLM replies; re-enter extraction with no attempts counted yet.
    let llm_terminal = TurnExecutionInput::LlmReturnedTerminal {
        run_id: rid.clone(),
    };
    authority.apply(llm_terminal).expect("llm terminal");
    assert_eq!(authority.phase(), TurnPhase::DrainingBoundary);
    let reenter = TurnExecutionInput::EnterExtraction {
        run_id: rid.clone(),
        max_retries: 2,
    };
    authority.apply(reenter).expect("re-enter");
    assert_eq!(authority.phase(), TurnPhase::Extracting);
    assert_eq!(authority.extraction_attempts(), 0);

    // Validation fails once, which counts as attempt 1 and retries the LLM.
    let failed = TurnExecutionInput::ExtractionValidationFailed {
        run_id: rid.clone(),
        error: "bad json".into(),
    };
    authority.apply(failed).expect("validation failed");
    assert_eq!(authority.phase(), TurnPhase::CallingLlm);
    assert_eq!(authority.extraction_attempts(), 1);

    // Second reply validates successfully.
    let llm_terminal_again = TurnExecutionInput::LlmReturnedTerminal {
        run_id: rid.clone(),
    };
    authority.apply(llm_terminal_again).expect("llm terminal 2");
    let reenter_again = TurnExecutionInput::EnterExtraction {
        run_id: rid.clone(),
        max_retries: 2,
    };
    authority.apply(reenter_again).expect("re-enter 2");
    let passed = TurnExecutionInput::ExtractionValidationPassed { run_id: rid };
    let transition = authority.apply(passed).expect("passed");
    assert_eq!(transition.next_phase, TurnPhase::Completed);
}
/// `EnterExtraction` carrying a run id other than the active one is refused,
/// and the refused input leaves the authority in `DrainingBoundary`.
#[test]
fn extraction_wrong_run_id_rejected() {
    let mut auth = authority_at_draining_boundary_terminal();

    let rejected = auth.apply(TurnExecutionInput::EnterExtraction {
        run_id: other_run_id(),
        max_retries: 3,
    });
    assert!(rejected.is_err());

    // The fixture asserts `DrainingBoundary` on construction; a refused input
    // must not move the phase (same check as `wrong_run_id_rejected_everywhere`).
    assert_eq!(auth.phase(), TurnPhase::DrainingBoundary);
}
/// `FatalFailure` after `EnterExtraction` reports `Failed` as the next phase.
#[test]
fn fatal_failure_from_extracting() {
    let mut authority = authority_at_draining_boundary_terminal();

    let enter = TurnExecutionInput::EnterExtraction {
        run_id: test_run_id(),
        max_retries: 3,
    };
    authority.apply(enter).expect("enter extraction");

    let fatal = TurnExecutionInput::FatalFailure {
        run_id: test_run_id(),
    };
    let transition = authority.apply(fatal).expect("fatal");
    assert_eq!(transition.next_phase, TurnPhase::Failed);
}
/// `CancelNow` after `EnterExtraction` reports `Cancelling` as the next phase.
#[test]
fn cancel_now_from_extracting() {
    let mut authority = authority_at_draining_boundary_terminal();

    let enter = TurnExecutionInput::EnterExtraction {
        run_id: test_run_id(),
        max_retries: 3,
    };
    authority.apply(enter).expect("enter extraction");

    let cancel = TurnExecutionInput::CancelNow {
        run_id: test_run_id(),
    };
    let transition = authority.apply(cancel).expect("cancel");
    assert_eq!(transition.next_phase, TurnPhase::Cancelling);
}
/// With two barrier ops registered, `ToolCallsResolved` is refused until
/// `OpsBarrierSatisfied` has been applied for the registered barrier ids.
#[test]
fn all_barrier_ops_block_tool_calls_resolved_until_satisfied() {
    let mut authority = authority_at_waiting_for_ops();
    let rid = test_run_id();
    let op_a = OperationId::new();
    let op_b = OperationId::new();

    let register = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: vec![
            AsyncOpRef::barrier(op_a.clone()),
            AsyncOpRef::barrier(op_b.clone()),
        ],
        barrier_operation_ids: vec![op_a, op_b],
        has_barrier_ops: true,
    };
    authority.apply(register).expect("register all-barrier ops");
    assert!(authority.has_barrier_ops());
    assert!(!authority.barrier_satisfied());
    assert_eq!(authority.barrier_op_ids().len(), 2);

    // Resolving before the barrier is satisfied must fail.
    let premature = TurnExecutionInput::ToolCallsResolved {
        run_id: rid.clone(),
    };
    assert!(authority.apply(premature).is_err());

    let registered_ids: Vec<OperationId> =
        authority.barrier_op_ids().into_iter().cloned().collect();
    let barrier_done = TurnExecutionInput::OpsBarrierSatisfied {
        run_id: rid.clone(),
        operation_ids: registered_ids,
    };
    authority.apply(barrier_done).expect("barrier satisfied");
    assert!(authority.barrier_satisfied());

    let resolved = TurnExecutionInput::ToolCallsResolved { run_id: rid };
    let transition = authority.apply(resolved).expect("tool calls resolved");
    assert_eq!(transition.next_phase, TurnPhase::DrainingBoundary);
}
/// With one barrier op and one detached op registered, only the barrier op is
/// tracked as a barrier id, and `ToolCallsResolved` is refused until the
/// barrier is satisfied.
#[test]
fn mixed_barrier_and_detached_ops_still_block_until_barrier_satisfied() {
    let mut authority = authority_at_waiting_for_ops();
    let rid = test_run_id();
    let barrier_id = OperationId::new();
    let detached_id = OperationId::new();

    let register = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: vec![
            AsyncOpRef::barrier(barrier_id.clone()),
            AsyncOpRef::detached(detached_id),
        ],
        barrier_operation_ids: vec![barrier_id],
        has_barrier_ops: true,
    };
    authority.apply(register).expect("register mixed ops");
    assert!(authority.has_barrier_ops());
    assert!(!authority.barrier_satisfied());
    assert_eq!(authority.barrier_op_ids().len(), 1);
    assert_eq!(authority.pending_op_refs().unwrap().len(), 2);

    // Resolving before the barrier is satisfied must fail.
    let premature = TurnExecutionInput::ToolCallsResolved {
        run_id: rid.clone(),
    };
    assert!(authority.apply(premature).is_err());

    let registered_ids: Vec<OperationId> =
        authority.barrier_op_ids().into_iter().cloned().collect();
    let barrier_done = TurnExecutionInput::OpsBarrierSatisfied {
        run_id: rid.clone(),
        operation_ids: registered_ids,
    };
    authority.apply(barrier_done).expect("barrier satisfied");

    let resolved = TurnExecutionInput::ToolCallsResolved { run_id: rid };
    let transition = authority.apply(resolved).expect("resolved");
    assert_eq!(transition.next_phase, TurnPhase::DrainingBoundary);
}
/// With only detached ops registered, the barrier counts as satisfied and
/// `ToolCallsResolved` is accepted straight away.
#[test]
fn detached_only_ops_do_not_block_tool_calls_resolved() {
    let mut authority = authority_at_waiting_for_ops();
    let rid = test_run_id();

    let register = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: vec![
            AsyncOpRef::detached(OperationId::new()),
            AsyncOpRef::detached(OperationId::new()),
        ],
        barrier_operation_ids: Vec::new(),
        has_barrier_ops: false,
    };
    authority
        .apply(register)
        .expect("register detached-only ops");
    assert!(!authority.has_barrier_ops());
    assert!(authority.barrier_satisfied());
    assert_eq!(authority.barrier_op_ids().len(), 0);

    let resolved = TurnExecutionInput::ToolCallsResolved { run_id: rid };
    let transition = authority
        .apply(resolved)
        .expect("resolved without barrier");
    assert_eq!(transition.next_phase, TurnPhase::DrainingBoundary);
}
/// After an empty registration the barrier already reads as satisfied, so
/// `OpsBarrierSatisfied` is refused.
#[test]
fn ops_barrier_satisfied_rejected_when_already_satisfied() {
    let mut authority = authority_at_waiting_for_ops();
    let rid = test_run_id();

    let register_empty = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: Vec::new(),
        barrier_operation_ids: Vec::new(),
        has_barrier_ops: false,
    };
    authority.apply(register_empty).expect("register");
    assert!(authority.barrier_satisfied());

    let redundant = TurnExecutionInput::OpsBarrierSatisfied {
        run_id: rid,
        operation_ids: Vec::new(),
    };
    let outcome = authority.apply(redundant);
    assert!(outcome.is_err());
}
/// `OpsBarrierSatisfied` carrying a foreign run id is refused even when it
/// lists the registered barrier ids.
#[test]
fn ops_barrier_satisfied_wrong_run_id_rejected() {
    let mut authority = authority_at_waiting_for_ops();
    let rid = test_run_id();
    let barrier_id = OperationId::new();

    let register = TurnExecutionInput::RegisterPendingOps {
        run_id: rid,
        op_refs: vec![AsyncOpRef::barrier(barrier_id.clone())],
        barrier_operation_ids: vec![barrier_id],
        has_barrier_ops: true,
    };
    authority.apply(register).expect("register");

    let registered_ids: Vec<OperationId> =
        authority.barrier_op_ids().into_iter().cloned().collect();
    let foreign = TurnExecutionInput::OpsBarrierSatisfied {
        run_id: other_run_id(),
        operation_ids: registered_ids,
    };
    let outcome = authority.apply(foreign);
    assert!(outcome.is_err());
}
/// Runs two consecutive tool rounds, each with its own barrier op, and checks
/// that barrier state from the first round does not leak into the second:
/// after the second registration the barrier reads as unsatisfied again.
#[test]
fn barrier_satisfied_reset_after_tool_calls_resolved() {
    let mut authority = authority_at_waiting_for_ops();
    let rid = test_run_id();

    // Round one: register, satisfy, resolve.
    let first_barrier_id = OperationId::new();
    let first_register = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: vec![AsyncOpRef::barrier(first_barrier_id.clone())],
        barrier_operation_ids: vec![first_barrier_id],
        has_barrier_ops: true,
    };
    authority.apply(first_register).expect("register");
    let first_ids: Vec<OperationId> =
        authority.barrier_op_ids().into_iter().cloned().collect();
    let first_satisfy = TurnExecutionInput::OpsBarrierSatisfied {
        run_id: rid.clone(),
        operation_ids: first_ids,
    };
    authority.apply(first_satisfy).expect("satisfy");
    let first_resolve = TurnExecutionInput::ToolCallsResolved {
        run_id: rid.clone(),
    };
    authority.apply(first_resolve).expect("resolved");
    assert!(authority.barrier_satisfied());
    assert!(!authority.has_barrier_ops());

    // Back to the LLM, which asks for another tool call.
    let proceed = TurnExecutionInput::BoundaryContinue {
        run_id: rid.clone(),
    };
    authority.apply(proceed).expect("continue");
    let tool_calls = TurnExecutionInput::LlmReturnedToolCalls {
        run_id: rid.clone(),
        tool_count: 1,
    };
    authority.apply(tool_calls).expect("tool calls");
    assert!(authority.barrier_satisfied());

    // Round two: a fresh barrier must start out unsatisfied.
    let second_barrier_id = OperationId::new();
    let second_register = TurnExecutionInput::RegisterPendingOps {
        run_id: rid.clone(),
        op_refs: vec![AsyncOpRef::barrier(second_barrier_id.clone())],
        barrier_operation_ids: vec![second_barrier_id],
        has_barrier_ops: true,
    };
    authority.apply(second_register).expect("register again");
    assert!(!authority.barrier_satisfied());
    let second_ids: Vec<OperationId> =
        authority.barrier_op_ids().into_iter().cloned().collect();
    let second_satisfy = TurnExecutionInput::OpsBarrierSatisfied {
        run_id: rid.clone(),
        operation_ids: second_ids,
    };
    authority.apply(second_satisfy).expect("satisfy again");

    let second_resolve = TurnExecutionInput::ToolCallsResolved { run_id: rid };
    let transition = authority.apply(second_resolve).expect("resolved again");
    assert_eq!(transition.next_phase, TurnPhase::DrainingBoundary);
}
/// For each of three fixtures (calling-LLM, waiting-for-ops,
/// draining-boundary), `TimeBudgetExceeded` ends in `Completed` with the
/// `TimeBudgetExceeded` outcome, a non-zero boundary count, and both
/// `BoundaryApplied` and `RunCompleted` effects.
#[test]
fn time_budget_exceeded_from_various_phases() {
    let fixtures: [fn() -> TurnExecutionAuthority; 3] = [
        authority_at_calling_llm,
        authority_at_waiting_for_ops,
        authority_at_draining_boundary,
    ];
    for build_fixture in fixtures {
        let mut authority = build_fixture();
        let out_of_time = TurnExecutionInput::TimeBudgetExceeded {
            run_id: test_run_id(),
        };
        let transition = authority
            .apply(out_of_time)
            .expect("time budget exceeded should work");

        assert_eq!(transition.next_phase, TurnPhase::Completed);
        assert_eq!(
            authority.terminal_outcome(),
            TurnTerminalOutcome::TimeBudgetExceeded
        );
        assert!(authority.boundary_count() > 0);

        let emitted_boundary_applied = transition
            .effects
            .iter()
            .any(|effect| matches!(effect, TurnExecutionEffect::BoundaryApplied { .. }));
        assert!(emitted_boundary_applied);

        let emitted_run_completed = transition
            .effects
            .iter()
            .any(|effect| matches!(effect, TurnExecutionEffect::RunCompleted { .. }));
        assert!(emitted_run_completed);
    }
}
/// `classify_terminal` maps `TimeBudgetExceeded` to `HardFailure`.
#[test]
fn time_budget_exceeded_classified_as_hard_failure() {
    use crate::generated::terminal_surface_mapping::{SurfaceResultClass, classify_terminal};

    let classification = classify_terminal(&TurnTerminalOutcome::TimeBudgetExceeded);
    assert_eq!(classification, Some(SurfaceResultClass::HardFailure));
}
/// `classify_terminal` maps `BudgetExhausted` to `Success`.
#[test]
fn budget_exhausted_still_classified_as_success() {
    use crate::generated::terminal_surface_mapping::{SurfaceResultClass, classify_terminal};

    let classification = classify_terminal(&TurnTerminalOutcome::BudgetExhausted);
    assert_eq!(classification, Some(SurfaceResultClass::Success));
}
/// `classify_terminal` maps `Failed` to `HardFailure`.
#[test]
fn fatal_failure_classified_as_hard_failure() {
    use crate::generated::terminal_surface_mapping::{SurfaceResultClass, classify_terminal};

    let classification = classify_terminal(&TurnTerminalOutcome::Failed);
    assert_eq!(classification, Some(SurfaceResultClass::HardFailure));
}
/// `TimeBudgetExceeded` while in `ErrorRecovery` ends in `Completed` with the
/// `TimeBudgetExceeded` terminal outcome.
#[test]
fn time_budget_exceeded_from_error_recovery() {
    let mut authority = authority_at_calling_llm();

    let recoverable = TurnExecutionInput::RecoverableFailure {
        run_id: test_run_id(),
    };
    authority.apply(recoverable).expect("recoverable");
    assert_eq!(authority.phase(), TurnPhase::ErrorRecovery);

    let out_of_time = TurnExecutionInput::TimeBudgetExceeded {
        run_id: test_run_id(),
    };
    let transition = authority
        .apply(out_of_time)
        .expect("time budget exceeded from error recovery");

    assert_eq!(transition.next_phase, TurnPhase::Completed);
    assert_eq!(
        authority.terminal_outcome(),
        TurnTerminalOutcome::TimeBudgetExceeded
    );
}
}