1use super::*;
2
3impl EngineLoop {
4 pub async fn run_prompt_async_with_context(
5 &self,
6 session_id: String,
7 req: SendMessageRequest,
8 correlation_id: Option<String>,
9 ) -> anyhow::Result<()> {
10 let session_model = self
11 .storage
12 .get_session(&session_id)
13 .await
14 .and_then(|s| s.model);
15 let (provider_id, model_id_value) =
16 resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
17 anyhow::anyhow!(
18 "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
19 )
20 })?;
21 let correlation_ref = correlation_id.as_deref();
22 let model_id = Some(model_id_value.as_str());
23 let cancel = self.cancellations.create(&session_id).await;
24 emit_event(
25 Level::INFO,
26 ProcessKind::Engine,
27 ObservabilityEvent {
28 event: "provider.call.start",
29 component: "engine.loop",
30 correlation_id: correlation_ref,
31 session_id: Some(&session_id),
32 run_id: None,
33 message_id: None,
34 provider_id: Some(provider_id.as_str()),
35 model_id,
36 status: Some("start"),
37 error_code: None,
38 detail: Some("run_prompt_async dispatch"),
39 },
40 );
41 self.event_bus.publish(EngineEvent::new(
42 "session.status",
43 json!({"sessionID": session_id, "status":"running"}),
44 ));
45 let request_parts = req.parts.clone();
46 let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
47 let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
48 let requested_write_required = req.write_required.unwrap_or(false);
49 let requested_prewrite_requirements = req.prewrite_requirements.clone().unwrap_or_default();
50 let request_tool_allowlist = req
51 .tool_allowlist
52 .clone()
53 .unwrap_or_default()
54 .into_iter()
55 .map(|tool| normalize_tool_name(&tool))
56 .filter(|tool| !tool.trim().is_empty())
57 .collect::<HashSet<_>>();
58 if !request_tool_allowlist.is_empty() {
61 self.set_session_allowed_tools(
62 &session_id,
63 request_tool_allowlist.iter().cloned().collect(),
64 )
65 .await;
66 }
67 let text = req
68 .parts
69 .iter()
70 .map(|p| match p {
71 MessagePartInput::Text { text } => text.clone(),
72 MessagePartInput::File {
73 mime,
74 filename,
75 url,
76 } => format!(
77 "[file mime={} name={} url={}]",
78 mime,
79 filename.clone().unwrap_or_else(|| "unknown".to_string()),
80 url
81 ),
82 })
83 .collect::<Vec<_>>()
84 .join("\n");
85 let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
86 self.auto_rename_session_from_user_text(&session_id, &text)
87 .await;
88 let active_agent = self.agents.get(req.agent.as_deref()).await;
89 let mut user_message_id = self
90 .find_recent_matching_user_message_id(&session_id, &text)
91 .await;
92 if user_message_id.is_none() {
93 let user_message = Message::new(
94 MessageRole::User,
95 vec![MessagePart::Text { text: text.clone() }],
96 );
97 let created_message_id = user_message.id.clone();
98 self.storage
99 .append_message(&session_id, user_message)
100 .await?;
101
102 let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
103 self.event_bus.publish(EngineEvent::new(
104 "message.part.updated",
105 json!({
106 "part": user_part,
107 "delta": text,
108 "agent": active_agent.name
109 }),
110 ));
111 user_message_id = Some(created_message_id);
112 }
113 let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
114
115 if cancel.is_cancelled() {
116 self.event_bus.publish(EngineEvent::new(
117 "session.status",
118 json!({"sessionID": session_id, "status":"cancelled"}),
119 ));
120 self.cancellations.remove(&session_id).await;
121 return Ok(());
122 }
123
124 let mut question_tool_used = false;
125 let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
126 if normalize_tool_name(&tool) == "question" {
127 question_tool_used = true;
128 }
129 if !agent_can_use_tool(&active_agent, &tool) {
130 format!(
131 "Tool `{tool}` is not enabled for agent `{}`.",
132 active_agent.name
133 )
134 } else {
135 self.execute_tool_with_permission(
136 &session_id,
137 &user_message_id,
138 tool.clone(),
139 args,
140 None,
141 active_agent.skills.as_deref(),
142 &text,
143 requested_write_required,
144 None,
145 cancel.clone(),
146 )
147 .await?
148 .unwrap_or_default()
149 }
150 } else {
151 let mut completion = String::new();
152 let mut max_iterations = max_tool_iterations();
153 let mut followup_context: Option<String> = None;
154 let mut last_tool_outputs: Vec<String> = Vec::new();
155 let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
156 let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
157 let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
158 let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
159 let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
160 let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
161 let mut websearch_query_blocked = false;
162 let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
163 let mut pack_builder_executed = false;
164 let mut auto_workspace_probe_attempted = false;
165 let mut productive_tool_calls_total = 0usize;
166 let mut productive_write_tool_calls_total = 0usize;
167 let mut productive_workspace_inspection_total = 0usize;
168 let mut productive_web_research_total = 0usize;
169 let mut productive_concrete_read_total = 0usize;
170 let mut successful_web_research_total = 0usize;
171 let mut required_tool_retry_count = 0usize;
172 let mut required_write_retry_count = 0usize;
173 let mut unmet_prewrite_repair_retry_count = 0usize;
174 let mut empty_completion_retry_count = 0usize;
175 let mut prewrite_gate_waived = false;
176 let mut invalid_tool_args_retry_count = 0usize;
177 let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
178 let mut required_tool_unsatisfied_emitted = false;
179 let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
180 let email_delivery_requested = requires_email_delivery_prompt(&text);
181 let web_research_requested = requires_web_research_prompt(&text);
182 let code_workflow_requested = infer_code_workflow_from_text(&text);
183 let mut email_action_executed = false;
184 let mut latest_email_action_note: Option<String> = None;
185 let mut email_tools_ever_offered = false;
186 let intent = classify_intent(&text);
187 let router_enabled = tool_router_enabled();
188 let retrieval_enabled = semantic_tool_retrieval_enabled();
189 let retrieval_k = semantic_tool_retrieval_k();
190 let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
191 self.tools.mcp_server_names().await
192 } else {
193 Vec::new()
194 };
195 let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
196 let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
197 && runtime_attachments.is_empty()
198 && is_short_simple_prompt(&text)
199 && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
200
201 while max_iterations > 0 && !cancel.is_cancelled() {
202 let iteration = 26usize.saturating_sub(max_iterations);
203 max_iterations -= 1;
204 let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
205 ChatHistoryProfile::Full
206 } else if matches!(requested_context_mode, ContextMode::Compact)
207 || context_is_auto_compact
208 {
209 ChatHistoryProfile::Compact
210 } else {
211 ChatHistoryProfile::Standard
212 };
213 let mut messages =
214 load_chat_history(self.storage.clone(), &session_id, context_profile).await;
215 if iteration == 1 && !runtime_attachments.is_empty() {
216 attach_to_last_user_message(&mut messages, &runtime_attachments);
217 }
218 let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
219 self.event_bus.publish(EngineEvent::new(
220 "context.profile.selected",
221 json!({
222 "sessionID": session_id,
223 "messageID": user_message_id,
224 "iteration": iteration,
225 "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
226 "historyMessageCount": messages.len(),
227 "historyCharCount": history_char_count,
228 "memoryInjected": false
229 }),
230 ));
231 let mut system_parts = vec![tandem_runtime_system_prompt(
232 &self.host_runtime_context,
233 &mcp_server_names,
234 )];
235 if let Some(system) = active_agent.system_prompt.as_ref() {
236 system_parts.push(system.clone());
237 }
238 messages.insert(
239 0,
240 ChatMessage {
241 role: "system".to_string(),
242 content: system_parts.join("\n\n"),
243 attachments: Vec::new(),
244 },
245 );
246 if let Some(extra) = followup_context.take() {
247 messages.push(ChatMessage {
248 role: "user".to_string(),
249 content: extra,
250 attachments: Vec::new(),
251 });
252 }
253 if let Some(hook) = self.prompt_context_hook.read().await.clone() {
254 let ctx = PromptContextHookContext {
255 session_id: session_id.clone(),
256 message_id: user_message_id.clone(),
257 provider_id: provider_id.clone(),
258 model_id: model_id_value.clone(),
259 iteration,
260 };
261 let hook_timeout =
262 Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
263 match tokio::time::timeout(
264 hook_timeout,
265 hook.augment_provider_messages(ctx, messages.clone()),
266 )
267 .await
268 {
269 Ok(Ok(augmented)) => {
270 messages = augmented;
271 }
272 Ok(Err(err)) => {
273 self.event_bus.publish(EngineEvent::new(
274 "memory.context.error",
275 json!({
276 "sessionID": session_id,
277 "messageID": user_message_id,
278 "iteration": iteration,
279 "error": truncate_text(&err.to_string(), 500),
280 }),
281 ));
282 }
283 Err(_) => {
284 self.event_bus.publish(EngineEvent::new(
285 "memory.context.error",
286 json!({
287 "sessionID": session_id,
288 "messageID": user_message_id,
289 "iteration": iteration,
290 "error": format!(
291 "prompt context hook timeout after {} ms",
292 hook_timeout.as_millis()
293 ),
294 }),
295 ));
296 }
297 }
298 }
299 let all_tools = self.tools.list().await;
300 let mut retrieval_fallback_reason: Option<&'static str> = None;
301 let mut candidate_tools = if retrieval_enabled {
302 self.tools.retrieve(&text, retrieval_k).await
303 } else {
304 all_tools.clone()
305 };
306 if retrieval_enabled {
307 if candidate_tools.is_empty() && !all_tools.is_empty() {
308 candidate_tools = all_tools.clone();
309 retrieval_fallback_reason = Some("retrieval_empty_result");
310 } else if web_research_requested
311 && has_web_research_tools(&all_tools)
312 && !has_web_research_tools(&candidate_tools)
313 && required_write_retry_count == 0
314 {
315 candidate_tools = all_tools.clone();
316 retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
317 } else if email_delivery_requested
318 && has_email_action_tools(&all_tools)
319 && !has_email_action_tools(&candidate_tools)
320 {
321 candidate_tools = all_tools.clone();
322 retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
323 }
324 }
325 let mut tool_schemas = if !router_enabled {
326 candidate_tools
327 } else {
328 match requested_tool_mode {
329 ToolMode::None => Vec::new(),
330 ToolMode::Required => select_tool_subset(
331 candidate_tools,
332 intent,
333 &request_tool_allowlist,
334 iteration > 1,
335 ),
336 ToolMode::Auto => {
337 if !auto_tools_escalated {
338 Vec::new()
339 } else {
340 select_tool_subset(
341 candidate_tools,
342 intent,
343 &request_tool_allowlist,
344 iteration > 1,
345 )
346 }
347 }
348 }
349 };
350 let mut policy_patterns =
351 request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
352 if let Some(agent_tools) = active_agent.tools.as_ref() {
353 policy_patterns
354 .extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
355 }
356 let session_allowed_tools = self
357 .session_allowed_tools
358 .read()
359 .await
360 .get(&session_id)
361 .cloned()
362 .unwrap_or_default();
363 policy_patterns.extend(session_allowed_tools.iter().cloned());
364 if !policy_patterns.is_empty() {
365 let mut included = tool_schemas
366 .iter()
367 .map(|schema| normalize_tool_name(&schema.name))
368 .collect::<HashSet<_>>();
369 for schema in &all_tools {
370 let normalized = normalize_tool_name(&schema.name);
371 if policy_patterns
372 .iter()
373 .any(|pattern| tool_name_matches_policy(pattern, &normalized))
374 && included.insert(normalized)
375 {
376 tool_schemas.push(schema.clone());
377 }
378 }
379 }
380 if !request_tool_allowlist.is_empty() {
381 tool_schemas.retain(|schema| {
382 let tool = normalize_tool_name(&schema.name);
383 request_tool_allowlist
384 .iter()
385 .any(|pattern| tool_name_matches_policy(pattern, &tool))
386 });
387 }
388 let prewrite_gate = evaluate_prewrite_gate(
389 requested_write_required,
390 &requested_prewrite_requirements,
391 PrewriteProgress {
392 productive_write_tool_calls_total,
393 productive_workspace_inspection_total,
394 productive_concrete_read_total,
395 productive_web_research_total,
396 successful_web_research_total,
397 required_write_retry_count,
398 unmet_prewrite_repair_retry_count,
399 prewrite_gate_waived,
400 },
401 );
402 let _prewrite_satisfied = prewrite_gate.prewrite_satisfied;
403 let prewrite_gate_write = prewrite_gate.gate_write;
404 let force_write_only_retry = prewrite_gate.force_write_only_retry;
405 let allow_repair_tools = prewrite_gate.allow_repair_tools;
406 if prewrite_gate_write {
407 tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
408 }
409 if requested_prewrite_requirements.repair_on_unmet_requirements
410 && productive_write_tool_calls_total >= 3
411 {
412 tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
413 }
414 if allow_repair_tools {
415 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
416 let repair_tools = tool_schemas
417 .iter()
418 .filter(|schema| {
419 tool_matches_unmet_prewrite_repair_requirement(
420 &schema.name,
421 &unmet_prewrite_codes,
422 )
423 })
424 .cloned()
425 .collect::<Vec<_>>();
426 if !repair_tools.is_empty() {
427 tool_schemas = repair_tools;
428 }
429 }
430 if force_write_only_retry && !allow_repair_tools {
431 tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
432 }
433 if active_agent.tools.is_some() {
434 tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
435 }
436 tool_schemas.retain(|schema| {
437 let normalized = normalize_tool_name(&schema.name);
438 if let Some(server) = mcp_server_from_tool_name(&normalized) {
439 !blocked_mcp_servers.contains(server)
440 } else {
441 true
442 }
443 });
444 if let Some(allowed_tools) = self
445 .session_allowed_tools
446 .read()
447 .await
448 .get(&session_id)
449 .cloned()
450 {
451 if !allowed_tools.is_empty() {
452 tool_schemas.retain(|schema| {
453 let normalized = normalize_tool_name(&schema.name);
454 any_policy_matches(&allowed_tools, &normalized)
455 });
456 }
457 }
458 if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
459 let detail = validation_err.to_string();
460 emit_event(
461 Level::ERROR,
462 ProcessKind::Engine,
463 ObservabilityEvent {
464 event: "provider.call.error",
465 component: "engine.loop",
466 correlation_id: correlation_ref,
467 session_id: Some(&session_id),
468 run_id: None,
469 message_id: Some(&user_message_id),
470 provider_id: Some(provider_id.as_str()),
471 model_id,
472 status: Some("failed"),
473 error_code: Some("TOOL_SCHEMA_INVALID"),
474 detail: Some(&detail),
475 },
476 );
477 anyhow::bail!("{detail}");
478 }
479 let routing_decision = ToolRoutingDecision {
480 pass: if auto_tools_escalated { 2 } else { 1 },
481 mode: match requested_tool_mode {
482 ToolMode::Auto => default_mode_name(),
483 ToolMode::None => "none",
484 ToolMode::Required => "required",
485 },
486 intent,
487 selected_count: tool_schemas.len(),
488 total_available_count: all_tools.len(),
489 mcp_included: tool_schemas
490 .iter()
491 .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
492 };
493 self.event_bus.publish(EngineEvent::new(
494 "tool.routing.decision",
495 json!({
496 "sessionID": session_id,
497 "messageID": user_message_id,
498 "iteration": iteration,
499 "pass": routing_decision.pass,
500 "mode": routing_decision.mode,
501 "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
502 "selectedToolCount": routing_decision.selected_count,
503 "totalAvailableTools": routing_decision.total_available_count,
504 "mcpIncluded": routing_decision.mcp_included,
505 "retrievalEnabled": retrieval_enabled,
506 "retrievalK": retrieval_k,
507 "fallbackToFullTools": retrieval_fallback_reason.is_some(),
508 "fallbackReason": retrieval_fallback_reason
509 }),
510 ));
511 let allowed_tool_names = tool_schemas
512 .iter()
513 .map(|schema| normalize_tool_name(&schema.name))
514 .collect::<HashSet<_>>();
515 if !email_tools_ever_offered && has_email_action_tools(&tool_schemas) {
516 email_tools_ever_offered = true;
517 }
518 let offered_tool_preview = tool_schemas
519 .iter()
520 .take(8)
521 .map(|schema| normalize_tool_name(&schema.name))
522 .collect::<Vec<_>>()
523 .join(", ");
524 self.event_bus.publish(EngineEvent::new(
525 "provider.call.iteration.start",
526 json!({
527 "sessionID": session_id,
528 "messageID": user_message_id,
529 "iteration": iteration,
530 "selectedToolCount": allowed_tool_names.len(),
531 }),
532 ));
533 let estimated_prompt_chars: usize = messages.iter().map(|m| m.content.len()).sum();
534 let provider_connect_timeout =
535 Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
536 let stream_result = tokio::time::timeout(
537 provider_connect_timeout,
538 self.providers.stream_for_provider(
539 Some(provider_id.as_str()),
540 Some(model_id_value.as_str()),
541 messages,
542 requested_tool_mode.clone(),
543 Some(tool_schemas),
544 cancel.clone(),
545 ),
546 )
547 .await
548 .map_err(|_| {
549 anyhow::anyhow!(
550 "provider stream connect timeout after {} ms",
551 provider_connect_timeout.as_millis()
552 )
553 })
554 .and_then(|result| result);
555 let stream = match stream_result {
556 Ok(stream) => stream,
557 Err(err) => {
558 let error_text = err.to_string();
559 let error_code = provider_error_code(&error_text);
560 let detail = truncate_text(&error_text, 500);
561 emit_event(
562 Level::ERROR,
563 ProcessKind::Engine,
564 ObservabilityEvent {
565 event: "provider.call.error",
566 component: "engine.loop",
567 correlation_id: correlation_ref,
568 session_id: Some(&session_id),
569 run_id: None,
570 message_id: Some(&user_message_id),
571 provider_id: Some(provider_id.as_str()),
572 model_id,
573 status: Some("failed"),
574 error_code: Some(error_code),
575 detail: Some(&detail),
576 },
577 );
578 self.event_bus.publish(EngineEvent::new(
579 "provider.call.iteration.error",
580 json!({
581 "sessionID": session_id,
582 "messageID": user_message_id,
583 "iteration": iteration,
584 "error": detail,
585 }),
586 ));
587 return Err(err);
588 }
589 };
590 tokio::pin!(stream);
591 completion.clear();
592 let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
593 let mut provider_usage: Option<TokenUsage> = None;
594 let mut accepted_tool_calls_in_cycle = 0usize;
595 let provider_idle_timeout =
596 Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
597 loop {
598 let next_chunk_result =
599 tokio::time::timeout(provider_idle_timeout, stream.next())
600 .await
601 .map_err(|_| {
602 anyhow::anyhow!(
603 "provider stream idle timeout after {} ms",
604 provider_idle_timeout.as_millis()
605 )
606 });
607 let next_chunk = match next_chunk_result {
608 Ok(next_chunk) => next_chunk,
609 Err(err) => {
610 self.event_bus.publish(EngineEvent::new(
611 "provider.call.iteration.error",
612 json!({
613 "sessionID": session_id,
614 "messageID": user_message_id,
615 "iteration": iteration,
616 "error": truncate_text(&err.to_string(), 500),
617 }),
618 ));
619 return Err(err);
620 }
621 };
622 let Some(chunk) = next_chunk else {
623 break;
624 };
625 let chunk = match chunk {
626 Ok(chunk) => chunk,
627 Err(err) => {
628 let error_text = err.to_string();
629 let error_code = provider_error_code(&error_text);
630 let detail = truncate_text(&error_text, 500);
631 emit_event(
632 Level::ERROR,
633 ProcessKind::Engine,
634 ObservabilityEvent {
635 event: "provider.call.error",
636 component: "engine.loop",
637 correlation_id: correlation_ref,
638 session_id: Some(&session_id),
639 run_id: None,
640 message_id: Some(&user_message_id),
641 provider_id: Some(provider_id.as_str()),
642 model_id,
643 status: Some("failed"),
644 error_code: Some(error_code),
645 detail: Some(&detail),
646 },
647 );
648 self.event_bus.publish(EngineEvent::new(
649 "provider.call.iteration.error",
650 json!({
651 "sessionID": session_id,
652 "messageID": user_message_id,
653 "iteration": iteration,
654 "error": detail,
655 }),
656 ));
657 return Err(anyhow::anyhow!(
658 "provider stream chunk error: {error_text}"
659 ));
660 }
661 };
662 match chunk {
663 StreamChunk::TextDelta(delta) => {
664 let delta = strip_model_control_markers(&delta);
665 if delta.trim().is_empty() {
666 continue;
667 }
668 if completion.is_empty() {
669 emit_event(
670 Level::INFO,
671 ProcessKind::Engine,
672 ObservabilityEvent {
673 event: "provider.call.first_byte",
674 component: "engine.loop",
675 correlation_id: correlation_ref,
676 session_id: Some(&session_id),
677 run_id: None,
678 message_id: Some(&user_message_id),
679 provider_id: Some(provider_id.as_str()),
680 model_id,
681 status: Some("streaming"),
682 error_code: None,
683 detail: Some("first text delta"),
684 },
685 );
686 }
687 completion.push_str(&delta);
688 let delta = truncate_text(&delta, 4_000);
689 let delta_part =
690 WireMessagePart::text(&session_id, &user_message_id, delta.clone());
691 self.event_bus.publish(EngineEvent::new(
692 "message.part.updated",
693 json!({"part": delta_part, "delta": delta}),
694 ));
695 }
696 StreamChunk::ReasoningDelta(_reasoning) => {}
697 StreamChunk::Done {
698 finish_reason: _,
699 usage,
700 } => {
701 if usage.is_some() {
702 provider_usage = usage;
703 }
704 break;
705 }
706 StreamChunk::ToolCallStart { id, name } => {
707 let entry = streamed_tool_calls.entry(id).or_default();
708 if entry.name.is_empty() {
709 entry.name = name;
710 }
711 }
712 StreamChunk::ToolCallDelta { id, args_delta } => {
713 let entry = streamed_tool_calls.entry(id.clone()).or_default();
714 entry.args.push_str(&args_delta);
715 let tool_name = if entry.name.trim().is_empty() {
716 "tool".to_string()
717 } else {
718 normalize_tool_name(&entry.name)
719 };
720 let parsed_preview = if entry.name.trim().is_empty() {
721 Value::String(truncate_text(&entry.args, 1_000))
722 } else {
723 parse_streamed_tool_args(&tool_name, &entry.args)
724 };
725 let mut tool_part = WireMessagePart::tool_invocation(
726 &session_id,
727 &user_message_id,
728 tool_name.clone(),
729 parsed_preview.clone(),
730 );
731 tool_part.id = Some(id.clone());
732 if tool_name == "write" {
733 tracing::info!(
734 session_id = %session_id,
735 message_id = %user_message_id,
736 tool_call_id = %id,
737 args_delta_len = args_delta.len(),
738 accumulated_args_len = entry.args.len(),
739 parsed_preview_empty = parsed_preview.is_null()
740 || parsed_preview.as_object().is_some_and(|value| value.is_empty())
741 || parsed_preview
742 .as_str()
743 .map(|value| value.trim().is_empty())
744 .unwrap_or(false),
745 "streamed write tool args delta received"
746 );
747 }
748 self.event_bus.publish(EngineEvent::new(
749 "message.part.updated",
750 json!({
751 "part": tool_part,
752 "toolCallDelta": {
753 "id": id,
754 "tool": tool_name,
755 "argsDelta": truncate_text(&args_delta, 1_000),
756 "rawArgsPreview": truncate_text(&entry.args, 2_000),
757 "parsedArgsPreview": parsed_preview
758 }
759 }),
760 ));
761 }
762 StreamChunk::ToolCallEnd { id: _ } => {}
763 }
764 if cancel.is_cancelled() {
765 break;
766 }
767 }
768
769 let streamed_tool_call_count = streamed_tool_calls.len();
770 let streamed_tool_call_parse_failed = streamed_tool_calls
771 .values()
772 .any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
773 let mut tool_calls = streamed_tool_calls
774 .into_iter()
775 .filter_map(|(call_id, call)| {
776 if call.name.trim().is_empty() {
777 return None;
778 }
779 let tool_name = normalize_tool_name(&call.name);
780 let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
781 Some(ParsedToolCall {
782 tool: tool_name,
783 args: parsed_args,
784 call_id: Some(call_id),
785 })
786 })
787 .collect::<Vec<_>>();
788 if tool_calls.is_empty() {
789 tool_calls = parse_tool_invocations_from_response(&completion)
790 .into_iter()
791 .map(|(tool, args)| ParsedToolCall {
792 tool,
793 args,
794 call_id: None,
795 })
796 .collect::<Vec<_>>();
797 }
798 let provider_tool_parse_failed = tool_calls.is_empty()
799 && (streamed_tool_call_parse_failed
800 || (streamed_tool_call_count > 0
801 && looks_like_unparsed_tool_payload(&completion))
802 || looks_like_unparsed_tool_payload(&completion));
803 if provider_tool_parse_failed {
804 latest_required_tool_failure_kind =
805 RequiredToolFailureKind::ToolCallParseFailed;
806 } else if tool_calls.is_empty() {
807 latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
808 }
809 if router_enabled
810 && matches!(requested_tool_mode, ToolMode::Auto)
811 && !auto_tools_escalated
812 && iteration == 1
813 && should_escalate_auto_tools(intent, &text, &completion)
814 {
815 auto_tools_escalated = true;
816 followup_context = Some(
817 "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
818 .to_string(),
819 );
820 self.event_bus.publish(EngineEvent::new(
821 "provider.call.iteration.finish",
822 json!({
823 "sessionID": session_id,
824 "messageID": user_message_id,
825 "iteration": iteration,
826 "finishReason": "auto_escalate",
827 "acceptedToolCalls": accepted_tool_calls_in_cycle,
828 "rejectedToolCalls": 0,
829 }),
830 ));
831 continue;
832 }
833 if tool_calls.is_empty()
834 && !auto_workspace_probe_attempted
835 && should_force_workspace_probe(&text, &completion)
836 && allowed_tool_names.contains("glob")
837 {
838 auto_workspace_probe_attempted = true;
839 tool_calls = vec![ParsedToolCall {
840 tool: "glob".to_string(),
841 args: json!({ "pattern": "*" }),
842 call_id: None,
843 }];
844 }
845 if !tool_calls.is_empty() {
846 let saw_tool_call_candidate = true;
847 let mut outputs = Vec::new();
848 let mut executed_productive_tool = false;
849 let mut write_tool_attempted_in_cycle = false;
850 let mut auth_required_hit_in_cycle = false;
851 let mut guard_budget_hit_in_cycle = false;
852 let mut duplicate_signature_hit_in_cycle = false;
853 let mut rejected_tool_call_in_cycle = false;
854 for ParsedToolCall {
855 tool,
856 args,
857 call_id,
858 } in tool_calls
859 {
860 if !agent_can_use_tool(&active_agent, &tool) {
861 rejected_tool_call_in_cycle = true;
862 continue;
863 }
864 let tool_key = normalize_tool_name(&tool);
865 if is_workspace_write_tool(&tool_key) {
866 write_tool_attempted_in_cycle = true;
867 }
868 if !allowed_tool_names.contains(&tool_key) {
869 rejected_tool_call_in_cycle = true;
870 let note = if offered_tool_preview.is_empty() {
871 format!(
872 "Tool `{}` call skipped: it is not available in this turn.",
873 tool_key
874 )
875 } else {
876 format!(
877 "Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
878 tool_key, offered_tool_preview
879 )
880 };
881 self.event_bus.publish(EngineEvent::new(
882 "tool.call.rejected_unoffered",
883 json!({
884 "sessionID": session_id,
885 "messageID": user_message_id,
886 "iteration": iteration,
887 "tool": tool_key,
888 "offeredToolCount": allowed_tool_names.len()
889 }),
890 ));
891 if tool_name_looks_like_email_action(&tool_key) {
892 latest_email_action_note = Some(note.clone());
893 }
894 outputs.push(note);
895 continue;
896 }
897 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
898 if blocked_mcp_servers.contains(server) {
899 rejected_tool_call_in_cycle = true;
900 outputs.push(format!(
901 "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
902 tool_key, server
903 ));
904 continue;
905 }
906 }
907 if tool_key == "question" {
908 question_tool_used = true;
909 }
910 if tool_key == "pack_builder" && pack_builder_executed {
911 rejected_tool_call_in_cycle = true;
912 outputs.push(
913 "Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
914 .to_string(),
915 );
916 continue;
917 }
918 if websearch_query_blocked && tool_key == "websearch" {
919 rejected_tool_call_in_cycle = true;
920 outputs.push(
921 "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
922 .to_string(),
923 );
924 continue;
925 }
926 let mut effective_args = args.clone();
927 if tool_key == "todo_write" {
928 effective_args = normalize_todo_write_args(effective_args, &completion);
929 if is_empty_todo_write_args(&effective_args) {
930 rejected_tool_call_in_cycle = true;
931 outputs.push(
932 "Tool `todo_write` call skipped: empty todo payload."
933 .to_string(),
934 );
935 continue;
936 }
937 }
938 let signature = if tool_key == "batch" {
939 batch_tool_signature(&args)
940 .unwrap_or_else(|| tool_signature(&tool_key, &args))
941 } else {
942 tool_signature(&tool_key, &args)
943 };
944 if is_shell_tool_name(&tool_key)
945 && shell_mismatch_signatures.contains(&signature)
946 {
947 rejected_tool_call_in_cycle = true;
948 outputs.push(
949 "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
950 .to_string(),
951 );
952 continue;
953 }
954 let mut signature_count = 1usize;
955 if is_read_only_tool(&tool_key)
956 || (tool_key == "batch" && is_read_only_batch_call(&args))
957 {
958 let count = readonly_signature_counts
959 .entry(signature.clone())
960 .and_modify(|v| *v = v.saturating_add(1))
961 .or_insert(1);
962 signature_count = *count;
963 if tool_key == "websearch" {
964 if let Some(limit) = websearch_duplicate_signature_limit {
965 if *count > limit {
966 rejected_tool_call_in_cycle = true;
967 self.event_bus.publish(EngineEvent::new(
968 "tool.loop_guard.triggered",
969 json!({
970 "sessionID": session_id,
971 "messageID": user_message_id,
972 "tool": tool_key,
973 "reason": "duplicate_signature_retry_exhausted",
974 "duplicateLimit": limit,
975 "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
976 "loop_guard_triggered": true
977 }),
978 ));
979 outputs.push(
980 "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
981 .to_string(),
982 );
983 continue;
984 }
985 }
986 }
987 if tool_key != "websearch" && *count > 1 {
988 rejected_tool_call_in_cycle = true;
989 if let Some(cached) = readonly_tool_cache.get(&signature) {
990 outputs.push(cached.clone());
991 } else {
992 outputs.push(format!(
993 "Tool `{}` call skipped: duplicate call signature detected.",
994 tool_key
995 ));
996 }
997 continue;
998 }
999 }
1000 let is_read_only_signature = is_read_only_tool(&tool_key)
1001 || (tool_key == "batch" && is_read_only_batch_call(&args));
1002 if !is_read_only_signature {
1003 let duplicate_limit = duplicate_signature_limit_for(&tool_key);
1004 let seen = mutable_signature_counts
1005 .entry(signature.clone())
1006 .and_modify(|v| *v = v.saturating_add(1))
1007 .or_insert(1);
1008 if *seen > duplicate_limit {
1009 rejected_tool_call_in_cycle = true;
1010 self.event_bus.publish(EngineEvent::new(
1011 "tool.loop_guard.triggered",
1012 json!({
1013 "sessionID": session_id,
1014 "messageID": user_message_id,
1015 "tool": tool_key,
1016 "reason": "duplicate_signature_retry_exhausted",
1017 "signatureHash": stable_hash(&signature),
1018 "duplicateLimit": duplicate_limit,
1019 "loop_guard_triggered": true
1020 }),
1021 ));
1022 outputs.push(format!(
1023 "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
1024 tool_key, duplicate_limit
1025 ));
1026 duplicate_signature_hit_in_cycle = true;
1027 continue;
1028 }
1029 }
1030 let budget = tool_budget_for(&tool_key);
1031 let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
1032 if *entry >= budget {
1033 rejected_tool_call_in_cycle = true;
1034 outputs.push(format!(
1035 "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
1036 tool_key, budget
1037 ));
1038 guard_budget_hit_in_cycle = true;
1039 continue;
1040 }
1041 let mut finalized_part = WireMessagePart::tool_invocation(
1042 &session_id,
1043 &user_message_id,
1044 tool.clone(),
1045 effective_args.clone(),
1046 );
1047 if let Some(call_id) = call_id.clone() {
1048 finalized_part.id = Some(call_id);
1049 }
1050 finalized_part.state = Some("pending".to_string());
1051 self.event_bus.publish(EngineEvent::new(
1052 "message.part.updated",
1053 json!({"part": finalized_part}),
1054 ));
1055 *entry += 1;
1056 accepted_tool_calls_in_cycle =
1057 accepted_tool_calls_in_cycle.saturating_add(1);
1058 if let Some(output) = self
1059 .execute_tool_with_permission(
1060 &session_id,
1061 &user_message_id,
1062 tool,
1063 effective_args,
1064 call_id,
1065 active_agent.skills.as_deref(),
1066 &text,
1067 requested_write_required,
1068 Some(&completion),
1069 cancel.clone(),
1070 )
1071 .await?
1072 {
1073 let productive = is_productive_tool_output(&tool_key, &output);
1074 if output.contains("WEBSEARCH_QUERY_MISSING") {
1075 websearch_query_blocked = true;
1076 }
1077 if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
1078 {
1079 shell_mismatch_signatures.insert(signature.clone());
1080 }
1081 if is_read_only_tool(&tool_key)
1082 && tool_key != "websearch"
1083 && signature_count == 1
1084 {
1085 readonly_tool_cache.insert(signature, output.clone());
1086 }
1087 if productive {
1088 productive_tool_calls_total =
1089 productive_tool_calls_total.saturating_add(1);
1090 if is_workspace_write_tool(&tool_key) {
1091 productive_write_tool_calls_total =
1092 productive_write_tool_calls_total.saturating_add(1);
1093 }
1094 if is_workspace_inspection_tool(&tool_key) {
1095 productive_workspace_inspection_total =
1096 productive_workspace_inspection_total.saturating_add(1);
1097 }
1098 if tool_key == "read" {
1099 productive_concrete_read_total =
1100 productive_concrete_read_total.saturating_add(1);
1101 }
1102 if is_web_research_tool(&tool_key) {
1103 productive_web_research_total =
1104 productive_web_research_total.saturating_add(1);
1105 if is_successful_web_research_output(&tool_key, &output) {
1106 successful_web_research_total =
1107 successful_web_research_total.saturating_add(1);
1108 }
1109 }
1110 executed_productive_tool = true;
1111 if tool_key == "pack_builder" {
1112 pack_builder_executed = true;
1113 }
1114 }
1115 if tool_name_looks_like_email_action(&tool_key) {
1116 if productive {
1117 email_action_executed = true;
1118 } else {
1119 latest_email_action_note =
1120 Some(truncate_text(&output, 280).replace('\n', " "));
1121 }
1122 }
1123 if is_auth_required_tool_output(&output) {
1124 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1125 blocked_mcp_servers.insert(server.to_string());
1126 }
1127 auth_required_hit_in_cycle = true;
1128 }
1129 outputs.push(output);
1130 if auth_required_hit_in_cycle {
1131 break;
1132 }
1133 if guard_budget_hit_in_cycle {
1134 break;
1135 }
1136 }
1137 }
1138 if !outputs.is_empty() {
1139 last_tool_outputs = outputs.clone();
1140 if matches!(requested_tool_mode, ToolMode::Required)
1141 && productive_tool_calls_total == 0
1142 {
1143 latest_required_tool_failure_kind = classify_required_tool_failure(
1144 &outputs,
1145 saw_tool_call_candidate,
1146 accepted_tool_calls_in_cycle,
1147 provider_tool_parse_failed,
1148 rejected_tool_call_in_cycle,
1149 );
1150 if requested_write_required
1151 && write_tool_attempted_in_cycle
1152 && productive_write_tool_calls_total == 0
1153 && is_write_invalid_args_failure_kind(
1154 latest_required_tool_failure_kind,
1155 )
1156 {
1157 if required_write_retry_count + 1 < strict_write_retry_max_attempts
1158 {
1159 required_write_retry_count += 1;
1160 required_tool_retry_count += 1;
1161 followup_context = Some(build_write_required_retry_context(
1162 &offered_tool_preview,
1163 latest_required_tool_failure_kind,
1164 &text,
1165 &requested_prewrite_requirements,
1166 productive_workspace_inspection_total > 0,
1167 productive_concrete_read_total > 0,
1168 productive_web_research_total > 0,
1169 successful_web_research_total > 0,
1170 ));
1171 self.event_bus.publish(EngineEvent::new(
1172 "provider.call.iteration.finish",
1173 json!({
1174 "sessionID": session_id,
1175 "messageID": user_message_id,
1176 "iteration": iteration,
1177 "finishReason": "required_write_invalid_retry",
1178 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1179 "rejectedToolCalls": 0,
1180 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1181 }),
1182 ));
1183 continue;
1184 }
1185 }
1186 let progress_made_in_cycle = productive_workspace_inspection_total > 0
1187 || productive_concrete_read_total > 0
1188 || productive_web_research_total > 0
1189 || successful_web_research_total > 0;
1190 if should_retry_nonproductive_required_tool_cycle(
1191 requested_write_required,
1192 write_tool_attempted_in_cycle,
1193 progress_made_in_cycle,
1194 required_tool_retry_count,
1195 ) {
1196 required_tool_retry_count += 1;
1197 followup_context =
1198 Some(build_required_tool_retry_context_for_task(
1199 &offered_tool_preview,
1200 latest_required_tool_failure_kind,
1201 &text,
1202 ));
1203 self.event_bus.publish(EngineEvent::new(
1204 "provider.call.iteration.finish",
1205 json!({
1206 "sessionID": session_id,
1207 "messageID": user_message_id,
1208 "iteration": iteration,
1209 "finishReason": "required_tool_retry",
1210 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1211 "rejectedToolCalls": 0,
1212 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1213 }),
1214 ));
1215 continue;
1216 }
1217 completion = required_tool_mode_unsatisfied_completion(
1218 latest_required_tool_failure_kind,
1219 );
1220 if !required_tool_unsatisfied_emitted {
1221 required_tool_unsatisfied_emitted = true;
1222 self.event_bus.publish(EngineEvent::new(
1223 "tool.mode.required.unsatisfied",
1224 json!({
1225 "sessionID": session_id,
1226 "messageID": user_message_id,
1227 "iteration": iteration,
1228 "selectedToolCount": allowed_tool_names.len(),
1229 "offeredToolsPreview": offered_tool_preview,
1230 "reason": latest_required_tool_failure_kind.code(),
1231 }),
1232 ));
1233 }
1234 self.event_bus.publish(EngineEvent::new(
1235 "provider.call.iteration.finish",
1236 json!({
1237 "sessionID": session_id,
1238 "messageID": user_message_id,
1239 "iteration": iteration,
1240 "finishReason": "required_tool_unsatisfied",
1241 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1242 "rejectedToolCalls": 0,
1243 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1244 }),
1245 ));
1246 break;
1247 }
1248 let prewrite_gate = evaluate_prewrite_gate(
1249 requested_write_required,
1250 &requested_prewrite_requirements,
1251 PrewriteProgress {
1252 productive_write_tool_calls_total,
1253 productive_workspace_inspection_total,
1254 productive_concrete_read_total,
1255 productive_web_research_total,
1256 successful_web_research_total,
1257 required_write_retry_count,
1258 unmet_prewrite_repair_retry_count,
1259 prewrite_gate_waived,
1260 },
1261 );
1262 let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1263 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1264 if requested_write_required
1265 && productive_tool_calls_total > 0
1266 && productive_write_tool_calls_total == 0
1267 {
1268 if should_start_prewrite_repair_before_first_write(
1269 requested_prewrite_requirements.repair_on_unmet_requirements,
1270 productive_write_tool_calls_total,
1271 prewrite_satisfied,
1272 code_workflow_requested,
1273 ) {
1274 if unmet_prewrite_repair_retry_count
1275 < prewrite_repair_retry_max_attempts()
1276 {
1277 unmet_prewrite_repair_retry_count += 1;
1278 let repair_attempt = unmet_prewrite_repair_retry_count;
1279 let repair_attempts_remaining =
1280 prewrite_repair_retry_max_attempts()
1281 .saturating_sub(repair_attempt);
1282 followup_context = Some(build_prewrite_repair_retry_context(
1283 &offered_tool_preview,
1284 latest_required_tool_failure_kind,
1285 &text,
1286 &requested_prewrite_requirements,
1287 productive_workspace_inspection_total > 0,
1288 productive_concrete_read_total > 0,
1289 productive_web_research_total > 0,
1290 successful_web_research_total > 0,
1291 ));
1292 self.event_bus.publish(EngineEvent::new(
1293 "provider.call.iteration.finish",
1294 json!({
1295 "sessionID": session_id,
1296 "messageID": user_message_id,
1297 "iteration": iteration,
1298 "finishReason": "prewrite_repair_retry",
1299 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1300 "rejectedToolCalls": 0,
1301 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1302 "repair": prewrite_repair_event_payload(
1303 repair_attempt,
1304 repair_attempts_remaining,
1305 &unmet_prewrite_codes,
1306 false,
1307 ),
1308 }),
1309 ));
1310 continue;
1311 }
1312 if !prewrite_gate_waived {
1313 if prewrite_gate_strict_mode() {
1314 self.event_bus.publish(EngineEvent::new(
1317 "prewrite.gate.strict_mode.blocked",
1318 json!({
1319 "sessionID": session_id,
1320 "messageID": user_message_id,
1321 "iteration": iteration,
1322 "unmetCodes": unmet_prewrite_codes,
1323 }),
1324 ));
1325 continue;
1326 }
1327 prewrite_gate_waived = true;
1328 let repair_attempt = unmet_prewrite_repair_retry_count;
1329 let repair_attempts_remaining =
1330 prewrite_repair_retry_max_attempts()
1331 .saturating_sub(repair_attempt);
1332 followup_context = Some(build_prewrite_waived_write_context(
1333 &text,
1334 &unmet_prewrite_codes,
1335 ));
1336 self.event_bus.publish(EngineEvent::new(
1337 "prewrite.gate.waived.write_executed",
1338 json!({
1339 "sessionID": session_id,
1340 "messageID": user_message_id,
1341 "unmetCodes": unmet_prewrite_codes,
1342 }),
1343 ));
1344 self.event_bus.publish(EngineEvent::new(
1345 "provider.call.iteration.finish",
1346 json!({
1347 "sessionID": session_id,
1348 "messageID": user_message_id,
1349 "iteration": iteration,
1350 "finishReason": "prewrite_gate_waived",
1351 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1352 "rejectedToolCalls": 0,
1353 "prewriteGateWaived": true,
1354 "repair": prewrite_repair_event_payload(
1355 repair_attempt,
1356 repair_attempts_remaining,
1357 &unmet_prewrite_codes,
1358 true,
1359 ),
1360 }),
1361 ));
1362 continue;
1363 }
1364 }
1365 latest_required_tool_failure_kind =
1366 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1367 if required_write_retry_count + 1 < strict_write_retry_max_attempts {
1368 required_write_retry_count += 1;
1369 followup_context = Some(build_write_required_retry_context(
1370 &offered_tool_preview,
1371 latest_required_tool_failure_kind,
1372 &text,
1373 &requested_prewrite_requirements,
1374 productive_workspace_inspection_total > 0,
1375 productive_concrete_read_total > 0,
1376 productive_web_research_total > 0,
1377 successful_web_research_total > 0,
1378 ));
1379 self.event_bus.publish(EngineEvent::new(
1380 "provider.call.iteration.finish",
1381 json!({
1382 "sessionID": session_id,
1383 "messageID": user_message_id,
1384 "iteration": iteration,
1385 "finishReason": "required_write_retry",
1386 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1387 "rejectedToolCalls": 0,
1388 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1389 }),
1390 ));
1391 continue;
1392 }
1393 completion = required_tool_mode_unsatisfied_completion(
1394 latest_required_tool_failure_kind,
1395 );
1396 if !required_tool_unsatisfied_emitted {
1397 required_tool_unsatisfied_emitted = true;
1398 self.event_bus.publish(EngineEvent::new(
1399 "tool.mode.required.unsatisfied",
1400 json!({
1401 "sessionID": session_id,
1402 "messageID": user_message_id,
1403 "iteration": iteration,
1404 "selectedToolCount": allowed_tool_names.len(),
1405 "offeredToolsPreview": offered_tool_preview,
1406 "reason": latest_required_tool_failure_kind.code(),
1407 }),
1408 ));
1409 }
1410 self.event_bus.publish(EngineEvent::new(
1411 "provider.call.iteration.finish",
1412 json!({
1413 "sessionID": session_id,
1414 "messageID": user_message_id,
1415 "iteration": iteration,
1416 "finishReason": "required_write_unsatisfied",
1417 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1418 "rejectedToolCalls": 0,
1419 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1420 }),
1421 ));
1422 break;
1423 }
1424 if invalid_tool_args_retry_count < invalid_tool_args_retry_max_attempts() {
1425 if let Some(retry_context) =
1426 build_invalid_tool_args_retry_context_from_outputs(
1427 &outputs,
1428 invalid_tool_args_retry_count,
1429 )
1430 {
1431 invalid_tool_args_retry_count += 1;
1432 followup_context = Some(format!(
1433 "Previous tool call arguments were invalid. {}",
1434 retry_context
1435 ));
1436 self.event_bus.publish(EngineEvent::new(
1437 "provider.call.iteration.finish",
1438 json!({
1439 "sessionID": session_id,
1440 "messageID": user_message_id,
1441 "iteration": iteration,
1442 "finishReason": "invalid_tool_args_retry",
1443 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1444 "rejectedToolCalls": 0,
1445 }),
1446 ));
1447 continue;
1448 }
1449 }
1450 let guard_budget_hit =
1451 outputs.iter().any(|o| is_guard_budget_tool_output(o));
1452 if executed_productive_tool {
1453 let prewrite_gate = evaluate_prewrite_gate(
1454 requested_write_required,
1455 &requested_prewrite_requirements,
1456 PrewriteProgress {
1457 productive_write_tool_calls_total,
1458 productive_workspace_inspection_total,
1459 productive_concrete_read_total,
1460 productive_web_research_total,
1461 successful_web_research_total,
1462 required_write_retry_count,
1463 unmet_prewrite_repair_retry_count,
1464 prewrite_gate_waived,
1465 },
1466 );
1467 let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1468 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1469 if requested_write_required
1470 && productive_write_tool_calls_total > 0
1471 && requested_prewrite_requirements.repair_on_unmet_requirements
1472 && unmet_prewrite_repair_retry_count
1473 < prewrite_repair_retry_max_attempts()
1474 && !prewrite_satisfied
1475 {
1476 unmet_prewrite_repair_retry_count += 1;
1477 let repair_attempt = unmet_prewrite_repair_retry_count;
1478 let repair_attempts_remaining =
1479 prewrite_repair_retry_max_attempts()
1480 .saturating_sub(repair_attempt);
1481 followup_context = Some(build_prewrite_repair_retry_context(
1482 &offered_tool_preview,
1483 latest_required_tool_failure_kind,
1484 &text,
1485 &requested_prewrite_requirements,
1486 productive_workspace_inspection_total > 0,
1487 productive_concrete_read_total > 0,
1488 productive_web_research_total > 0,
1489 successful_web_research_total > 0,
1490 ));
1491 self.event_bus.publish(EngineEvent::new(
1492 "provider.call.iteration.finish",
1493 json!({
1494 "sessionID": session_id,
1495 "messageID": user_message_id,
1496 "iteration": iteration,
1497 "finishReason": "prewrite_repair_retry",
1498 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1499 "rejectedToolCalls": 0,
1500 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1501 "repair": prewrite_repair_event_payload(
1502 repair_attempt,
1503 repair_attempts_remaining,
1504 &unmet_prewrite_codes,
1505 false,
1506 ),
1507 }),
1508 ));
1509 continue;
1510 }
1511 followup_context = Some(format!(
1512 "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1513 summarize_tool_outputs(&outputs)
1514 ));
1515 self.event_bus.publish(EngineEvent::new(
1516 "provider.call.iteration.finish",
1517 json!({
1518 "sessionID": session_id,
1519 "messageID": user_message_id,
1520 "iteration": iteration,
1521 "finishReason": "tool_followup",
1522 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1523 "rejectedToolCalls": 0,
1524 }),
1525 ));
1526 continue;
1527 }
1528 if guard_budget_hit {
1529 completion = summarize_guard_budget_outputs(&outputs)
1530 .unwrap_or_else(|| {
1531 "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1532 });
1533 } else if duplicate_signature_hit_in_cycle {
1534 completion = summarize_duplicate_signature_outputs(&outputs)
1535 .unwrap_or_else(|| {
1536 "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1537 });
1538 } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1539 completion = summary;
1540 } else {
1541 completion.clear();
1542 }
1543 self.event_bus.publish(EngineEvent::new(
1544 "provider.call.iteration.finish",
1545 json!({
1546 "sessionID": session_id,
1547 "messageID": user_message_id,
1548 "iteration": iteration,
1549 "finishReason": "tool_summary",
1550 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1551 "rejectedToolCalls": 0,
1552 }),
1553 ));
1554 break;
1555 } else if matches!(requested_tool_mode, ToolMode::Required) {
1556 latest_required_tool_failure_kind = classify_required_tool_failure(
1557 &outputs,
1558 saw_tool_call_candidate,
1559 accepted_tool_calls_in_cycle,
1560 provider_tool_parse_failed,
1561 rejected_tool_call_in_cycle,
1562 );
1563 }
1564 }
1565
1566 {
1567 let (prompt_tokens, completion_tokens, total_tokens, usage_source) =
1568 if let Some(usage) = provider_usage {
1569 (
1570 usage.prompt_tokens,
1571 usage.completion_tokens,
1572 usage.total_tokens,
1573 "provider",
1574 )
1575 } else {
1576 let est_prompt = (estimated_prompt_chars / 4) as u64;
1580 let est_completion = (completion.len() / 4) as u64;
1581 tracing::debug!(
1582 session_id = %session_id,
1583 provider_id = %provider_id,
1584 "provider.usage missing from stream; using char-count estimate \
1585 (prompt_chars={estimated_prompt_chars} completion_chars={})",
1586 completion.len()
1587 );
1588 (
1589 est_prompt,
1590 est_completion,
1591 est_prompt.saturating_add(est_completion),
1592 "estimated",
1593 )
1594 };
1595 self.event_bus.publish(EngineEvent::new(
1596 "provider.usage",
1597 json!({
1598 "sessionID": session_id,
1599 "correlationID": correlation_ref,
1600 "messageID": user_message_id,
1601 "promptTokens": prompt_tokens,
1602 "completionTokens": completion_tokens,
1603 "totalTokens": total_tokens,
1604 "usageSource": usage_source,
1605 }),
1606 ));
1607 }
1608
1609 if matches!(requested_tool_mode, ToolMode::Required)
1610 && productive_tool_calls_total == 0
1611 {
1612 if requested_write_required
1613 && required_write_retry_count > 0
1614 && productive_write_tool_calls_total == 0
1615 && !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
1616 {
1617 latest_required_tool_failure_kind =
1618 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1619 }
1620 if requested_write_required
1621 && required_write_retry_count + 1 < strict_write_retry_max_attempts
1622 {
1623 required_write_retry_count += 1;
1624 followup_context = Some(build_write_required_retry_context(
1625 &offered_tool_preview,
1626 latest_required_tool_failure_kind,
1627 &text,
1628 &requested_prewrite_requirements,
1629 productive_workspace_inspection_total > 0,
1630 productive_concrete_read_total > 0,
1631 productive_web_research_total > 0,
1632 successful_web_research_total > 0,
1633 ));
1634 continue;
1635 }
1636 let progress_made_in_cycle = productive_workspace_inspection_total > 0
1637 || productive_concrete_read_total > 0
1638 || productive_web_research_total > 0
1639 || successful_web_research_total > 0;
1640 if should_retry_nonproductive_required_tool_cycle(
1641 requested_write_required,
1642 false,
1643 progress_made_in_cycle,
1644 required_tool_retry_count,
1645 ) {
1646 required_tool_retry_count += 1;
1647 followup_context = Some(build_required_tool_retry_context_for_task(
1648 &offered_tool_preview,
1649 latest_required_tool_failure_kind,
1650 &text,
1651 ));
1652 continue;
1653 }
1654 completion = required_tool_mode_unsatisfied_completion(
1655 latest_required_tool_failure_kind,
1656 );
1657 if !required_tool_unsatisfied_emitted {
1658 required_tool_unsatisfied_emitted = true;
1659 self.event_bus.publish(EngineEvent::new(
1660 "tool.mode.required.unsatisfied",
1661 json!({
1662 "sessionID": session_id,
1663 "messageID": user_message_id,
1664 "iteration": iteration,
1665 "selectedToolCount": allowed_tool_names.len(),
1666 "offeredToolsPreview": offered_tool_preview,
1667 "reason": latest_required_tool_failure_kind.code(),
1668 }),
1669 ));
1670 }
1671 self.event_bus.publish(EngineEvent::new(
1672 "provider.call.iteration.finish",
1673 json!({
1674 "sessionID": session_id,
1675 "messageID": user_message_id,
1676 "iteration": iteration,
1677 "finishReason": "required_tool_unsatisfied",
1678 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1679 "rejectedToolCalls": 0,
1680 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1681 }),
1682 ));
1683 } else {
1684 if completion.trim().is_empty()
1685 && !last_tool_outputs.is_empty()
1686 && requested_write_required
1687 && empty_completion_retry_count == 0
1688 {
1689 empty_completion_retry_count += 1;
1690 followup_context = Some(build_empty_completion_retry_context(
1691 &offered_tool_preview,
1692 &text,
1693 &requested_prewrite_requirements,
1694 productive_workspace_inspection_total > 0,
1695 productive_concrete_read_total > 0,
1696 productive_web_research_total > 0,
1697 successful_web_research_total > 0,
1698 ));
1699 self.event_bus.publish(EngineEvent::new(
1700 "provider.call.iteration.finish",
1701 json!({
1702 "sessionID": session_id,
1703 "messageID": user_message_id,
1704 "iteration": iteration,
1705 "finishReason": "empty_completion_retry",
1706 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1707 "rejectedToolCalls": 0,
1708 }),
1709 ));
1710 continue;
1711 }
1712 let prewrite_gate = evaluate_prewrite_gate(
1713 requested_write_required,
1714 &requested_prewrite_requirements,
1715 PrewriteProgress {
1716 productive_write_tool_calls_total,
1717 productive_workspace_inspection_total,
1718 productive_concrete_read_total,
1719 productive_web_research_total,
1720 successful_web_research_total,
1721 required_write_retry_count,
1722 unmet_prewrite_repair_retry_count,
1723 prewrite_gate_waived,
1724 },
1725 );
1726 if should_start_prewrite_repair_before_first_write(
1727 requested_prewrite_requirements.repair_on_unmet_requirements,
1728 productive_write_tool_calls_total,
1729 prewrite_gate.prewrite_satisfied,
1730 code_workflow_requested,
1731 ) && !prewrite_gate_waived
1732 {
1733 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1734 if unmet_prewrite_repair_retry_count < prewrite_repair_retry_max_attempts()
1735 {
1736 unmet_prewrite_repair_retry_count += 1;
1737 let repair_attempt = unmet_prewrite_repair_retry_count;
1738 let repair_attempts_remaining =
1739 prewrite_repair_retry_max_attempts().saturating_sub(repair_attempt);
1740 followup_context = Some(build_prewrite_repair_retry_context(
1741 &offered_tool_preview,
1742 latest_required_tool_failure_kind,
1743 &text,
1744 &requested_prewrite_requirements,
1745 productive_workspace_inspection_total > 0,
1746 productive_concrete_read_total > 0,
1747 productive_web_research_total > 0,
1748 successful_web_research_total > 0,
1749 ));
1750 self.event_bus.publish(EngineEvent::new(
1751 "provider.call.iteration.finish",
1752 json!({
1753 "sessionID": session_id,
1754 "messageID": user_message_id,
1755 "iteration": iteration,
1756 "finishReason": "prewrite_repair_retry",
1757 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1758 "rejectedToolCalls": 0,
1759 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1760 "repair": prewrite_repair_event_payload(
1761 repair_attempt,
1762 repair_attempts_remaining,
1763 &unmet_prewrite_codes,
1764 false,
1765 ),
1766 }),
1767 ));
1768 continue;
1769 }
1770 if prewrite_gate_strict_mode() {
1771 self.event_bus.publish(EngineEvent::new(
1772 "prewrite.gate.strict_mode.blocked",
1773 json!({
1774 "sessionID": session_id,
1775 "messageID": user_message_id,
1776 "iteration": iteration,
1777 "unmetCodes": unmet_prewrite_codes,
1778 }),
1779 ));
1780 continue;
1781 }
1782 prewrite_gate_waived = true;
1783 let repair_attempt = unmet_prewrite_repair_retry_count;
1784 let repair_attempts_remaining =
1785 prewrite_repair_retry_max_attempts().saturating_sub(repair_attempt);
1786 followup_context = Some(build_prewrite_waived_write_context(
1787 &text,
1788 &unmet_prewrite_codes,
1789 ));
1790 self.event_bus.publish(EngineEvent::new(
1791 "prewrite.gate.waived.write_executed",
1792 json!({
1793 "sessionID": session_id,
1794 "messageID": user_message_id,
1795 "unmetCodes": unmet_prewrite_codes,
1796 }),
1797 ));
1798 self.event_bus.publish(EngineEvent::new(
1799 "provider.call.iteration.finish",
1800 json!({
1801 "sessionID": session_id,
1802 "messageID": user_message_id,
1803 "iteration": iteration,
1804 "finishReason": "prewrite_gate_waived",
1805 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1806 "rejectedToolCalls": 0,
1807 "prewriteGateWaived": true,
1808 "repair": prewrite_repair_event_payload(
1809 repair_attempt,
1810 repair_attempts_remaining,
1811 &unmet_prewrite_codes,
1812 true,
1813 ),
1814 }),
1815 ));
1816 continue;
1817 }
1818 if prewrite_gate_waived
1819 && requested_write_required
1820 && productive_write_tool_calls_total == 0
1821 && required_write_retry_count + 1 < strict_write_retry_max_attempts
1822 {
1823 required_write_retry_count += 1;
1824 followup_context = Some(build_write_required_retry_context(
1825 &offered_tool_preview,
1826 latest_required_tool_failure_kind,
1827 &text,
1828 &requested_prewrite_requirements,
1829 productive_workspace_inspection_total > 0,
1830 productive_concrete_read_total > 0,
1831 productive_web_research_total > 0,
1832 successful_web_research_total > 0,
1833 ));
1834 self.event_bus.publish(EngineEvent::new(
1835 "provider.call.iteration.finish",
1836 json!({
1837 "sessionID": session_id,
1838 "messageID": user_message_id,
1839 "iteration": iteration,
1840 "finishReason": "waived_write_retry",
1841 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1842 "rejectedToolCalls": 0,
1843 }),
1844 ));
1845 continue;
1846 }
1847 self.event_bus.publish(EngineEvent::new(
1848 "provider.call.iteration.finish",
1849 json!({
1850 "sessionID": session_id,
1851 "messageID": user_message_id,
1852 "iteration": iteration,
1853 "finishReason": "provider_completion",
1854 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1855 "rejectedToolCalls": 0,
1856 }),
1857 ));
1858 }
1859 break;
1860 }
1861 if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
1862 {
1863 completion =
1864 required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
1865 if !required_tool_unsatisfied_emitted {
1866 self.event_bus.publish(EngineEvent::new(
1867 "tool.mode.required.unsatisfied",
1868 json!({
1869 "sessionID": session_id,
1870 "messageID": user_message_id,
1871 "selectedToolCount": tool_call_counts.len(),
1872 "reason": latest_required_tool_failure_kind.code(),
1873 }),
1874 ));
1875 }
1876 }
1877 if completion.trim().is_empty()
1878 && !last_tool_outputs.is_empty()
1879 && requested_write_required
1880 && productive_write_tool_calls_total > 0
1881 {
1882 let final_prewrite_satisfied = evaluate_prewrite_gate(
1883 requested_write_required,
1884 &requested_prewrite_requirements,
1885 PrewriteProgress {
1886 productive_write_tool_calls_total,
1887 productive_workspace_inspection_total,
1888 productive_concrete_read_total,
1889 productive_web_research_total,
1890 successful_web_research_total,
1891 required_write_retry_count,
1892 unmet_prewrite_repair_retry_count,
1893 prewrite_gate_waived,
1894 },
1895 )
1896 .prewrite_satisfied;
1897 completion = synthesize_artifact_write_completion_from_tool_state(
1898 &text,
1899 final_prewrite_satisfied,
1900 prewrite_gate_waived,
1901 );
1902 }
1903 if completion.trim().is_empty()
1904 && !last_tool_outputs.is_empty()
1905 && should_generate_post_tool_final_narrative(
1906 requested_tool_mode,
1907 productive_tool_calls_total,
1908 )
1909 {
1910 if let Some(narrative) = self
1911 .generate_final_narrative_without_tools(
1912 &session_id,
1913 &active_agent,
1914 Some(provider_id.as_str()),
1915 Some(model_id_value.as_str()),
1916 cancel.clone(),
1917 &last_tool_outputs,
1918 )
1919 .await
1920 {
1921 completion = narrative;
1922 }
1923 }
1924 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
1925 if let Some(summary) = summarize_auth_pending_outputs(&last_tool_outputs) {
1926 completion = summary;
1927 } else if let Some(hint) =
1928 summarize_terminal_tool_failure_for_user(&last_tool_outputs)
1929 {
1930 completion = hint;
1931 } else {
1932 let preview = summarize_user_visible_tool_outputs(&last_tool_outputs);
1933 if preview.trim().is_empty() {
1934 completion = "I used tools for this request, but I couldn't turn the results into a clean final answer. Please retry with the docs page URL, docs path, or exact search query you want me to use.".to_string();
1935 } else {
1936 completion = format!(
1937 "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
1938 preview
1939 );
1940 }
1941 }
1942 }
1943 if completion.trim().is_empty() {
1944 completion =
1945 "I couldn't produce a final response for that run. Please retry your request."
1946 .to_string();
1947 }
1948 if email_delivery_requested && email_tools_ever_offered && !email_action_executed {
1957 let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
1958 .to_string();
1959 if let Some(note) = latest_email_action_note.as_ref() {
1960 fallback.push_str("\n\nLast email tool status: ");
1961 fallback.push_str(note);
1962 }
1963 fallback.push_str(
1964 "\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
1965 );
1966 completion = fallback;
1967 }
1968 completion = strip_model_control_markers(&completion);
1969 truncate_text(&completion, 16_000)
1970 };
1971 emit_event(
1972 Level::INFO,
1973 ProcessKind::Engine,
1974 ObservabilityEvent {
1975 event: "provider.call.finish",
1976 component: "engine.loop",
1977 correlation_id: correlation_ref,
1978 session_id: Some(&session_id),
1979 run_id: None,
1980 message_id: Some(&user_message_id),
1981 provider_id: Some(provider_id.as_str()),
1982 model_id,
1983 status: Some("ok"),
1984 error_code: None,
1985 detail: Some("provider stream complete"),
1986 },
1987 );
1988 if active_agent.name.eq_ignore_ascii_case("plan") {
1989 emit_plan_todo_fallback(
1990 self.storage.clone(),
1991 &self.event_bus,
1992 &session_id,
1993 &user_message_id,
1994 &completion,
1995 )
1996 .await;
1997 let todos_after_fallback = self.storage.get_todos(&session_id).await;
1998 if todos_after_fallback.is_empty() && !question_tool_used {
1999 emit_plan_question_fallback(
2000 self.storage.clone(),
2001 &self.event_bus,
2002 &session_id,
2003 &user_message_id,
2004 &completion,
2005 )
2006 .await;
2007 }
2008 }
2009 if cancel.is_cancelled() {
2010 self.event_bus.publish(EngineEvent::new(
2011 "session.status",
2012 json!({"sessionID": session_id, "status":"cancelled"}),
2013 ));
2014 self.cancellations.remove(&session_id).await;
2015 return Ok(());
2016 }
2017 let assistant = Message::new(
2018 MessageRole::Assistant,
2019 vec![MessagePart::Text {
2020 text: completion.clone(),
2021 }],
2022 );
2023 let assistant_message_id = assistant.id.clone();
2024 self.storage.append_message(&session_id, assistant).await?;
2025 let final_part = WireMessagePart::text(
2026 &session_id,
2027 &assistant_message_id,
2028 truncate_text(&completion, 16_000),
2029 );
2030 self.event_bus.publish(EngineEvent::new(
2031 "message.part.updated",
2032 json!({"part": final_part}),
2033 ));
2034 self.event_bus.publish(EngineEvent::new(
2035 "session.updated",
2036 json!({"sessionID": session_id, "status":"idle"}),
2037 ));
2038 self.event_bus.publish(EngineEvent::new(
2039 "session.status",
2040 json!({"sessionID": session_id, "status":"idle"}),
2041 ));
2042 self.cancellations.remove(&session_id).await;
2043 Ok(())
2044 }
2045}