1use futures::stream::StreamExt;
25use std::time::{SystemTime, UNIX_EPOCH};
26use tokio_util::sync::CancellationToken;
27
28use crate::config::LoopConfig;
29use crate::error::{LoopError, StreamError};
30use crate::event::AgentEvent;
31use crate::exec::{execute_tool_batch, ExecutedBatch};
32use crate::plugin::TransformContext;
33use crate::stream::{ReasoningEffort, StreamErrorKind, StreamEvent, StreamRequest, ToolSchema};
34use crate::types::{
35 AgentContext, AgentMessage, AssistantContent, StopReason, ToolResultContent, Usage,
36};
37
/// Upper bound on attempts (first try included) when the provider stream
/// fails with `StreamError::Empty`; the retry guard is `attempts + 1 < MAX`.
const EMPTY_STREAM_MAX_ATTEMPTS: u8 = 3;
/// Base delay before re-streaming after an empty stream. The retry loop
/// multiplies it by the number of empty attempts seen so far.
const EMPTY_STREAM_RETRY_INITIAL_DELAY: std::time::Duration = std::time::Duration::from_millis(250);
/// Upper bound on attempts (first try included) when the provider stream
/// fails with `StreamError::ZeroOutputTransport`.
const ZERO_OUTPUT_TRANSPORT_MAX_ATTEMPTS: u8 = 2;
/// Base delay before re-streaming after a zero-output transport failure,
/// multiplied by the number of such attempts seen so far.
const ZERO_OUTPUT_TRANSPORT_RETRY_INITIAL_DELAY: std::time::Duration =
    std::time::Duration::from_millis(500);
/// System message appended to a cloned context for the retry that follows a
/// zero-output transport failure. This text is sent to the model verbatim.
const ZERO_OUTPUT_TRANSPORT_RECOVERY_CONTEXT: &str = "\
[runtime context — transport recovery, not user instruction]\n\
The previous provider attempt produced no actionable output: no visible assistant text and no usable tool call reached the runtime. \
It may have produced private-only reasoning or an unusable burst of partial tool calls. \
Do not continue with private reasoning only. Re-read the latest observation and immediately choose exactly one next structured tool call; \
if the answer is ready, use the final response tool.";

/// The eager plain-text nudge is only used while the count of consecutive
/// no-tool turns is at most this value.
const MAX_PLAIN_TEXT_NUDGE_RETRIES: usize = 2;
55
/// How an agent loop run ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopOutcome {
    /// The loop finished without hitting the iteration cap and without the
    /// grace signal being set.
    Done,
    /// The loop finished below the iteration cap while the grace signal was set.
    WrappedUp,
    /// The loop stopped because the configured iteration cap was reached.
    HitMaxIterations,
}

impl LoopOutcome {
    /// Returns `true` for every outcome except [`LoopOutcome::HitMaxIterations`].
    pub fn is_complete(self) -> bool {
        match self {
            Self::Done | Self::WrappedUp => true,
            Self::HitMaxIterations => false,
        }
    }

    /// Stable snake_case identifier for this outcome.
    pub fn label(self) -> &'static str {
        match self {
            Self::HitMaxIterations => "hit_max_iterations",
            Self::WrappedUp => "wrapped_up",
            Self::Done => "done",
        }
    }
}
93
/// Value returned by [`run`] and [`run_continue`] when the loop ends without
/// an error.
#[derive(Debug, Clone)]
pub struct RunResult {
    /// Messages added during this run. For [`run`] the list starts with the
    /// caller's prompts; for [`run_continue`] it starts empty. Everything the
    /// loop appends afterwards follows in order.
    pub messages: Vec<AgentMessage>,
    /// How the loop ended.
    pub outcome: LoopOutcome,
}
105
106pub async fn run(
115 prompts: Vec<AgentMessage>,
116 context: AgentContext,
117 config: &LoopConfig,
118 signal: CancellationToken,
119) -> Result<RunResult, LoopError> {
120 let mut current = context;
121 let mut new_messages = prompts.clone();
122
123 current.messages.extend(prompts.iter().cloned());
124
125 emit(config, AgentEvent::AgentStart).await;
126 if let Some(identity) = current.identity.clone() {
127 emit(config, AgentEvent::RunIdentified { identity }).await;
128 }
129 emit(config, AgentEvent::TurnStart).await;
130 for prompt in &prompts {
131 emit(
132 config,
133 AgentEvent::MessageStart {
134 message: prompt.clone(),
135 },
136 )
137 .await;
138 emit(
139 config,
140 AgentEvent::MessageEnd {
141 message: prompt.clone(),
142 },
143 )
144 .await;
145 }
146
147 let outcome = inner_run(&mut current, &mut new_messages, config, &signal).await?;
148
149 Ok(RunResult {
150 messages: new_messages,
151 outcome,
152 })
153}
154
155pub async fn run_continue(
162 context: AgentContext,
163 config: &LoopConfig,
164 signal: CancellationToken,
165) -> Result<RunResult, LoopError> {
166 let last = context
167 .messages
168 .last()
169 .ok_or_else(|| LoopError::InvalidContinuation("no messages in context".into()))?;
170 if matches!(last, AgentMessage::Assistant { .. }) {
171 return Err(LoopError::InvalidContinuation(
172 "trailing message is assistant".into(),
173 ));
174 }
175
176 let mut current = context;
177 let mut new_messages = Vec::new();
178
179 emit(config, AgentEvent::AgentStart).await;
180 if let Some(identity) = current.identity.clone() {
181 emit(config, AgentEvent::RunIdentified { identity }).await;
182 }
183 emit(config, AgentEvent::TurnStart).await;
184
185 let outcome = inner_run(&mut current, &mut new_messages, config, &signal).await?;
186
187 Ok(RunResult {
188 messages: new_messages,
189 outcome,
190 })
191}
192
193async fn emit(config: &LoopConfig, event: AgentEvent) {
196 config.event_sink.emit(event.clone()).await;
197 for observer in &config.plugins.event_observer {
198 observer.on_event(&event).await;
199 }
200}
201
202async fn inner_run(
203 current: &mut AgentContext,
204 new_messages: &mut Vec<AgentMessage>,
205 config: &LoopConfig,
206 signal: &CancellationToken,
207) -> Result<LoopOutcome, LoopError> {
208 let mut first_turn = true;
209 let mut iterations: usize = 0;
210 let mut empty_outcomes_seen: usize = 0;
211 let mut last_turn_stopped_without_tool = false;
212 let mut plain_text_terminal_fallback_candidate: Option<AgentMessage> = None;
213
214 let mut pending = collect_steering(config).await;
217
218 'outer: loop {
219 let mut has_more_tool_calls = true;
220 let mut last_batch_terminated = false;
236
237 while has_more_tool_calls || !pending.is_empty() {
238 if signal.is_cancelled() {
239 return Err(LoopError::Aborted);
240 }
241 if let Some(max) = config.max_iterations {
242 if iterations >= max {
243 break;
249 }
250 }
251 iterations += 1;
252
253 if !first_turn {
254 emit(config, AgentEvent::TurnStart).await;
255 } else {
256 first_turn = false;
257 }
258
259 if !pending.is_empty() {
261 for msg in pending.drain(..) {
262 emit(
263 config,
264 AgentEvent::MessageStart {
265 message: msg.clone(),
266 },
267 )
268 .await;
269 emit(
270 config,
271 AgentEvent::MessageEnd {
272 message: msg.clone(),
273 },
274 )
275 .await;
276 current.messages.push(msg.clone());
277 new_messages.push(msg);
278 }
279 }
280
281 let (assistant, turn_allowlist) =
288 stream_with_max_tokens_recovery(current, config, signal, iterations - 1).await?;
289 current.messages.push(assistant.clone());
294 new_messages.push(assistant.clone());
295
296 let stop_reason = match &assistant {
303 AgentMessage::Assistant { stop_reason, .. } => *stop_reason,
304 _ => StopReason::Other,
305 };
306 if matches!(stop_reason, StopReason::Error | StopReason::Aborted) {
307 let loop_error = match &assistant {
308 AgentMessage::Assistant {
309 stop_reason: StopReason::Aborted,
310 ..
311 } => LoopError::Aborted,
312 AgentMessage::Assistant { error_message, .. } => LoopError::Stream(
313 StreamError::Transient(error_message.clone().unwrap_or_else(|| {
314 "assistant stream ended with error stop reason".into()
315 })),
316 ),
317 _ => LoopError::Stream(StreamError::Transient(
318 "assistant stream ended with error stop reason".into(),
319 )),
320 };
321 emit(
322 config,
323 AgentEvent::TurnEnd {
324 message: assistant,
325 tool_results: Vec::new(),
326 },
327 )
328 .await;
329 emit(
330 config,
331 AgentEvent::AgentEnd {
332 messages: new_messages.clone(),
333 },
334 )
335 .await;
336 return Err(loop_error);
337 }
338
339 let tool_calls: Vec<_> = match &assistant {
341 AgentMessage::Assistant { content, .. } => {
342 content.tool_calls().into_iter().cloned().collect()
343 }
344 _ => Vec::new(),
345 };
346 last_turn_stopped_without_tool = tool_calls.is_empty();
347 if last_turn_stopped_without_tool {
348 empty_outcomes_seen = empty_outcomes_seen.saturating_add(1);
349 }
350
351 let mut tool_result_messages = Vec::new();
352 has_more_tool_calls = false;
353
354 if tool_calls.is_empty() {
355 if let Some(tool_name) = config.plain_text_terminal_fallback_tool.as_deref() {
356 let eager = config.plain_text_terminal_fallback_eager;
357 let terminal_tool_names = config.protocol.terminal_tool_names();
358 let narrowed_to_terminators = is_terminal_only_allowlist(
359 turn_allowlist.as_ref(),
360 tool_name,
361 &terminal_tool_names,
362 );
363 let preserve_plain_text_candidate = plain_assistant_text(&assistant)
364 .is_some_and(|text| should_preserve_plain_text_terminal_candidate(&text));
365 if plain_text_terminal_fallback_candidate.is_none()
366 && preserve_plain_text_candidate
367 {
368 plain_text_terminal_fallback_candidate = Some(assistant.clone());
369 }
370 let nudge_mode = config.plain_text_terminal_fallback_eager_nudge
371 && eager
372 && !narrowed_to_terminators
373 && empty_outcomes_seen <= MAX_PLAIN_TEXT_NUDGE_RETRIES;
374 if nudge_mode {
375 let available_tool_names: Vec<&str> =
396 config.tools.iter().map(|t| t.name()).collect();
397 let nudge_text = config
398 .protocol
399 .plain_text_recovery_prompt(crate::protocol::PlainTextRecoveryContext {
400 messages: ¤t.messages,
401 iteration: iterations - 1,
402 available_tool_names: &available_tool_names,
403 terminal_fallback_tool: Some(tool_name),
404 })
405 .unwrap_or_else(|| {
406 crate::protocol::DEFAULT_PLAIN_TEXT_RECOVERY_PROMPT.to_string()
407 });
408 let nudge = AgentMessage::System {
409 content: nudge_text,
410 timestamp: Some(now_ms()),
411 };
412 current.messages.push(nudge.clone());
413 new_messages.push(nudge);
414 has_more_tool_calls = true;
415 } else if let Some(result_msg) = synthesize_plain_text_terminal_result(
416 plain_text_terminal_fallback_candidate
417 .as_ref()
418 .unwrap_or(&assistant),
419 turn_allowlist.as_ref(),
420 tool_name,
421 eager,
422 &terminal_tool_names,
423 ) {
424 plain_text_terminal_fallback_candidate = None;
425 last_turn_stopped_without_tool = false;
426 empty_outcomes_seen = 0;
427 last_batch_terminated = true;
428 current.messages.push(result_msg.clone());
429 new_messages.push(result_msg.clone());
430 tool_result_messages.push(result_msg);
431 }
432 }
433 } else {
434 let ExecutedBatch {
435 messages,
436 terminate,
437 } = execute_tool_batch(
438 &assistant,
439 tool_calls,
440 current,
441 config,
442 signal,
443 turn_allowlist.as_ref(),
444 )
445 .await?;
446
447 empty_outcomes_seen = 0;
450 plain_text_terminal_fallback_candidate = None;
451 tool_result_messages = messages;
452 has_more_tool_calls = !terminate;
453 last_batch_terminated = terminate;
454
455 for result_msg in &tool_result_messages {
456 current.messages.push(result_msg.clone());
457 new_messages.push(result_msg.clone());
458 }
459 }
460
461 emit(
462 config,
463 AgentEvent::TurnEnd {
464 message: assistant,
465 tool_results: tool_result_messages,
466 },
467 )
468 .await;
469
470 pending = if last_batch_terminated {
476 Vec::new()
477 } else {
478 collect_steering(config).await
479 };
480 }
481
482 let cap_hit = config.max_iterations.is_some_and(|max| iterations >= max);
490 let follow_up = if last_batch_terminated {
496 Vec::new()
497 } else {
498 collect_follow_up(config).await
499 };
500 if last_turn_stopped_without_tool {
501 if let Some(budget) = config.empty_outcome_retry_budget {
502 if empty_outcomes_seen > budget {
503 emit(
504 config,
505 AgentEvent::AgentEnd {
506 messages: new_messages.clone(),
507 },
508 )
509 .await;
510 return Err(LoopError::EmptyOutcomeBudgetExhausted {
511 budget,
512 observed: empty_outcomes_seen,
513 });
514 }
515 }
516 }
517 if !follow_up.is_empty() && !cap_hit {
518 pending = follow_up;
519 continue 'outer;
520 }
521 if cap_hit {
526 for msg in follow_up {
527 emit(
528 config,
529 AgentEvent::MessageStart {
530 message: msg.clone(),
531 },
532 )
533 .await;
534 emit(
535 config,
536 AgentEvent::MessageEnd {
537 message: msg.clone(),
538 },
539 )
540 .await;
541 current.messages.push(msg.clone());
542 new_messages.push(msg);
543 }
544 }
545
546 break;
547 }
548
549 emit(
550 config,
551 AgentEvent::AgentEnd {
552 messages: new_messages.clone(),
553 },
554 )
555 .await;
556
557 let cap_hit_final = config.max_iterations.is_some_and(|max| iterations >= max);
566 let wrap_up_fired = config
567 .grace_signal
568 .as_ref()
569 .is_some_and(|flag| flag.load(std::sync::atomic::Ordering::Relaxed));
570 let outcome = if cap_hit_final {
571 LoopOutcome::HitMaxIterations
572 } else if wrap_up_fired {
573 LoopOutcome::WrappedUp
574 } else {
575 LoopOutcome::Done
576 };
577 Ok(outcome)
578}
579
580async fn collect_steering(config: &LoopConfig) -> Vec<AgentMessage> {
581 let mut out = Vec::new();
582 for source in &config.plugins.steering {
583 out.extend(source.next_steering_messages().await);
584 }
585 out
586}
587
588async fn collect_follow_up(config: &LoopConfig) -> Vec<AgentMessage> {
589 let mut out = Vec::new();
590 for source in &config.plugins.follow_up {
591 out.extend(source.next_follow_up_messages().await);
592 }
593 out
594}
595
596fn synthesize_plain_text_terminal_result(
597 assistant: &AgentMessage,
598 turn_allowlist: Option<&std::collections::HashSet<String>>,
599 tool_name: &str,
600 eager: bool,
601 terminal_tool_names: &std::collections::HashSet<String>,
602) -> Option<AgentMessage> {
603 if !eager && !is_terminal_only_allowlist(turn_allowlist, tool_name, terminal_tool_names) {
610 return None;
611 }
612 let text = plain_assistant_text(assistant)?;
613 Some(AgentMessage::ToolResult {
614 tool_call_id: format!("plain_text_terminal_fallback_{}", now_ms()),
615 tool_name: tool_name.to_string(),
616 content: ToolResultContent::text(text),
617 is_error: false,
618 narration: Some(
619 "Converted plain assistant text into terminal delivery for an auto-tool-choice provider."
620 .to_string(),
621 ),
622 details: None,
623 timestamp: Some(now_ms()),
624 })
625}
626
627fn plain_assistant_text(assistant: &AgentMessage) -> Option<String> {
628 let AgentMessage::Assistant { content, .. } = assistant else {
629 return None;
630 };
631 let text = crate::strip_thinking_tags(&content.plain_text())
632 .trim()
633 .to_string();
634 (!text.is_empty()).then_some(text)
635}
636
637fn should_preserve_plain_text_terminal_candidate(text: &str) -> bool {
638 !looks_like_permission_or_clarification_question(text)
639}
640
/// Heuristically detects text that asks the user for permission or for what
/// to do next.
///
/// The trimmed text must contain a `?`. It then matches when, compared
/// ASCII-case-insensitively, it either starts with one of a fixed set of
/// openers, or is at most 500 bytes long and contains "what" together with
/// "next" or "continue".
fn looks_like_permission_or_clarification_question(text: &str) -> bool {
    const QUESTION_OPENERS: [&str; 9] = [
        "would you like",
        "shall i",
        "should i",
        "do you want",
        "what would you like",
        "what do you need",
        "what's your next move",
        "what is your next move",
        "continue what",
    ];

    let body = text.trim();
    if !body.contains('?') {
        return false;
    }

    let folded = body.to_ascii_lowercase();
    if QUESTION_OPENERS
        .iter()
        .any(|opener| folded.starts_with(*opener))
    {
        return true;
    }

    let is_short = body.len() <= 500;
    let mentions_what = folded.contains("what");
    let mentions_direction = folded.contains("next") || folded.contains("continue");
    is_short && mentions_what && mentions_direction
}
665
/// Returns `true` when an allowlist is present, contains `terminal_tool`,
/// and every entry is either `terminal_tool` or a member of
/// `terminal_tool_names`.
fn is_terminal_only_allowlist(
    turn_allowlist: Option<&std::collections::HashSet<String>>,
    terminal_tool: &str,
    terminal_tool_names: &std::collections::HashSet<String>,
) -> bool {
    turn_allowlist.is_some_and(|allowed| {
        // Containing the terminal tool already implies a non-empty allowlist.
        allowed.contains(terminal_tool)
            && allowed
                .iter()
                .all(|name| name.as_str() == terminal_tool || terminal_tool_names.contains(name))
    })
}
688
689async fn stream_with_max_tokens_recovery(
705 context: &AgentContext,
706 config: &LoopConfig,
707 signal: &CancellationToken,
708 iteration: usize,
709) -> Result<(AgentMessage, Option<std::collections::HashSet<String>>), LoopError> {
710 let mut current_cap = config.max_output_tokens;
711 let mut max_tokens_attempt: u8 = 0;
712 let mut empty_stream_attempts: u8 = 0;
713 let mut zero_output_transport_attempts: u8 = 0;
714 let mut zero_output_recovery_context: Option<AgentContext> = None;
715 let mut reasoning = config.reasoning;
716
717 loop {
718 let attempt_context = zero_output_recovery_context.as_ref().unwrap_or(context);
719 let (assistant, allowlist) = match stream_assistant_response(
720 attempt_context,
721 config,
722 signal,
723 iteration,
724 current_cap,
725 reasoning,
726 )
727 .await
728 {
729 Ok(pair) => pair,
730 Err(LoopError::Stream(StreamError::Empty))
731 if empty_stream_attempts + 1 < EMPTY_STREAM_MAX_ATTEMPTS =>
732 {
733 empty_stream_attempts = empty_stream_attempts.saturating_add(1);
734 let delay = EMPTY_STREAM_RETRY_INITIAL_DELAY * u32::from(empty_stream_attempts);
735 tokio::select! {
736 _ = signal.cancelled() => return Err(LoopError::Aborted),
737 _ = tokio::time::sleep(delay) => {}
738 }
739 continue;
740 }
741 Err(LoopError::Stream(StreamError::ZeroOutputTransport(_)))
742 if zero_output_transport_attempts + 1 < ZERO_OUTPUT_TRANSPORT_MAX_ATTEMPTS =>
743 {
744 zero_output_transport_attempts = zero_output_transport_attempts.saturating_add(1);
745 zero_output_recovery_context =
746 Some(context_with_zero_output_transport_recovery(context));
747 reasoning = zero_output_transport_retry_reasoning(config.reasoning);
748 let delay = ZERO_OUTPUT_TRANSPORT_RETRY_INITIAL_DELAY
749 * u32::from(zero_output_transport_attempts);
750 tokio::select! {
751 _ = signal.cancelled() => return Err(LoopError::Aborted),
752 _ = tokio::time::sleep(delay) => {}
753 }
754 continue;
755 }
756 Err(err) => return Err(err),
757 };
758
759 let stop_reason = match &assistant {
760 AgentMessage::Assistant { stop_reason, .. } => *stop_reason,
761 _ => StopReason::Other,
762 };
763 if stop_reason != StopReason::MaxTokens {
764 return Ok((assistant, allowlist));
765 }
766 let Some(recovery) = config.max_output_tokens_recovery.as_ref() else {
767 return Ok((assistant, allowlist));
768 };
769 if max_tokens_attempt >= recovery.max_attempts {
770 return Ok((assistant, allowlist));
771 }
772 let Some(prev_cap) = current_cap else {
777 return Ok((assistant, allowlist));
778 };
779 let Some(new_cap) = recovery.next_cap(prev_cap, max_tokens_attempt) else {
780 return Ok((assistant, allowlist));
781 };
782
783 max_tokens_attempt = max_tokens_attempt.saturating_add(1);
784 emit(
785 config,
786 AgentEvent::OutputTokensEscalation {
787 attempt: max_tokens_attempt,
788 prev_cap,
789 new_cap,
790 },
791 )
792 .await;
793 current_cap = Some(new_cap);
794 }
800}
801
/// Performs a single provider streaming attempt and returns the final
/// assistant message together with the tool allowlist used for the turn.
///
/// Steps, in order: run the applicable context-transform plugins over a copy
/// of `context.messages`, collect the tool allowlist from the tool gates,
/// emit `ProviderRequestPrepared`, open the stream, and forward stream events
/// as `MessageStart` / `MessageUpdate` / `MessageEnd`.
///
/// # Errors
///
/// * The error mapped by `loop_error_from_stream_kind` when the stream
///   yields `StreamEvent::Error`.
/// * `LoopError::Stream(StreamError::Empty)` when the stream ends without a
///   `Done` or `Error` event.
async fn stream_assistant_response(
    context: &AgentContext,
    config: &LoopConfig,
    signal: &CancellationToken,
    iteration: usize,
    max_output_tokens: Option<u32>,
    reasoning: ReasoningEffort,
) -> Result<(AgentMessage, Option<std::collections::HashSet<String>>), LoopError> {
    // Usage of the most recent assistant message that recorded one.
    let last_provider_usage = last_provider_usage(&context.messages);
    let cx = TransformContext {
        signal,
        model_id: config.model_id.as_deref().unwrap_or(""),
        iteration,
        last_provider_usage: last_provider_usage.as_ref(),
        estimator: &*config.token_estimator,
    };
    // Transforms work on a copy; `context.messages` itself is not modified.
    let mut messages = context.messages.clone();
    for transform in &config.plugins.context_transform {
        if !transform.should_run(&messages, &cx) {
            continue;
        }
        // Keep the pre-transform snapshot so the event can report both sides.
        let before = messages.clone();
        messages = transform.transform(messages, &cx).await;
        emit(
            config,
            AgentEvent::ContextTransformApplied {
                iteration,
                plugin: transform.name(),
                before,
                after: messages.clone(),
            },
        )
        .await;
    }

    // Tool gates see the transformed messages.
    let allowlist = collect_tool_allowlist_with_events(config, iteration, &messages).await;

    let tools = build_tool_schemas(config, allowlist.as_ref());
    emit(
        config,
        AgentEvent::ProviderRequestPrepared {
            iteration,
            model_id: config.model_id.clone(),
            system_prompt: context.system_prompt.clone(),
            messages: messages.clone(),
            tools: tools.clone(),
            temperature: config.temperature,
            max_output_tokens,
        },
    )
    .await;
    let request = StreamRequest {
        system_prompt: context.system_prompt.clone(),
        messages,
        tools,
        temperature: config.temperature,
        max_output_tokens,
        reasoning,
        provider_extras: config
            .provider_extras
            .clone()
            .unwrap_or(serde_json::Value::Null),
        force_tool_call: true,
    };

    let mut stream = config.stream.stream(request, signal.clone()).await;

    // NOTE(review): `last_partial` is only assigned on `Start`, so every
    // `MessageUpdate` carries that initial partial — confirm consumers rely
    // on `chunk` for incremental content.
    let mut last_partial: Option<AgentMessage> = None;

    while let Some(event) = stream.next().await {
        match event {
            StreamEvent::Start { partial } => {
                emit(
                    config,
                    AgentEvent::MessageStart {
                        message: partial.clone(),
                    },
                )
                .await;
                last_partial = Some(partial);
            }
            StreamEvent::Chunk(chunk) => {
                // Chunks that arrive before any `Start` are not forwarded.
                if let Some(ref partial) = last_partial {
                    emit(
                        config,
                        AgentEvent::MessageUpdate {
                            partial: partial.clone(),
                            chunk,
                        },
                    )
                    .await;
                }
            }
            StreamEvent::Done { message } => {
                emit(
                    config,
                    AgentEvent::MessageEnd {
                        message: message.clone(),
                    },
                )
                .await;
                return Ok((message, allowlist));
            }
            StreamEvent::Error {
                partial,
                kind,
                message,
            } => {
                let stop_reason = match kind {
                    StreamErrorKind::Aborted => StopReason::Aborted,
                    _ => StopReason::Error,
                };
                // Observers get an assistant message that keeps any partial
                // content; the caller only receives the error.
                let error_message = AgentMessage::Assistant {
                    content: match &partial {
                        AgentMessage::Assistant { content, .. } => content.clone(),
                        _ => AssistantContent { blocks: Vec::new() },
                    },
                    stop_reason,
                    error_message: Some(message.clone()),
                    timestamp: Some(now_ms()),
                    usage: None,
                };
                emit(
                    config,
                    AgentEvent::MessageEnd {
                        message: error_message.clone(),
                    },
                )
                .await;
                return Err(loop_error_from_stream_kind(kind, message));
            }
        }
    }

    // The stream ended without `Done` or `Error`: close the message for
    // observers and report an empty stream.
    let empty = AgentMessage::Assistant {
        content: AssistantContent { blocks: Vec::new() },
        stop_reason: StopReason::Error,
        error_message: Some("stream ended without terminal event".into()),
        timestamp: Some(now_ms()),
        usage: None,
    };
    emit(
        config,
        AgentEvent::MessageEnd {
            message: empty.clone(),
        },
    )
    .await;
    Err(LoopError::Stream(StreamError::Empty))
}
986
987fn context_with_zero_output_transport_recovery(context: &AgentContext) -> AgentContext {
988 let mut recovered = context.clone();
989 recovered.messages.push(AgentMessage::System {
990 content: ZERO_OUTPUT_TRANSPORT_RECOVERY_CONTEXT.to_string(),
991 timestamp: Some(now_ms()),
992 });
993 recovered
994}
995
996fn zero_output_transport_retry_reasoning(reasoning: ReasoningEffort) -> ReasoningEffort {
997 match reasoning {
998 ReasoningEffort::Medium | ReasoningEffort::High | ReasoningEffort::XHigh => {
999 ReasoningEffort::Minimal
1000 }
1001 ReasoningEffort::None | ReasoningEffort::Minimal | ReasoningEffort::Low => reasoning,
1002 }
1003}
1004
1005fn loop_error_from_stream_kind(kind: StreamErrorKind, message: String) -> LoopError {
1006 match kind {
1011 StreamErrorKind::Transient => LoopError::Stream(StreamError::Transient(message)),
1012 StreamErrorKind::ProviderRateLimited => {
1013 LoopError::Stream(StreamError::ProviderRateLimited(message))
1014 }
1015 StreamErrorKind::ZeroOutputTransport => {
1016 LoopError::Stream(StreamError::ZeroOutputTransport(message))
1017 }
1018 StreamErrorKind::Fatal => LoopError::Stream(StreamError::Fatal(message)),
1019 StreamErrorKind::Empty => LoopError::Stream(StreamError::Empty),
1020 StreamErrorKind::Aborted => LoopError::Aborted,
1021 StreamErrorKind::ContextOverflow => {
1022 LoopError::Stream(StreamError::ContextOverflow(message))
1023 }
1024 }
1025}
1026
/// Milliseconds elapsed since the Unix epoch according to the system clock,
/// or `0` when the clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1033
1034fn last_provider_usage(messages: &[AgentMessage]) -> Option<Usage> {
1038 messages.iter().rev().find_map(|message| match message {
1039 AgentMessage::Assistant {
1040 usage: Some(usage), ..
1041 } => Some(usage.clone()),
1042 _ => None,
1043 })
1044}
1045
1046fn build_tool_schemas(
1047 config: &LoopConfig,
1048 allowlist: Option<&std::collections::HashSet<String>>,
1049) -> Vec<ToolSchema> {
1050 config
1051 .tools
1052 .iter()
1053 .filter(|tool| match allowlist {
1054 Some(set) => set.contains(tool.name()),
1055 None => true,
1056 })
1057 .map(|tool| ToolSchema {
1058 name: tool.name().to_string(),
1059 description: tool.description().to_string(),
1060 parameters: tool.parameters_schema(),
1061 })
1062 .collect()
1063}
1064
/// Asks every tool-gate plugin for its allowlist for the next turn and
/// combines the answers into one set.
///
/// Returns `None` when no gate is configured or no gate returned a decision.
/// Otherwise the active decisions are intersected. An empty intersection is
/// repaired by `resolve_empty_tool_gate_intersection`, and a
/// `ToolGateConflictResolved` event is emitted. A `ToolGateApplied` event is
/// emitted for every gate consulted.
async fn collect_tool_allowlist_with_events(
    config: &LoopConfig,
    iteration: usize,
    messages: &[AgentMessage],
) -> Option<std::collections::HashSet<String>> {
    if config.plugins.tool_gate.is_empty() {
        return None;
    }
    let conversation_id = config.conversation_id.as_deref();
    let available_tool_names: Vec<&str> = config.tools.iter().map(|t| t.name()).collect();
    let mut decisions: Vec<GateAllowDecision> = Vec::new();
    for gate in &config.plugins.tool_gate {
        let ctx = crate::plugin::ToolGateContext {
            iteration,
            messages,
            conversation_id,
            available_tool_names: &available_tool_names,
        };
        let decision = gate.next_turn_tool_allowlist(ctx).await;
        // Report each gate's answer (sorted for stable output), including
        // gates that abstained with `None`.
        emit(
            config,
            AgentEvent::ToolGateApplied {
                iteration,
                plugin: gate.name(),
                allow: decision.as_ref().map(|set| {
                    let mut sorted: Vec<String> = set.iter().cloned().collect();
                    sorted.sort();
                    sorted
                }),
            },
        )
        .await;
        // Only gates that returned an allowlist take part in the combination.
        if let Some(set) = decision {
            let suppresses_advisory =
                gate.suppresses_advisory_gates(crate::plugin::ToolGateContext {
                    iteration,
                    messages,
                    conversation_id,
                    available_tool_names: &available_tool_names,
                });
            decisions.push(GateAllowDecision {
                plugin: gate.name(),
                priority: gate.conflict_priority(),
                class: gate.tool_gate_class(),
                suppresses_advisory,
                allow: set,
            });
        }
    }
    // Highest priority among the decisions that asked to suppress advisory gates.
    let suppression_priority = decisions
        .iter()
        .filter(|decision| decision.suppresses_advisory)
        .map(|decision| decision.priority)
        .max();
    // Drop advisory decisions whose priority is strictly below that value.
    let active_decisions = decisions
        .iter()
        .filter(|decision| {
            !matches!(
                suppression_priority,
                Some(priority)
                    if decision.class == crate::plugin::ToolGateClass::Advisory
                        && decision.priority < priority
            )
        })
        .collect::<Vec<_>>();
    // Intersect the remaining allowlists; stays `None` if there are none.
    let mut combined: Option<std::collections::HashSet<String>> = None;
    for decision in &active_decisions {
        combined = Some(match combined {
            Some(prev) => prev.intersection(&decision.allow).cloned().collect(),
            None => decision.allow.clone(),
        });
    }
    if combined.as_ref().is_some_and(|allow| allow.is_empty()) {
        // The gates disagree completely (or all returned empty sets): choose
        // a single owner among the non-empty decisions.
        let non_empty_decisions = active_decisions
            .iter()
            .filter(|decision| !decision.allow.is_empty())
            .map(|decision| (decision.plugin, decision.priority, decision.allow.clone()))
            .collect::<Vec<_>>();
        let resolved = resolve_empty_tool_gate_intersection(&non_empty_decisions);
        let (chosen_plugin, allow, reason) = match resolved {
            Some((plugin, allow, reason)) => (Some(plugin.to_string()), allow, reason),
            None => (
                None,
                std::collections::HashSet::new(),
                "all gating plugins returned empty allowlists".to_string(),
            ),
        };
        let sorted_allow = sorted_tool_names(&allow);
        emit(
            config,
            AgentEvent::ToolGateConflictResolved {
                iteration,
                plugins: active_decisions
                    .iter()
                    .map(|decision| decision.plugin.to_string())
                    .collect(),
                chosen_plugin,
                allow: sorted_allow,
                reason,
            },
        )
        .await;
        // NOTE(review): when every gate returned an empty set this yields
        // `None`, which `build_tool_schemas` treats as "no filter" — confirm
        // that failing open here is intended.
        return if allow.is_empty() { None } else { Some(allow) };
    }
    combined
}
1182
/// One tool gate's allowlist decision, with the gate attributes used to
/// combine decisions and resolve conflicts.
struct GateAllowDecision {
    /// Name reported by the gate plugin.
    plugin: &'static str,
    /// The gate's `conflict_priority()`.
    priority: i32,
    /// The gate's `tool_gate_class()`.
    class: crate::plugin::ToolGateClass,
    /// The gate's `suppresses_advisory_gates(..)` answer for this turn.
    suppresses_advisory: bool,
    /// Tool names the gate allows for the next turn.
    allow: std::collections::HashSet<String>,
}
1190
/// Chooses the single decision that owns the allowlist when the gates'
/// intersection is empty.
///
/// The winner has the highest priority; ties go to the smaller allowlist,
/// then to the lexicographically smaller plugin name. Returns the winning
/// plugin, a clone of its allowlist, and a reason string, or `None` when
/// `decisions` is empty.
fn resolve_empty_tool_gate_intersection(
    decisions: &[(&'static str, i32, std::collections::HashSet<String>)],
) -> Option<(&'static str, std::collections::HashSet<String>, String)> {
    // `Reverse` makes a smaller size / smaller name rank higher in the max search.
    let (plugin, priority, allow) = decisions.iter().max_by_key(|(plugin, priority, allow)| {
        (
            *priority,
            std::cmp::Reverse(allow.len()),
            std::cmp::Reverse(*plugin),
        )
    })?;

    let reason = format!(
        "empty intersection repaired by highest-priority owner `{plugin}` (priority {priority})"
    );
    Some((*plugin, allow.clone(), reason))
}
1212
/// Returns the tool names in `set` as an ascending, lexicographically
/// sorted vector.
fn sorted_tool_names(set: &std::collections::HashSet<String>) -> Vec<String> {
    let mut names = Vec::with_capacity(set.len());
    names.extend(set.iter().cloned());
    names.sort();
    names
}
1218
1219#[cfg(test)]
1220mod tests {
1221 use super::*;
1222 use crate::config::AgentBuilder;
1223 use crate::plugin::{
1224 FollowUpSource, Plugin, PluginCapabilities, ToolGate, ToolGateClass, ToolGateContext,
1225 };
1226 use crate::stream::{ReasoningEffort, StreamFn};
1227 use crate::types::{AssistantBlock, UserContent};
1228 use futures::stream::{self, BoxStream};
1229 use std::sync::{
1230 atomic::{AtomicUsize, Ordering},
1231 Arc, Mutex,
1232 };
1233
1234 fn empty_assistant_message() -> AgentMessage {
1235 AgentMessage::Assistant {
1236 content: AssistantContent { blocks: Vec::new() },
1237 stop_reason: StopReason::Other,
1238 error_message: None,
1239 timestamp: None,
1240 usage: None,
1241 }
1242 }
1243
1244 fn text_assistant_message(text: impl Into<String>) -> AgentMessage {
1245 AgentMessage::Assistant {
1246 content: AssistantContent::text(text),
1247 stop_reason: StopReason::EndTurn,
1248 error_message: None,
1249 timestamp: None,
1250 usage: None,
1251 }
1252 }
1253
1254 fn tool_call_assistant_message(name: impl Into<String>, id: impl Into<String>) -> AgentMessage {
1255 AgentMessage::Assistant {
1256 content: AssistantContent::with_tool_calls(
1257 None,
1258 vec![crate::tool::ToolCall {
1259 id: id.into(),
1260 name: name.into(),
1261 arguments: serde_json::json!({}),
1262 }],
1263 ),
1264 stop_reason: StopReason::ToolUse,
1265 error_message: None,
1266 timestamp: None,
1267 usage: None,
1268 }
1269 }
1270
1271 #[derive(Default)]
1272 struct EmptyThenTextStream {
1273 calls: AtomicUsize,
1274 }
1275
1276 #[derive(Default)]
1277 struct ZeroOutputThenTextStream {
1278 calls: AtomicUsize,
1279 requests: Mutex<Vec<StreamRequest>>,
1280 }
1281
1282 impl ZeroOutputThenTextStream {
1283 fn requests(&self) -> Vec<StreamRequest> {
1284 self.requests.lock().unwrap().clone()
1285 }
1286 }
1287
1288 #[derive(Default)]
1289 struct RepeatedTextStream {
1290 calls: AtomicUsize,
1291 }
1292
1293 #[derive(Default)]
1294 struct EmptyStopsAroundProgressStream {
1295 calls: AtomicUsize,
1296 }
1297
1298 struct CountingFollowUp {
1299 remaining: AtomicUsize,
1300 }
1301
1302 struct TerminalOnlyGate;
1303 struct TerminalWithStatusGate;
1304
1305 struct TestTerminalPolicy;
1311 impl crate::protocol::ProtocolPolicy for TestTerminalPolicy {
1312 fn terminal_tool_names(&self) -> std::collections::HashSet<String> {
1313 [
1314 "message_info",
1315 "message_ask",
1316 "message_result",
1317 "terminator",
1318 ]
1319 .iter()
1320 .map(|s| s.to_string())
1321 .collect()
1322 }
1323 }
1324 struct StaticAllowGate {
1325 name: &'static str,
1326 tools: &'static [&'static str],
1327 priority: i32,
1328 class: ToolGateClass,
1329 suppresses_advisory: bool,
1330 }
1331
1332 impl Plugin for TerminalOnlyGate {
1333 fn name(&self) -> &'static str {
1334 "terminal_only_gate"
1335 }
1336
1337 fn capabilities(&self) -> PluginCapabilities {
1338 PluginCapabilities::tool_gate()
1339 }
1340 }
1341
1342 #[async_trait::async_trait]
1343 impl ToolGate for TerminalOnlyGate {
1344 async fn next_turn_tool_allowlist(
1345 &self,
1346 _ctx: ToolGateContext<'_>,
1347 ) -> Option<std::collections::HashSet<String>> {
1348 Some(["message_result".to_string()].into_iter().collect())
1349 }
1350 }
1351
1352 impl Plugin for TerminalWithStatusGate {
1353 fn name(&self) -> &'static str {
1354 "terminal_with_status_gate"
1355 }
1356
1357 fn capabilities(&self) -> PluginCapabilities {
1358 PluginCapabilities::tool_gate()
1359 }
1360 }
1361
1362 #[async_trait::async_trait]
1363 impl ToolGate for TerminalWithStatusGate {
1364 async fn next_turn_tool_allowlist(
1365 &self,
1366 _ctx: ToolGateContext<'_>,
1367 ) -> Option<std::collections::HashSet<String>> {
1368 Some(
1369 ["message_info".to_string(), "message_result".to_string()]
1370 .into_iter()
1371 .collect(),
1372 )
1373 }
1374 }
1375
1376 impl Plugin for StaticAllowGate {
1377 fn name(&self) -> &'static str {
1378 self.name
1379 }
1380
1381 fn capabilities(&self) -> PluginCapabilities {
1382 PluginCapabilities::tool_gate()
1383 }
1384 }
1385
1386 #[async_trait::async_trait]
1387 impl ToolGate for StaticAllowGate {
1388 fn conflict_priority(&self) -> i32 {
1389 self.priority
1390 }
1391
1392 fn tool_gate_class(&self) -> ToolGateClass {
1393 self.class
1394 }
1395
1396 fn suppresses_advisory_gates(&self, _ctx: ToolGateContext<'_>) -> bool {
1397 self.suppresses_advisory
1398 }
1399
1400 async fn next_turn_tool_allowlist(
1401 &self,
1402 _ctx: ToolGateContext<'_>,
1403 ) -> Option<std::collections::HashSet<String>> {
1404 Some(self.tools.iter().map(|name| (*name).to_string()).collect())
1405 }
1406 }
1407
1408 impl CountingFollowUp {
1409 fn new(remaining: usize) -> Self {
1410 Self {
1411 remaining: AtomicUsize::new(remaining),
1412 }
1413 }
1414 }
1415
1416 impl Plugin for CountingFollowUp {
1417 fn name(&self) -> &'static str {
1418 "counting_follow_up"
1419 }
1420
1421 fn capabilities(&self) -> PluginCapabilities {
1422 PluginCapabilities::follow_up()
1423 }
1424 }
1425
1426 #[async_trait::async_trait]
1427 impl FollowUpSource for CountingFollowUp {
1428 async fn next_follow_up_messages(&self) -> Vec<AgentMessage> {
1429 let used = self
1430 .remaining
1431 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| {
1432 remaining.checked_sub(1)
1433 })
1434 .unwrap_or(0);
1435 if used == 0 {
1436 return Vec::new();
1437 }
1438 vec![AgentMessage::System {
1439 content: "retry after no-tool stop".into(),
1440 timestamp: None,
1441 }]
1442 }
1443 }
1444
1445 #[async_trait::async_trait]
1446 impl StreamFn for EmptyThenTextStream {
1447 async fn stream(
1448 &self,
1449 _request: StreamRequest,
1450 _signal: CancellationToken,
1451 ) -> BoxStream<'static, StreamEvent> {
1452 let call = self.calls.fetch_add(1, Ordering::SeqCst);
1453 let partial = empty_assistant_message();
1454 if call == 0 {
1455 return Box::pin(stream::iter(vec![
1456 StreamEvent::Start {
1457 partial: partial.clone(),
1458 },
1459 StreamEvent::Error {
1460 partial,
1461 kind: StreamErrorKind::Empty,
1462 message: "empty provider response".to_string(),
1463 },
1464 ]));
1465 }
1466 Box::pin(stream::iter(vec![
1467 StreamEvent::Start { partial },
1468 StreamEvent::Done {
1469 message: text_assistant_message("recovered"),
1470 },
1471 ]))
1472 }
1473 }
1474
1475 #[async_trait::async_trait]
1476 impl StreamFn for RepeatedTextStream {
1477 async fn stream(
1478 &self,
1479 _request: StreamRequest,
1480 _signal: CancellationToken,
1481 ) -> BoxStream<'static, StreamEvent> {
1482 let call = self.calls.fetch_add(1, Ordering::SeqCst);
1483 let partial = empty_assistant_message();
1484 Box::pin(stream::iter(vec![
1485 StreamEvent::Start { partial },
1486 StreamEvent::Done {
1487 message: text_assistant_message(format!("plain stop {call}")),
1488 },
1489 ]))
1490 }
1491 }
1492
1493 #[async_trait::async_trait]
1494 impl StreamFn for EmptyStopsAroundProgressStream {
1495 async fn stream(
1496 &self,
1497 _request: StreamRequest,
1498 _signal: CancellationToken,
1499 ) -> BoxStream<'static, StreamEvent> {
1500 let call = self.calls.fetch_add(1, Ordering::SeqCst);
1501 let partial = empty_assistant_message();
1502 let message = match call {
1503 0 | 2 | 4 => text_assistant_message(format!("plain stop {call}")),
1504 1 | 3 => tool_call_assistant_message("progress", format!("tc-progress-{call}")),
1505 5 => tool_call_assistant_message("terminator", "tc-terminator"),
1506 other => panic!("unexpected stream call after terminal turn: {other}"),
1507 };
1508 Box::pin(stream::iter(vec![
1509 StreamEvent::Start { partial },
1510 StreamEvent::Done { message },
1511 ]))
1512 }
1513 }
1514
1515 #[async_trait::async_trait]
1516 impl StreamFn for ZeroOutputThenTextStream {
1517 async fn stream(
1518 &self,
1519 request: StreamRequest,
1520 _signal: CancellationToken,
1521 ) -> BoxStream<'static, StreamEvent> {
1522 self.requests.lock().unwrap().push(request);
1523 let call = self.calls.fetch_add(1, Ordering::SeqCst);
1524 let partial = empty_assistant_message();
1525 if call == 0 {
1526 return Box::pin(stream::iter(vec![
1527 StreamEvent::Start {
1528 partial: partial.clone(),
1529 },
1530 StreamEvent::Error {
1531 partial,
1532 kind: StreamErrorKind::ZeroOutputTransport,
1533 message: "response body decode failed before output".to_string(),
1534 },
1535 ]));
1536 }
1537 Box::pin(stream::iter(vec![
1538 StreamEvent::Start { partial },
1539 StreamEvent::Done {
1540 message: text_assistant_message("recovered from transport"),
1541 },
1542 ]))
1543 }
1544 }
1545
1546 #[test]
1547 fn wrapped_up_is_complete() {
1548 assert!(LoopOutcome::Done.is_complete());
1549 assert!(LoopOutcome::WrappedUp.is_complete());
1550 assert!(!LoopOutcome::HitMaxIterations.is_complete());
1551 }
1552
1553 #[tokio::test]
1554 async fn empty_stream_response_is_retried_before_returning() {
1555 let stream = Arc::new(EmptyThenTextStream::default());
1556 let config = AgentBuilder::new()
1557 .stream(stream.clone())
1558 .model_id("test-model")
1559 .build()
1560 .expect("config builds");
1561 let context = AgentContext::new("system").with_messages(vec![AgentMessage::User {
1562 content: UserContent::Text("continue".to_string()),
1563 timestamp: None,
1564 }]);
1565
1566 let (assistant, _allowlist) =
1567 stream_with_max_tokens_recovery(&context, &config, &CancellationToken::new(), 0)
1568 .await
1569 .expect("second stream attempt should recover");
1570
1571 let AgentMessage::Assistant { content, .. } = assistant else {
1572 panic!("expected assistant response");
1573 };
1574 assert_eq!(content.plain_text(), "recovered");
1575 assert_eq!(stream.calls.load(Ordering::SeqCst), 2);
1576 }
1577
1578 #[tokio::test]
1579 async fn zero_output_transport_error_is_retried_before_returning() {
1580 let stream = Arc::new(ZeroOutputThenTextStream::default());
1581 let config = AgentBuilder::new()
1582 .stream(stream.clone())
1583 .model_id("test-model")
1584 .reasoning(ReasoningEffort::High)
1585 .build()
1586 .expect("config builds");
1587 let context = AgentContext::new("system").with_messages(vec![AgentMessage::User {
1588 content: UserContent::Text("continue".to_string()),
1589 timestamp: None,
1590 }]);
1591
1592 let (assistant, _allowlist) =
1593 stream_with_max_tokens_recovery(&context, &config, &CancellationToken::new(), 0)
1594 .await
1595 .expect("second zero-output transport attempt should recover");
1596
1597 let AgentMessage::Assistant { content, .. } = assistant else {
1598 panic!("expected assistant response");
1599 };
1600 assert_eq!(content.plain_text(), "recovered from transport");
1601 assert_eq!(stream.calls.load(Ordering::SeqCst), 2);
1602
1603 let requests = stream.requests();
1604 assert_eq!(requests.len(), 2);
1605 assert_eq!(requests[0].reasoning, ReasoningEffort::High);
1606 assert_eq!(
1607 requests[1].reasoning,
1608 ReasoningEffort::Minimal,
1609 "zero-output replay should lower high reasoning so reasoning-heavy private-only spins can produce a tool call"
1610 );
1611 assert!(
1612 requests[1].messages.iter().any(|message| matches!(
1613 message,
1614 AgentMessage::System { content, .. }
1615 if content.contains("transport recovery")
1616 && content.contains("no visible assistant text")
1617 && content.contains("no usable tool call")
1618 && content.contains("unusable burst of partial tool calls")
1619 && content.contains("exactly one next structured tool call")
1620 && content.contains("next structured tool call")
1621 )),
1622 "zero-output replay must carry explicit recovery context"
1623 );
1624 }
1625
    /// Scripted provider that answers with a single `terminator` tool call and
    /// asserts it is never invoked a second time.
    struct TerminatorOnlyStream {
        /// Number of `stream` calls made so far.
        calls: AtomicUsize,
    }
1632
1633 impl Default for TerminatorOnlyStream {
1634 fn default() -> Self {
1635 Self {
1636 calls: AtomicUsize::new(0),
1637 }
1638 }
1639 }
1640
1641 #[async_trait::async_trait]
1642 impl StreamFn for TerminatorOnlyStream {
1643 async fn stream(
1644 &self,
1645 _request: StreamRequest,
1646 _signal: CancellationToken,
1647 ) -> BoxStream<'static, StreamEvent> {
1648 let call = self.calls.fetch_add(1, Ordering::SeqCst);
1649 assert_eq!(
1650 call, 0,
1651 "terminate-on-turn-1 test must NOT re-enter the LLM after a successful terminator"
1652 );
1653 let partial = empty_assistant_message();
1654 let assistant = AgentMessage::Assistant {
1655 content: AssistantContent {
1656 blocks: vec![AssistantBlock::ToolCall(crate::tool::ToolCall {
1657 id: "tc-terminator-1".into(),
1658 name: "terminator".into(),
1659 arguments: serde_json::json!({}),
1660 })],
1661 },
1662 stop_reason: StopReason::ToolUse,
1663 error_message: None,
1664 timestamp: None,
1665 usage: None,
1666 };
1667 Box::pin(stream::iter(vec![
1668 StreamEvent::Start { partial },
1669 StreamEvent::Done { message: assistant },
1670 ]))
1671 }
1672 }
1673
    /// Test tool registered as `terminator`; its `execute` returns a result with
    /// `terminate: true`.
    struct TerminatorTool;
1677
1678 #[async_trait::async_trait]
1679 impl crate::tool::AgentTool for TerminatorTool {
1680 fn name(&self) -> &str {
1681 "terminator"
1682 }
1683
1684 fn description(&self) -> &str {
1685 "test terminator"
1686 }
1687
1688 fn parameters_schema(&self) -> serde_json::Value {
1689 serde_json::json!({"type": "object"})
1690 }
1691
1692 async fn execute(
1693 &self,
1694 _call_id: &str,
1695 _args: serde_json::Value,
1696 _signal: CancellationToken,
1697 _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
1698 ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
1699 Ok(crate::tool::ToolResult {
1700 content: vec![crate::types::ToolResultBlock::Text(
1701 crate::types::TextContent {
1702 text: "delivered".into(),
1703 },
1704 )],
1705 is_error: false,
1706 details: serde_json::Value::Null,
1707 terminate: true,
1708 narration: None,
1709 })
1710 }
1711 }
1712
    /// Test tool registered as `progress`; its `execute` returns the text result
    /// "made progress".
    struct ProgressTool;
1714
1715 #[async_trait::async_trait]
1716 impl crate::tool::AgentTool for ProgressTool {
1717 fn name(&self) -> &str {
1718 "progress"
1719 }
1720
1721 fn description(&self) -> &str {
1722 "test progress tool"
1723 }
1724
1725 fn parameters_schema(&self) -> serde_json::Value {
1726 serde_json::json!({"type": "object"})
1727 }
1728
1729 async fn execute(
1730 &self,
1731 _call_id: &str,
1732 _args: serde_json::Value,
1733 _signal: CancellationToken,
1734 _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
1735 ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
1736 Ok(crate::tool::ToolResult::text("made progress"))
1737 }
1738 }
1739
    /// Steering source that returns a steering message on every poll and counts
    /// how often it was polled.
    struct AlwaysSteer {
        /// Shared counter incremented on each `next_steering_messages` call.
        polls: Arc<AtomicUsize>,
    }
1747
1748 impl Plugin for AlwaysSteer {
1749 fn name(&self) -> &'static str {
1750 "always_steer"
1751 }
1752
1753 fn capabilities(&self) -> PluginCapabilities {
1754 PluginCapabilities {
1755 steering: true,
1756 ..PluginCapabilities::default()
1757 }
1758 }
1759 }
1760
1761 #[async_trait::async_trait]
1762 impl crate::plugin::SteeringSource for AlwaysSteer {
1763 async fn next_steering_messages(&self) -> Vec<AgentMessage> {
1764 self.polls.fetch_add(1, Ordering::SeqCst);
1765 vec![AgentMessage::System {
1766 content: "wrap up now".into(),
1767 timestamp: None,
1768 }]
1769 }
1770 }
1771
1772 #[tokio::test]
1773 async fn terminator_vote_skips_post_batch_steering_collection() {
1774 let stream = Arc::new(TerminatorOnlyStream::default());
1784 let polls = Arc::new(AtomicUsize::new(0));
1785 let mut tool_registry = crate::tool::ToolRegistry::new();
1786 tool_registry = tool_registry.with(Arc::new(TerminatorTool));
1787 let config = AgentBuilder::new()
1788 .stream(stream.clone())
1789 .model_id("test-model")
1790 .tools(tool_registry)
1791 .steering(AlwaysSteer {
1792 polls: polls.clone(),
1793 })
1794 .build()
1795 .expect("config builds");
1796 let context = AgentContext::new("system");
1797 let prompts = vec![AgentMessage::User {
1798 content: UserContent::Text("deliver".to_string()),
1799 timestamp: None,
1800 }];
1801
1802 let result = run(prompts, context, &config, CancellationToken::new())
1803 .await
1804 .expect("run completes after one terminator turn");
1805
1806 assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
1808 assert_eq!(result.outcome, LoopOutcome::Done);
1811 assert_eq!(
1815 polls.load(Ordering::SeqCst),
1816 1,
1817 "steering source polled more than once — terminator vote did not gate post-batch re-entry"
1818 );
1819 }
1820
    /// Follow-up source that returns a follow-up message on every poll and
    /// counts how often it was polled.
    struct AlwaysFollowUp {
        /// Shared counter incremented on each `next_follow_up_messages` call.
        polls: Arc<AtomicUsize>,
    }
1827
1828 impl Plugin for AlwaysFollowUp {
1829 fn name(&self) -> &'static str {
1830 "always_follow_up"
1831 }
1832
1833 fn capabilities(&self) -> PluginCapabilities {
1834 PluginCapabilities::follow_up()
1835 }
1836 }
1837
1838 #[async_trait::async_trait]
1839 impl FollowUpSource for AlwaysFollowUp {
1840 async fn next_follow_up_messages(&self) -> Vec<AgentMessage> {
1841 self.polls.fetch_add(1, Ordering::SeqCst);
1842 vec![AgentMessage::System {
1843 content: "deliver something".into(),
1844 timestamp: None,
1845 }]
1846 }
1847 }
1848
1849 #[tokio::test]
1850 async fn terminator_vote_skips_post_batch_follow_up_collection() {
1851 let stream = Arc::new(TerminatorOnlyStream::default());
1857 let polls = Arc::new(AtomicUsize::new(0));
1858 let mut tool_registry = crate::tool::ToolRegistry::new();
1859 tool_registry = tool_registry.with(Arc::new(TerminatorTool));
1860 let config = AgentBuilder::new()
1861 .stream(stream.clone())
1862 .model_id("test-model")
1863 .tools(tool_registry)
1864 .follow_up(AlwaysFollowUp {
1865 polls: polls.clone(),
1866 })
1867 .build()
1868 .expect("config builds");
1869 let context = AgentContext::new("system");
1870 let prompts = vec![AgentMessage::User {
1871 content: UserContent::Text("deliver".to_string()),
1872 timestamp: None,
1873 }];
1874
1875 let result = run(prompts, context, &config, CancellationToken::new())
1876 .await
1877 .expect("run completes after one terminator turn");
1878
1879 assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
1880 assert_eq!(result.outcome, LoopOutcome::Done);
1881 assert_eq!(
1882 polls.load(Ordering::SeqCst),
1883 0,
1884 "follow-up source polled after a terminator vote — terminator did not gate post-batch re-entry"
1885 );
1886 }
1887
1888 #[tokio::test]
1889 async fn exhausted_empty_outcome_budget_returns_typed_loop_error() {
1890 let stream = Arc::new(RepeatedTextStream::default());
1891 let config = AgentBuilder::new()
1892 .stream(stream.clone())
1893 .model_id("test-model")
1894 .empty_outcome_retry_budget(1)
1895 .follow_up(CountingFollowUp::new(1))
1896 .build()
1897 .expect("config builds");
1898 let context = AgentContext::new("system");
1899 let prompts = vec![AgentMessage::User {
1900 content: UserContent::Text("continue".to_string()),
1901 timestamp: None,
1902 }];
1903
1904 let err = run(prompts, context, &config, CancellationToken::new())
1905 .await
1906 .expect_err("second no-tool stop should exhaust the budget");
1907
1908 assert!(
1909 matches!(
1910 err,
1911 LoopError::EmptyOutcomeBudgetExhausted {
1912 budget: 1,
1913 observed: 2,
1914 }
1915 ),
1916 "unexpected error: {err:?}"
1917 );
1918 assert_eq!(stream.calls.load(Ordering::SeqCst), 2);
1919 }
1920
1921 #[tokio::test]
1922 async fn empty_tool_gate_intersection_prefers_delivery_repair_owner() {
1923 let (sink, mut rx) = crate::event::ChannelSink::new();
1924 let config = AgentBuilder::new()
1925 .stream(Arc::new(RepeatedTextStream::default()))
1926 .event_sink(Arc::new(sink))
1927 .tool_gate_arc(Arc::new(StaticAllowGate {
1928 name: "delivery_repair_gate",
1929 tools: &["browser_interact"],
1930 priority: 100,
1931 class: ToolGateClass::Required,
1932 suppresses_advisory: false,
1933 }))
1934 .tool_gate_arc(Arc::new(StaticAllowGate {
1935 name: "terminal_message_guard",
1936 tools: &["message_result"],
1937 priority: 10,
1938 class: ToolGateClass::Required,
1939 suppresses_advisory: false,
1940 }))
1941 .build()
1942 .expect("config builds");
1943
1944 let allow = collect_tool_allowlist_with_events(&config, 3, &[])
1945 .await
1946 .expect("conflict repair should keep a non-empty allowlist");
1947
1948 assert_eq!(
1949 allow,
1950 ["browser_interact".to_string()].into_iter().collect()
1951 );
1952
1953 let mut saw_conflict = false;
1954 while let Ok(event) = rx.try_recv() {
1955 if let AgentEvent::ToolGateConflictResolved {
1956 chosen_plugin,
1957 allow,
1958 ..
1959 } = event
1960 {
1961 saw_conflict = true;
1962 assert_eq!(chosen_plugin.as_deref(), Some("delivery_repair_gate"));
1963 assert_eq!(allow, vec!["browser_interact".to_string()]);
1964 }
1965 }
1966 assert!(saw_conflict, "tool-gate deadlock should be diagnosable");
1967 }
1968
1969 #[tokio::test]
1970 async fn repair_owner_suppresses_advisory_gate_before_plan_only_intersection() {
1971 let config = AgentBuilder::new()
1972 .stream(Arc::new(RepeatedTextStream::default()))
1973 .tool_gate_arc(Arc::new(StaticAllowGate {
1974 name: "delivery_repair_gate",
1975 tools: &["plan", "file_write"],
1976 priority: 100,
1977 class: ToolGateClass::Required,
1978 suppresses_advisory: true,
1979 }))
1980 .tool_gate_arc(Arc::new(StaticAllowGate {
1981 name: "wrap_up_gate",
1982 tools: &["plan", "message_result", "message_ask"],
1983 priority: 0,
1984 class: ToolGateClass::Advisory,
1985 suppresses_advisory: false,
1986 }))
1987 .build()
1988 .expect("config builds");
1989
1990 let allow = collect_tool_allowlist_with_events(&config, 3, &[])
1991 .await
1992 .expect("repair owner should keep its own allowlist");
1993
1994 assert_eq!(
1995 allow,
1996 ["plan".to_string(), "file_write".to_string()]
1997 .into_iter()
1998 .collect()
1999 );
2000 }
2001
2002 #[tokio::test]
2003 async fn productive_tool_batch_resets_empty_outcome_budget() {
2004 let stream = Arc::new(EmptyStopsAroundProgressStream::default());
2005 let mut tool_registry = crate::tool::ToolRegistry::new();
2006 tool_registry = tool_registry
2007 .with(Arc::new(ProgressTool))
2008 .with(Arc::new(TerminatorTool));
2009 let config = AgentBuilder::new()
2010 .stream(stream.clone())
2011 .model_id("test-model")
2012 .tools(tool_registry)
2013 .empty_outcome_retry_budget(1)
2014 .follow_up(CountingFollowUp::new(3))
2015 .build()
2016 .expect("config builds");
2017 let context = AgentContext::new("system");
2018 let prompts = vec![AgentMessage::User {
2019 content: UserContent::Text("continue".to_string()),
2020 timestamp: None,
2021 }];
2022
2023 let result = run(prompts, context, &config, CancellationToken::new())
2024 .await
2025 .expect("productive tool batches should reset the empty-outcome budget");
2026
2027 assert_eq!(result.outcome, LoopOutcome::Done);
2028 assert_eq!(stream.calls.load(Ordering::SeqCst), 6);
2029 }
2030
2031 #[tokio::test]
2032 async fn terminal_only_plain_text_fallback_synthesizes_terminal_result() {
2033 let stream = Arc::new(RepeatedTextStream::default());
2034 let mut tool_registry = crate::tool::ToolRegistry::new();
2035 tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2036 let config = AgentBuilder::new()
2037 .stream(stream.clone())
2038 .model_id("auto-tool-provider")
2039 .tools(tool_registry)
2040 .tool_gate_arc(Arc::new(TerminalOnlyGate))
2041 .plain_text_terminal_fallback_tool("message_result")
2042 .empty_outcome_retry_budget(0)
2043 .build()
2044 .expect("config builds");
2045 let context = AgentContext::new("system");
2046 let prompts = vec![AgentMessage::User {
2047 content: UserContent::Text("answer directly".to_string()),
2048 timestamp: None,
2049 }];
2050
2051 let result = run(prompts, context, &config, CancellationToken::new())
2052 .await
2053 .expect("plain text should be converted on terminal-only turn");
2054
2055 assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
2056 assert_eq!(result.outcome, LoopOutcome::Done);
2057 assert!(result.messages.iter().any(|message| matches!(
2058 message,
2059 AgentMessage::ToolResult {
2060 tool_name,
2061 content,
2062 is_error: false,
2063 ..
2064 } if tool_name == "message_result"
2065 && content.plain_text() == "plain stop 0"
2066 )));
2067 }
2068
2069 #[tokio::test]
2070 async fn eager_plain_text_fallback_fires_without_terminal_only_allowlist() {
2071 let stream = Arc::new(RepeatedTextStream::default());
2083 let mut tool_registry = crate::tool::ToolRegistry::new();
2084 tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2085 let config = AgentBuilder::new()
2086 .stream(stream.clone())
2087 .model_id("auto-tool-provider-eager")
2088 .tools(tool_registry)
2089 .plain_text_terminal_fallback_tool("message_result")
2090 .plain_text_terminal_fallback_eager(true)
2091 .empty_outcome_retry_budget(0)
2092 .build()
2093 .expect("config builds");
2094 let context = AgentContext::new("system");
2095 let prompts = vec![AgentMessage::User {
2096 content: UserContent::Text("answer directly".to_string()),
2097 timestamp: None,
2098 }];
2099
2100 let result = run(prompts, context, &config, CancellationToken::new())
2101 .await
2102 .expect("eager fallback should convert plain text on first stop");
2103
2104 assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
2105 assert_eq!(result.outcome, LoopOutcome::Done);
2106 assert!(result.messages.iter().any(|message| matches!(
2107 message,
2108 AgentMessage::ToolResult {
2109 tool_name,
2110 content,
2111 is_error: false,
2112 ..
2113 } if tool_name == "message_result"
2114 && content.plain_text() == "plain stop 0"
2115 )));
2116 }
2117
2118 #[tokio::test]
2119 async fn eager_nudge_mode_injects_protocol_recovery_before_synthesizing() {
2120 let stream = Arc::new(RepeatedTextStream::default());
2131 let mut tool_registry = crate::tool::ToolRegistry::new();
2132 tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2133 let config = AgentBuilder::new()
2134 .stream(stream.clone())
2135 .model_id("auto-tool-provider-eager-nudge")
2136 .tools(tool_registry)
2137 .plain_text_terminal_fallback_tool("message_result")
2138 .plain_text_terminal_fallback_eager(true)
2139 .plain_text_terminal_fallback_eager_nudge(true)
2140 .build()
2141 .expect("config builds");
2142 let context = AgentContext::new("system");
2143 let prompts = vec![AgentMessage::User {
2144 content: UserContent::Text("answer directly".to_string()),
2145 timestamp: None,
2146 }];
2147
2148 let result = run(prompts, context, &config, CancellationToken::new())
2149 .await
2150 .expect("nudge mode should eventually synthesize after retries");
2151
2152 assert_eq!(stream.calls.load(Ordering::SeqCst), 3);
2155 assert_eq!(result.outcome, LoopOutcome::Done);
2156
2157 let nudge_count = result
2158 .messages
2159 .iter()
2160 .filter(|m| matches!(m, AgentMessage::System { content, .. } if content == crate::protocol::DEFAULT_PLAIN_TEXT_RECOVERY_PROMPT))
2161 .count();
2162 assert_eq!(
2163 nudge_count, 2,
2164 "expected two protocol-recovery system messages in the run output, got {nudge_count}",
2165 );
2166
2167 let synthesized_text = result
2168 .messages
2169 .iter()
2170 .find_map(|message| match message {
2171 AgentMessage::ToolResult {
2172 tool_name,
2173 content,
2174 is_error: false,
2175 ..
2176 } if tool_name == "message_result" => Some(content.plain_text()),
2177 _ => None,
2178 })
2179 .expect("a terminal tool result should be synthesized as last resort");
2180 assert_eq!(
2181 synthesized_text, "plain stop 0",
2182 "synthesizer should deliver the first preserved plain text, not later recovery drift",
2183 );
2184 }
2185
2186 #[test]
2187 fn plain_text_fallback_candidate_skips_obvious_clarifying_questions() {
2188 assert!(!should_preserve_plain_text_terminal_candidate(
2189 "Continue what, exactly? What's your next move?"
2190 ));
2191 assert!(!should_preserve_plain_text_terminal_candidate(
2192 "Would you like me to proceed?"
2193 ));
2194 assert!(should_preserve_plain_text_terminal_candidate(
2195 "# Machine Learning\n\nMachine learning is the branch of artificial intelligence that studies systems which improve from data."
2196 ));
2197 }
2198
2199 #[tokio::test]
2200 async fn non_eager_plain_text_fallback_still_requires_narrowed_allowlist() {
2201 let stream = Arc::new(RepeatedTextStream::default());
2206 let mut tool_registry = crate::tool::ToolRegistry::new();
2207 tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2208 let config = AgentBuilder::new()
2209 .stream(stream.clone())
2210 .model_id("non-eager-provider")
2211 .tools(tool_registry)
2212 .plain_text_terminal_fallback_tool("message_result")
2213 .empty_outcome_retry_budget(0)
2215 .build()
2216 .expect("config builds");
2217 let context = AgentContext::new("system");
2218 let prompts = vec![AgentMessage::User {
2219 content: UserContent::Text("answer directly".to_string()),
2220 timestamp: None,
2221 }];
2222
2223 let err = run(prompts, context, &config, CancellationToken::new())
2224 .await
2225 .expect_err("non-eager fallback must not convert without narrowed allowlist");
2226
2227 assert!(
2228 matches!(err, LoopError::EmptyOutcomeBudgetExhausted { .. }),
2229 "unexpected error: {err:?}"
2230 );
2231 }
2232
2233 #[tokio::test]
2234 async fn terminal_plain_text_fallback_allows_status_delivery_gate() {
2235 let stream = Arc::new(RepeatedTextStream::default());
2236 let mut tool_registry = crate::tool::ToolRegistry::new();
2237 tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2238 let config = AgentBuilder::new()
2239 .stream(stream.clone())
2240 .model_id("auto-tool-provider")
2241 .tools(tool_registry)
2242 .protocol_policy(Arc::new(TestTerminalPolicy))
2243 .tool_gate_arc(Arc::new(TerminalWithStatusGate))
2244 .plain_text_terminal_fallback_tool("message_result")
2245 .empty_outcome_retry_budget(0)
2246 .build()
2247 .expect("config builds");
2248 let context = AgentContext::new("system");
2249 let prompts = vec![AgentMessage::User {
2250 content: UserContent::Text("answer directly".to_string()),
2251 timestamp: None,
2252 }];
2253
2254 let result = run(prompts, context, &config, CancellationToken::new())
2255 .await
2256 .expect(
2257 "plain text should be converted when only status and terminal tools are allowed",
2258 );
2259
2260 assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
2261 assert_eq!(result.outcome, LoopOutcome::Done);
2262 assert!(result.messages.iter().any(|message| matches!(
2263 message,
2264 AgentMessage::ToolResult {
2265 tool_name,
2266 content,
2267 is_error: false,
2268 ..
2269 } if tool_name == "message_result"
2270 && content.plain_text() == "plain stop 0"
2271 )));
2272 }
2273
    /// Test tool whose registered name is the wrapped string; its `execute`
    /// returns a "not used" text result with `terminate: true`.
    struct TerminalNamedTool(&'static str);
2275
2276 #[async_trait::async_trait]
2277 impl crate::tool::AgentTool for TerminalNamedTool {
2278 fn name(&self) -> &str {
2279 self.0
2280 }
2281
2282 fn description(&self) -> &str {
2283 "test terminal tool"
2284 }
2285
2286 fn parameters_schema(&self) -> serde_json::Value {
2287 serde_json::json!({"type": "object"})
2288 }
2289
2290 async fn execute(
2291 &self,
2292 _call_id: &str,
2293 _args: serde_json::Value,
2294 _signal: CancellationToken,
2295 _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
2296 ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
2297 Ok(crate::tool::ToolResult {
2298 content: vec![crate::types::ToolResultBlock::Text(
2299 crate::types::TextContent {
2300 text: "not used".into(),
2301 },
2302 )],
2303 is_error: false,
2304 details: serde_json::Value::Null,
2305 terminate: true,
2306 narration: None,
2307 })
2308 }
2309 }
2310}