1use super::*;
2
3impl EngineLoop {
4 pub async fn run_prompt_async_with_context(
5 &self,
6 session_id: String,
7 req: SendMessageRequest,
8 correlation_id: Option<String>,
9 ) -> anyhow::Result<()> {
10 let session_model = self
11 .storage
12 .get_session(&session_id)
13 .await
14 .and_then(|s| s.model);
15 let (provider_id, model_id_value) =
16 resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
17 anyhow::anyhow!(
18 "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
19 )
20 })?;
21 let correlation_ref = correlation_id.as_deref();
22 let model_id = Some(model_id_value.as_str());
23 let cancel = self.cancellations.create(&session_id).await;
24 emit_event(
25 Level::INFO,
26 ProcessKind::Engine,
27 ObservabilityEvent {
28 event: "provider.call.start",
29 component: "engine.loop",
30 correlation_id: correlation_ref,
31 session_id: Some(&session_id),
32 run_id: None,
33 message_id: None,
34 provider_id: Some(provider_id.as_str()),
35 model_id,
36 status: Some("start"),
37 error_code: None,
38 detail: Some("run_prompt_async dispatch"),
39 },
40 );
41 self.event_bus.publish(EngineEvent::new(
42 "session.status",
43 json!({"sessionID": session_id, "status":"running"}),
44 ));
45 let request_parts = req.parts.clone();
46 let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
47 let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
48 let requested_write_required = req.write_required.unwrap_or(false);
49 let requested_prewrite_requirements = req.prewrite_requirements.clone().unwrap_or_default();
50 let prewrite_repair_budget = prewrite_repair_retry_budget(&requested_prewrite_requirements);
51 let prewrite_fail_closed = prewrite_gate_strict_mode(&requested_prewrite_requirements);
52 let request_tool_allowlist = req
53 .tool_allowlist
54 .clone()
55 .unwrap_or_default()
56 .into_iter()
57 .map(|tool| normalize_tool_name(&tool))
58 .filter(|tool| !tool.trim().is_empty())
59 .collect::<HashSet<_>>();
60 if !request_tool_allowlist.is_empty() {
63 self.set_session_allowed_tools(
64 &session_id,
65 request_tool_allowlist.iter().cloned().collect(),
66 )
67 .await;
68 }
69 let text = req
70 .parts
71 .iter()
72 .map(|p| match p {
73 MessagePartInput::Text { text } => text.clone(),
74 MessagePartInput::File {
75 mime,
76 filename,
77 url,
78 } => format!(
79 "[file mime={} name={} url={}]",
80 mime,
81 filename.clone().unwrap_or_else(|| "unknown".to_string()),
82 url
83 ),
84 })
85 .collect::<Vec<_>>()
86 .join("\n");
87 let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
88 self.auto_rename_session_from_user_text(&session_id, &text)
89 .await;
90 let active_agent = self.agents.get(req.agent.as_deref()).await;
91 let mut user_message_id = self
92 .find_recent_matching_user_message_id(&session_id, &text)
93 .await;
94 if user_message_id.is_none() {
95 let user_message = Message::new(
96 MessageRole::User,
97 vec![MessagePart::Text { text: text.clone() }],
98 );
99 let created_message_id = user_message.id.clone();
100 self.storage
101 .append_message(&session_id, user_message)
102 .await?;
103
104 let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
105 self.event_bus.publish(EngineEvent::new(
106 "message.part.updated",
107 json!({
108 "part": user_part,
109 "delta": text,
110 "agent": active_agent.name
111 }),
112 ));
113 user_message_id = Some(created_message_id);
114 }
115 let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
116
117 if cancel.is_cancelled() {
118 self.event_bus.publish(EngineEvent::new(
119 "session.status",
120 json!({"sessionID": session_id, "status":"cancelled"}),
121 ));
122 self.cancellations.remove(&session_id).await;
123 return Ok(());
124 }
125
126 let mut question_tool_used = false;
127 let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
128 if normalize_tool_name(&tool) == "question" {
129 question_tool_used = true;
130 }
131 if !agent_can_use_tool(&active_agent, &tool) {
132 format!(
133 "Tool `{tool}` is not enabled for agent `{}`.",
134 active_agent.name
135 )
136 } else {
137 self.execute_tool_with_permission(
138 &session_id,
139 &user_message_id,
140 tool.clone(),
141 args,
142 None,
143 active_agent.skills.as_deref(),
144 &text,
145 requested_write_required,
146 None,
147 cancel.clone(),
148 )
149 .await?
150 .unwrap_or_default()
151 }
152 } else {
153 let mut completion = String::new();
154 let mut max_iterations = max_tool_iterations();
155 let mut followup_context: Option<String> = None;
156 let mut last_tool_outputs: Vec<String> = Vec::new();
157 let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
158 let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
159 let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
160 let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
161 let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
162 let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
163 let mut websearch_query_blocked = false;
164 let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
165 let mut pack_builder_executed = false;
166 let mut auto_workspace_probe_attempted = false;
167 let mut productive_tool_calls_total = 0usize;
168 let mut productive_write_tool_calls_total = 0usize;
169 let mut productive_workspace_inspection_total = 0usize;
170 let mut productive_web_research_total = 0usize;
171 let mut productive_concrete_read_total = 0usize;
172 let mut successful_web_research_total = 0usize;
173 let mut required_tool_retry_count = 0usize;
174 let mut required_write_retry_count = 0usize;
175 let mut unmet_prewrite_repair_retry_count = 0usize;
176 let mut empty_completion_retry_count = 0usize;
177 let mut prewrite_gate_waived = false;
178 let mut invalid_tool_args_retry_count = 0usize;
179 let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
180 let mut required_tool_unsatisfied_emitted = false;
181 let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
182 let email_delivery_requested = requires_email_delivery_prompt(&text);
183 let web_research_requested = requires_web_research_prompt(&text);
184 let code_workflow_requested = infer_code_workflow_from_text(&text);
185 let mut email_action_executed = false;
186 let mut latest_email_action_note: Option<String> = None;
187 let mut email_tools_ever_offered = false;
188 let intent = classify_intent(&text);
189 let router_enabled = tool_router_enabled();
190 let retrieval_enabled = semantic_tool_retrieval_enabled();
191 let retrieval_k = semantic_tool_retrieval_k();
192 let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
193 self.tools.mcp_server_names().await
194 } else {
195 Vec::new()
196 };
197 let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
198 let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
199 && runtime_attachments.is_empty()
200 && is_short_simple_prompt(&text)
201 && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
202
203 while max_iterations > 0 && !cancel.is_cancelled() {
204 let iteration = 26usize.saturating_sub(max_iterations);
205 max_iterations -= 1;
206 let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
207 ChatHistoryProfile::Full
208 } else if matches!(requested_context_mode, ContextMode::Compact)
209 || context_is_auto_compact
210 {
211 ChatHistoryProfile::Compact
212 } else {
213 ChatHistoryProfile::Standard
214 };
215 let mut messages =
216 load_chat_history(self.storage.clone(), &session_id, context_profile).await;
217 if iteration == 1 && !runtime_attachments.is_empty() {
218 attach_to_last_user_message(&mut messages, &runtime_attachments);
219 }
220 let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
221 self.event_bus.publish(EngineEvent::new(
222 "context.profile.selected",
223 json!({
224 "sessionID": session_id,
225 "messageID": user_message_id,
226 "iteration": iteration,
227 "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
228 "historyMessageCount": messages.len(),
229 "historyCharCount": history_char_count,
230 "memoryInjected": false
231 }),
232 ));
233 let mut system_parts = vec![tandem_runtime_system_prompt(
234 &self.host_runtime_context,
235 &mcp_server_names,
236 )];
237 if let Some(system) = active_agent.system_prompt.as_ref() {
238 system_parts.push(system.clone());
239 }
240 messages.insert(
241 0,
242 ChatMessage {
243 role: "system".to_string(),
244 content: system_parts.join("\n\n"),
245 attachments: Vec::new(),
246 },
247 );
248 if let Some(extra) = followup_context.take() {
249 messages.push(ChatMessage {
250 role: "user".to_string(),
251 content: extra,
252 attachments: Vec::new(),
253 });
254 }
255 if let Some(hook) = self.prompt_context_hook.read().await.clone() {
256 let ctx = PromptContextHookContext {
257 session_id: session_id.clone(),
258 message_id: user_message_id.clone(),
259 provider_id: provider_id.clone(),
260 model_id: model_id_value.clone(),
261 iteration,
262 };
263 let hook_timeout =
264 Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
265 match tokio::time::timeout(
266 hook_timeout,
267 hook.augment_provider_messages(ctx, messages.clone()),
268 )
269 .await
270 {
271 Ok(Ok(augmented)) => {
272 messages = augmented;
273 }
274 Ok(Err(err)) => {
275 self.event_bus.publish(EngineEvent::new(
276 "memory.context.error",
277 json!({
278 "sessionID": session_id,
279 "messageID": user_message_id,
280 "iteration": iteration,
281 "error": truncate_text(&err.to_string(), 500),
282 }),
283 ));
284 }
285 Err(_) => {
286 self.event_bus.publish(EngineEvent::new(
287 "memory.context.error",
288 json!({
289 "sessionID": session_id,
290 "messageID": user_message_id,
291 "iteration": iteration,
292 "error": format!(
293 "prompt context hook timeout after {} ms",
294 hook_timeout.as_millis()
295 ),
296 }),
297 ));
298 }
299 }
300 }
301 let all_tools = self.tools.list().await;
302 let mut retrieval_fallback_reason: Option<&'static str> = None;
303 let mut candidate_tools = if retrieval_enabled {
304 self.tools.retrieve(&text, retrieval_k).await
305 } else {
306 all_tools.clone()
307 };
308 if retrieval_enabled {
309 if candidate_tools.is_empty() && !all_tools.is_empty() {
310 candidate_tools = all_tools.clone();
311 retrieval_fallback_reason = Some("retrieval_empty_result");
312 } else if web_research_requested
313 && has_web_research_tools(&all_tools)
314 && !has_web_research_tools(&candidate_tools)
315 && required_write_retry_count == 0
316 {
317 candidate_tools = all_tools.clone();
318 retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
319 } else if email_delivery_requested
320 && has_email_action_tools(&all_tools)
321 && !has_email_action_tools(&candidate_tools)
322 {
323 candidate_tools = all_tools.clone();
324 retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
325 }
326 }
327 let mut tool_schemas = if !router_enabled {
328 candidate_tools
329 } else {
330 match requested_tool_mode {
331 ToolMode::None => Vec::new(),
332 ToolMode::Required => select_tool_subset(
333 candidate_tools,
334 intent,
335 &request_tool_allowlist,
336 iteration > 1,
337 ),
338 ToolMode::Auto => {
339 if !auto_tools_escalated {
340 Vec::new()
341 } else {
342 select_tool_subset(
343 candidate_tools,
344 intent,
345 &request_tool_allowlist,
346 iteration > 1,
347 )
348 }
349 }
350 }
351 };
352 let mut policy_patterns =
353 request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
354 if let Some(agent_tools) = active_agent.tools.as_ref() {
355 policy_patterns
356 .extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
357 }
358 let session_allowed_tools = self
359 .session_allowed_tools
360 .read()
361 .await
362 .get(&session_id)
363 .cloned()
364 .unwrap_or_default();
365 policy_patterns.extend(session_allowed_tools.iter().cloned());
366 if !policy_patterns.is_empty() {
367 let mut included = tool_schemas
368 .iter()
369 .map(|schema| normalize_tool_name(&schema.name))
370 .collect::<HashSet<_>>();
371 for schema in &all_tools {
372 let normalized = normalize_tool_name(&schema.name);
373 if policy_patterns
374 .iter()
375 .any(|pattern| tool_name_matches_policy(pattern, &normalized))
376 && included.insert(normalized)
377 {
378 tool_schemas.push(schema.clone());
379 }
380 }
381 }
382 if !request_tool_allowlist.is_empty() {
383 tool_schemas.retain(|schema| {
384 let tool = normalize_tool_name(&schema.name);
385 request_tool_allowlist
386 .iter()
387 .any(|pattern| tool_name_matches_policy(pattern, &tool))
388 });
389 }
390 let prewrite_gate = evaluate_prewrite_gate(
391 requested_write_required,
392 &requested_prewrite_requirements,
393 PrewriteProgress {
394 productive_write_tool_calls_total,
395 productive_workspace_inspection_total,
396 productive_concrete_read_total,
397 productive_web_research_total,
398 successful_web_research_total,
399 required_write_retry_count,
400 unmet_prewrite_repair_retry_count,
401 prewrite_gate_waived,
402 },
403 );
404 let _prewrite_satisfied = prewrite_gate.prewrite_satisfied;
405 let prewrite_gate_write = prewrite_gate.gate_write;
406 let force_write_only_retry = prewrite_gate.force_write_only_retry;
407 let allow_repair_tools = prewrite_gate.allow_repair_tools;
408 if prewrite_gate_write {
409 tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
410 }
411 if requested_prewrite_requirements.repair_on_unmet_requirements
412 && productive_write_tool_calls_total >= 3
413 {
414 tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
415 }
416 if allow_repair_tools {
417 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
418 let repair_tools = tool_schemas
419 .iter()
420 .filter(|schema| {
421 tool_matches_unmet_prewrite_repair_requirement(
422 &schema.name,
423 &unmet_prewrite_codes,
424 productive_workspace_inspection_total > 0,
425 )
426 })
427 .cloned()
428 .collect::<Vec<_>>();
429 if !repair_tools.is_empty() {
430 tool_schemas = repair_tools;
431 }
432 }
433 if force_write_only_retry && !allow_repair_tools {
434 tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
435 }
436 if active_agent.tools.is_some() {
437 tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
438 }
439 tool_schemas.retain(|schema| {
440 let normalized = normalize_tool_name(&schema.name);
441 if let Some(server) = mcp_server_from_tool_name(&normalized) {
442 !blocked_mcp_servers.contains(server)
443 } else {
444 true
445 }
446 });
447 if let Some(allowed_tools) = self
448 .session_allowed_tools
449 .read()
450 .await
451 .get(&session_id)
452 .cloned()
453 {
454 if !allowed_tools.is_empty() {
455 tool_schemas.retain(|schema| {
456 let normalized = normalize_tool_name(&schema.name);
457 any_policy_matches(&allowed_tools, &normalized)
458 });
459 }
460 }
461 if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
462 let detail = validation_err.to_string();
463 emit_event(
464 Level::ERROR,
465 ProcessKind::Engine,
466 ObservabilityEvent {
467 event: "provider.call.error",
468 component: "engine.loop",
469 correlation_id: correlation_ref,
470 session_id: Some(&session_id),
471 run_id: None,
472 message_id: Some(&user_message_id),
473 provider_id: Some(provider_id.as_str()),
474 model_id,
475 status: Some("failed"),
476 error_code: Some("TOOL_SCHEMA_INVALID"),
477 detail: Some(&detail),
478 },
479 );
480 anyhow::bail!("{detail}");
481 }
482 let routing_decision = ToolRoutingDecision {
483 pass: if auto_tools_escalated { 2 } else { 1 },
484 mode: match requested_tool_mode {
485 ToolMode::Auto => default_mode_name(),
486 ToolMode::None => "none",
487 ToolMode::Required => "required",
488 },
489 intent,
490 selected_count: tool_schemas.len(),
491 total_available_count: all_tools.len(),
492 mcp_included: tool_schemas
493 .iter()
494 .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
495 };
496 self.event_bus.publish(EngineEvent::new(
497 "tool.routing.decision",
498 json!({
499 "sessionID": session_id,
500 "messageID": user_message_id,
501 "iteration": iteration,
502 "pass": routing_decision.pass,
503 "mode": routing_decision.mode,
504 "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
505 "selectedToolCount": routing_decision.selected_count,
506 "totalAvailableTools": routing_decision.total_available_count,
507 "mcpIncluded": routing_decision.mcp_included,
508 "retrievalEnabled": retrieval_enabled,
509 "retrievalK": retrieval_k,
510 "fallbackToFullTools": retrieval_fallback_reason.is_some(),
511 "fallbackReason": retrieval_fallback_reason
512 }),
513 ));
514 let allowed_tool_names = tool_schemas
515 .iter()
516 .map(|schema| normalize_tool_name(&schema.name))
517 .collect::<HashSet<_>>();
518 if !email_tools_ever_offered && has_email_action_tools(&tool_schemas) {
519 email_tools_ever_offered = true;
520 }
521 let offered_tool_preview = tool_schemas
522 .iter()
523 .take(8)
524 .map(|schema| normalize_tool_name(&schema.name))
525 .collect::<Vec<_>>()
526 .join(", ");
527 self.event_bus.publish(EngineEvent::new(
528 "provider.call.iteration.start",
529 json!({
530 "sessionID": session_id,
531 "messageID": user_message_id,
532 "iteration": iteration,
533 "selectedToolCount": allowed_tool_names.len(),
534 }),
535 ));
536 let estimated_prompt_chars: usize = messages.iter().map(|m| m.content.len()).sum();
537 let provider_connect_timeout =
538 Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
539 let stream_result = tokio::time::timeout(
540 provider_connect_timeout,
541 self.providers.stream_for_provider(
542 Some(provider_id.as_str()),
543 Some(model_id_value.as_str()),
544 messages,
545 requested_tool_mode.clone(),
546 Some(tool_schemas),
547 cancel.clone(),
548 ),
549 )
550 .await
551 .map_err(|_| {
552 anyhow::anyhow!(
553 "provider stream connect timeout after {} ms",
554 provider_connect_timeout.as_millis()
555 )
556 })
557 .and_then(|result| result);
558 let stream = match stream_result {
559 Ok(stream) => stream,
560 Err(err) => {
561 let error_text = err.to_string();
562 let error_code = provider_error_code(&error_text);
563 let detail = truncate_text(&error_text, 500);
564 emit_event(
565 Level::ERROR,
566 ProcessKind::Engine,
567 ObservabilityEvent {
568 event: "provider.call.error",
569 component: "engine.loop",
570 correlation_id: correlation_ref,
571 session_id: Some(&session_id),
572 run_id: None,
573 message_id: Some(&user_message_id),
574 provider_id: Some(provider_id.as_str()),
575 model_id,
576 status: Some("failed"),
577 error_code: Some(error_code),
578 detail: Some(&detail),
579 },
580 );
581 self.event_bus.publish(EngineEvent::new(
582 "provider.call.iteration.error",
583 json!({
584 "sessionID": session_id,
585 "messageID": user_message_id,
586 "iteration": iteration,
587 "error": detail,
588 }),
589 ));
590 return Err(err);
591 }
592 };
593 tokio::pin!(stream);
594 completion.clear();
595 let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
596 let mut provider_usage: Option<TokenUsage> = None;
597 let mut accepted_tool_calls_in_cycle = 0usize;
598 let provider_idle_timeout =
599 Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
600 loop {
601 let next_chunk_result =
602 tokio::time::timeout(provider_idle_timeout, stream.next())
603 .await
604 .map_err(|_| {
605 anyhow::anyhow!(
606 "provider stream idle timeout after {} ms",
607 provider_idle_timeout.as_millis()
608 )
609 });
610 let next_chunk = match next_chunk_result {
611 Ok(next_chunk) => next_chunk,
612 Err(err) => {
613 self.event_bus.publish(EngineEvent::new(
614 "provider.call.iteration.error",
615 json!({
616 "sessionID": session_id,
617 "messageID": user_message_id,
618 "iteration": iteration,
619 "error": truncate_text(&err.to_string(), 500),
620 }),
621 ));
622 return Err(err);
623 }
624 };
625 let Some(chunk) = next_chunk else {
626 break;
627 };
628 let chunk = match chunk {
629 Ok(chunk) => chunk,
630 Err(err) => {
631 let error_text = err.to_string();
632 let error_code = provider_error_code(&error_text);
633 let detail = truncate_text(&error_text, 500);
634 emit_event(
635 Level::ERROR,
636 ProcessKind::Engine,
637 ObservabilityEvent {
638 event: "provider.call.error",
639 component: "engine.loop",
640 correlation_id: correlation_ref,
641 session_id: Some(&session_id),
642 run_id: None,
643 message_id: Some(&user_message_id),
644 provider_id: Some(provider_id.as_str()),
645 model_id,
646 status: Some("failed"),
647 error_code: Some(error_code),
648 detail: Some(&detail),
649 },
650 );
651 self.event_bus.publish(EngineEvent::new(
652 "provider.call.iteration.error",
653 json!({
654 "sessionID": session_id,
655 "messageID": user_message_id,
656 "iteration": iteration,
657 "error": detail,
658 }),
659 ));
660 return Err(anyhow::anyhow!(
661 "provider stream chunk error: {error_text}"
662 ));
663 }
664 };
665 match chunk {
666 StreamChunk::TextDelta(delta) => {
667 let delta = strip_model_control_markers(&delta);
668 if delta.trim().is_empty() {
669 continue;
670 }
671 if completion.is_empty() {
672 emit_event(
673 Level::INFO,
674 ProcessKind::Engine,
675 ObservabilityEvent {
676 event: "provider.call.first_byte",
677 component: "engine.loop",
678 correlation_id: correlation_ref,
679 session_id: Some(&session_id),
680 run_id: None,
681 message_id: Some(&user_message_id),
682 provider_id: Some(provider_id.as_str()),
683 model_id,
684 status: Some("streaming"),
685 error_code: None,
686 detail: Some("first text delta"),
687 },
688 );
689 }
690 completion.push_str(&delta);
691 let delta = truncate_text(&delta, 4_000);
692 let delta_part =
693 WireMessagePart::text(&session_id, &user_message_id, delta.clone());
694 self.event_bus.publish(EngineEvent::new(
695 "message.part.updated",
696 json!({"part": delta_part, "delta": delta}),
697 ));
698 }
699 StreamChunk::ReasoningDelta(_reasoning) => {}
700 StreamChunk::Done {
701 finish_reason: _,
702 usage,
703 } => {
704 if usage.is_some() {
705 provider_usage = usage;
706 }
707 break;
708 }
709 StreamChunk::ToolCallStart { id, name } => {
710 let entry = streamed_tool_calls.entry(id).or_default();
711 if entry.name.is_empty() {
712 entry.name = name;
713 }
714 }
715 StreamChunk::ToolCallDelta { id, args_delta } => {
716 let entry = streamed_tool_calls.entry(id.clone()).or_default();
717 entry.args.push_str(&args_delta);
718 let tool_name = if entry.name.trim().is_empty() {
719 "tool".to_string()
720 } else {
721 normalize_tool_name(&entry.name)
722 };
723 let parsed_preview = if entry.name.trim().is_empty() {
724 Value::String(truncate_text(&entry.args, 1_000))
725 } else {
726 parse_streamed_tool_args(&tool_name, &entry.args)
727 };
728 let mut tool_part = WireMessagePart::tool_invocation(
729 &session_id,
730 &user_message_id,
731 tool_name.clone(),
732 parsed_preview.clone(),
733 );
734 tool_part.id = Some(id.clone());
735 if tool_name == "write" {
736 tracing::info!(
737 session_id = %session_id,
738 message_id = %user_message_id,
739 tool_call_id = %id,
740 args_delta_len = args_delta.len(),
741 accumulated_args_len = entry.args.len(),
742 parsed_preview_empty = parsed_preview.is_null()
743 || parsed_preview.as_object().is_some_and(|value| value.is_empty())
744 || parsed_preview
745 .as_str()
746 .map(|value| value.trim().is_empty())
747 .unwrap_or(false),
748 "streamed write tool args delta received"
749 );
750 }
751 self.event_bus.publish(EngineEvent::new(
752 "message.part.updated",
753 json!({
754 "part": tool_part,
755 "toolCallDelta": {
756 "id": id,
757 "tool": tool_name,
758 "argsDelta": truncate_text(&args_delta, 1_000),
759 "rawArgsPreview": truncate_text(&entry.args, 2_000),
760 "parsedArgsPreview": parsed_preview
761 }
762 }),
763 ));
764 }
765 StreamChunk::ToolCallEnd { id: _ } => {}
766 }
767 if cancel.is_cancelled() {
768 break;
769 }
770 }
771
772 let streamed_tool_call_count = streamed_tool_calls.len();
773 let streamed_tool_call_parse_failed = streamed_tool_calls
774 .values()
775 .any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
776 let mut tool_calls = streamed_tool_calls
777 .into_iter()
778 .filter_map(|(call_id, call)| {
779 if call.name.trim().is_empty() {
780 return None;
781 }
782 let tool_name = normalize_tool_name(&call.name);
783 let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
784 Some(ParsedToolCall {
785 tool: tool_name,
786 args: parsed_args,
787 call_id: Some(call_id),
788 })
789 })
790 .collect::<Vec<_>>();
791 if tool_calls.is_empty() {
792 tool_calls = parse_tool_invocations_from_response(&completion)
793 .into_iter()
794 .map(|(tool, args)| ParsedToolCall {
795 tool,
796 args,
797 call_id: None,
798 })
799 .collect::<Vec<_>>();
800 }
801 let provider_tool_parse_failed = tool_calls.is_empty()
802 && (streamed_tool_call_parse_failed
803 || (streamed_tool_call_count > 0
804 && looks_like_unparsed_tool_payload(&completion))
805 || looks_like_unparsed_tool_payload(&completion));
806 if provider_tool_parse_failed {
807 latest_required_tool_failure_kind =
808 RequiredToolFailureKind::ToolCallParseFailed;
809 } else if tool_calls.is_empty() {
810 latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
811 }
812 if router_enabled
813 && matches!(requested_tool_mode, ToolMode::Auto)
814 && !auto_tools_escalated
815 && iteration == 1
816 && should_escalate_auto_tools(intent, &text, &completion)
817 {
818 auto_tools_escalated = true;
819 followup_context = Some(
820 "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
821 .to_string(),
822 );
823 self.event_bus.publish(EngineEvent::new(
824 "provider.call.iteration.finish",
825 json!({
826 "sessionID": session_id,
827 "messageID": user_message_id,
828 "iteration": iteration,
829 "finishReason": "auto_escalate",
830 "acceptedToolCalls": accepted_tool_calls_in_cycle,
831 "rejectedToolCalls": 0,
832 }),
833 ));
834 continue;
835 }
836 if tool_calls.is_empty()
837 && !auto_workspace_probe_attempted
838 && should_force_workspace_probe(&text, &completion)
839 && allowed_tool_names.contains("glob")
840 {
841 auto_workspace_probe_attempted = true;
842 tool_calls = vec![ParsedToolCall {
843 tool: "glob".to_string(),
844 args: json!({ "pattern": "*" }),
845 call_id: None,
846 }];
847 }
848 if !tool_calls.is_empty() {
849 let saw_tool_call_candidate = true;
850 let mut outputs = Vec::new();
851 let mut executed_productive_tool = false;
852 let mut write_tool_attempted_in_cycle = false;
853 let mut auth_required_hit_in_cycle = false;
854 let mut guard_budget_hit_in_cycle = false;
855 let mut duplicate_signature_hit_in_cycle = false;
856 let mut rejected_tool_call_in_cycle = false;
857 for ParsedToolCall {
858 tool,
859 args,
860 call_id,
861 } in tool_calls
862 {
863 if !agent_can_use_tool(&active_agent, &tool) {
864 rejected_tool_call_in_cycle = true;
865 continue;
866 }
867 let tool_key = normalize_tool_name(&tool);
868 if is_workspace_write_tool(&tool_key) {
869 write_tool_attempted_in_cycle = true;
870 }
871 if !allowed_tool_names.contains(&tool_key) {
872 rejected_tool_call_in_cycle = true;
873 let note = if offered_tool_preview.is_empty() {
874 format!(
875 "Tool `{}` call skipped: it is not available in this turn.",
876 tool_key
877 )
878 } else {
879 format!(
880 "Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
881 tool_key, offered_tool_preview
882 )
883 };
884 self.event_bus.publish(EngineEvent::new(
885 "tool.call.rejected_unoffered",
886 json!({
887 "sessionID": session_id,
888 "messageID": user_message_id,
889 "iteration": iteration,
890 "tool": tool_key,
891 "offeredToolCount": allowed_tool_names.len()
892 }),
893 ));
894 if tool_name_looks_like_email_action(&tool_key) {
895 latest_email_action_note = Some(note.clone());
896 }
897 outputs.push(note);
898 continue;
899 }
900 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
901 if blocked_mcp_servers.contains(server) {
902 rejected_tool_call_in_cycle = true;
903 outputs.push(format!(
904 "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
905 tool_key, server
906 ));
907 continue;
908 }
909 }
910 if tool_key == "question" {
911 question_tool_used = true;
912 }
913 if tool_key == "pack_builder" && pack_builder_executed {
914 rejected_tool_call_in_cycle = true;
915 outputs.push(
916 "Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
917 .to_string(),
918 );
919 continue;
920 }
921 if websearch_query_blocked && tool_key == "websearch" {
922 rejected_tool_call_in_cycle = true;
923 outputs.push(
924 "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
925 .to_string(),
926 );
927 continue;
928 }
929 let mut effective_args = args.clone();
930 if tool_key == "todo_write" {
931 effective_args = normalize_todo_write_args(effective_args, &completion);
932 if is_empty_todo_write_args(&effective_args) {
933 rejected_tool_call_in_cycle = true;
934 outputs.push(
935 "Tool `todo_write` call skipped: empty todo payload."
936 .to_string(),
937 );
938 continue;
939 }
940 }
941 let signature = if tool_key == "batch" {
942 batch_tool_signature(&args)
943 .unwrap_or_else(|| tool_signature(&tool_key, &args))
944 } else {
945 tool_signature(&tool_key, &args)
946 };
947 if is_shell_tool_name(&tool_key)
948 && shell_mismatch_signatures.contains(&signature)
949 {
950 rejected_tool_call_in_cycle = true;
951 outputs.push(
952 "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
953 .to_string(),
954 );
955 continue;
956 }
957 let mut signature_count = 1usize;
958 if is_read_only_tool(&tool_key)
959 || (tool_key == "batch" && is_read_only_batch_call(&args))
960 {
961 let count = readonly_signature_counts
962 .entry(signature.clone())
963 .and_modify(|v| *v = v.saturating_add(1))
964 .or_insert(1);
965 signature_count = *count;
966 if tool_key == "websearch" {
967 if let Some(limit) = websearch_duplicate_signature_limit {
968 if *count > limit {
969 rejected_tool_call_in_cycle = true;
970 self.event_bus.publish(EngineEvent::new(
971 "tool.loop_guard.triggered",
972 json!({
973 "sessionID": session_id,
974 "messageID": user_message_id,
975 "tool": tool_key,
976 "reason": "duplicate_signature_retry_exhausted",
977 "duplicateLimit": limit,
978 "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
979 "loop_guard_triggered": true
980 }),
981 ));
982 outputs.push(
983 "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
984 .to_string(),
985 );
986 continue;
987 }
988 }
989 }
990 if tool_key != "websearch" && *count > 1 {
991 rejected_tool_call_in_cycle = true;
992 if let Some(cached) = readonly_tool_cache.get(&signature) {
993 outputs.push(cached.clone());
994 } else {
995 outputs.push(format!(
996 "Tool `{}` call skipped: duplicate call signature detected.",
997 tool_key
998 ));
999 }
1000 continue;
1001 }
1002 }
1003 let is_read_only_signature = is_read_only_tool(&tool_key)
1004 || (tool_key == "batch" && is_read_only_batch_call(&args));
1005 if !is_read_only_signature {
1006 let duplicate_limit = duplicate_signature_limit_for(&tool_key);
1007 let seen = mutable_signature_counts
1008 .entry(signature.clone())
1009 .and_modify(|v| *v = v.saturating_add(1))
1010 .or_insert(1);
1011 if *seen > duplicate_limit {
1012 rejected_tool_call_in_cycle = true;
1013 self.event_bus.publish(EngineEvent::new(
1014 "tool.loop_guard.triggered",
1015 json!({
1016 "sessionID": session_id,
1017 "messageID": user_message_id,
1018 "tool": tool_key,
1019 "reason": "duplicate_signature_retry_exhausted",
1020 "signatureHash": stable_hash(&signature),
1021 "duplicateLimit": duplicate_limit,
1022 "loop_guard_triggered": true
1023 }),
1024 ));
1025 outputs.push(format!(
1026 "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
1027 tool_key, duplicate_limit
1028 ));
1029 duplicate_signature_hit_in_cycle = true;
1030 continue;
1031 }
1032 }
1033 let budget = tool_budget_for(&tool_key);
1034 let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
1035 if *entry >= budget {
1036 rejected_tool_call_in_cycle = true;
1037 outputs.push(format!(
1038 "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
1039 tool_key, budget
1040 ));
1041 guard_budget_hit_in_cycle = true;
1042 continue;
1043 }
1044 let mut finalized_part = WireMessagePart::tool_invocation(
1045 &session_id,
1046 &user_message_id,
1047 tool.clone(),
1048 effective_args.clone(),
1049 );
1050 if let Some(call_id) = call_id.clone() {
1051 finalized_part.id = Some(call_id);
1052 }
1053 finalized_part.state = Some("pending".to_string());
1054 self.event_bus.publish(EngineEvent::new(
1055 "message.part.updated",
1056 json!({"part": finalized_part}),
1057 ));
1058 *entry += 1;
1059 accepted_tool_calls_in_cycle =
1060 accepted_tool_calls_in_cycle.saturating_add(1);
1061 if let Some(output) = self
1062 .execute_tool_with_permission(
1063 &session_id,
1064 &user_message_id,
1065 tool,
1066 effective_args,
1067 call_id,
1068 active_agent.skills.as_deref(),
1069 &text,
1070 requested_write_required,
1071 Some(&completion),
1072 cancel.clone(),
1073 )
1074 .await?
1075 {
1076 let productive = is_productive_tool_output(&tool_key, &output);
1077 if output.contains("WEBSEARCH_QUERY_MISSING") {
1078 websearch_query_blocked = true;
1079 }
1080 if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
1081 {
1082 shell_mismatch_signatures.insert(signature.clone());
1083 }
1084 if is_read_only_tool(&tool_key)
1085 && tool_key != "websearch"
1086 && signature_count == 1
1087 {
1088 readonly_tool_cache.insert(signature, output.clone());
1089 }
1090 if productive {
1091 productive_tool_calls_total =
1092 productive_tool_calls_total.saturating_add(1);
1093 if is_workspace_write_tool(&tool_key) {
1094 productive_write_tool_calls_total =
1095 productive_write_tool_calls_total.saturating_add(1);
1096 }
1097 if is_workspace_inspection_tool(&tool_key) {
1098 productive_workspace_inspection_total =
1099 productive_workspace_inspection_total.saturating_add(1);
1100 }
1101 if tool_key == "read" {
1102 productive_concrete_read_total =
1103 productive_concrete_read_total.saturating_add(1);
1104 }
1105 if is_web_research_tool(&tool_key) {
1106 productive_web_research_total =
1107 productive_web_research_total.saturating_add(1);
1108 if is_successful_web_research_output(&tool_key, &output) {
1109 successful_web_research_total =
1110 successful_web_research_total.saturating_add(1);
1111 }
1112 }
1113 executed_productive_tool = true;
1114 if tool_key == "pack_builder" {
1115 pack_builder_executed = true;
1116 }
1117 }
1118 if tool_name_looks_like_email_action(&tool_key) {
1119 if productive {
1120 email_action_executed = true;
1121 } else {
1122 latest_email_action_note =
1123 Some(truncate_text(&output, 280).replace('\n', " "));
1124 }
1125 }
1126 if is_auth_required_tool_output(&output) {
1127 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1128 blocked_mcp_servers.insert(server.to_string());
1129 }
1130 auth_required_hit_in_cycle = true;
1131 }
1132 outputs.push(output);
1133 if auth_required_hit_in_cycle {
1134 break;
1135 }
1136 if guard_budget_hit_in_cycle {
1137 break;
1138 }
1139 }
1140 }
1141 if !outputs.is_empty() {
1142 last_tool_outputs = outputs.clone();
1143 if matches!(requested_tool_mode, ToolMode::Required)
1144 && productive_tool_calls_total == 0
1145 {
1146 latest_required_tool_failure_kind = classify_required_tool_failure(
1147 &outputs,
1148 saw_tool_call_candidate,
1149 accepted_tool_calls_in_cycle,
1150 provider_tool_parse_failed,
1151 rejected_tool_call_in_cycle,
1152 );
1153 if requested_write_required
1154 && write_tool_attempted_in_cycle
1155 && productive_write_tool_calls_total == 0
1156 && is_write_invalid_args_failure_kind(
1157 latest_required_tool_failure_kind,
1158 )
1159 {
1160 if required_write_retry_count + 1 < strict_write_retry_max_attempts
1161 {
1162 required_write_retry_count += 1;
1163 required_tool_retry_count += 1;
1164 followup_context = Some(build_write_required_retry_context(
1165 &offered_tool_preview,
1166 latest_required_tool_failure_kind,
1167 &text,
1168 &requested_prewrite_requirements,
1169 productive_workspace_inspection_total > 0,
1170 productive_concrete_read_total > 0,
1171 productive_web_research_total > 0,
1172 successful_web_research_total > 0,
1173 ));
1174 self.event_bus.publish(EngineEvent::new(
1175 "provider.call.iteration.finish",
1176 json!({
1177 "sessionID": session_id,
1178 "messageID": user_message_id,
1179 "iteration": iteration,
1180 "finishReason": "required_write_invalid_retry",
1181 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1182 "rejectedToolCalls": 0,
1183 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1184 }),
1185 ));
1186 continue;
1187 }
1188 }
1189 let progress_made_in_cycle = productive_workspace_inspection_total > 0
1190 || productive_concrete_read_total > 0
1191 || productive_web_research_total > 0
1192 || successful_web_research_total > 0;
1193 if should_retry_nonproductive_required_tool_cycle(
1194 requested_write_required,
1195 write_tool_attempted_in_cycle,
1196 progress_made_in_cycle,
1197 required_tool_retry_count,
1198 ) {
1199 required_tool_retry_count += 1;
1200 followup_context =
1201 Some(build_required_tool_retry_context_for_task(
1202 &offered_tool_preview,
1203 latest_required_tool_failure_kind,
1204 &text,
1205 ));
1206 self.event_bus.publish(EngineEvent::new(
1207 "provider.call.iteration.finish",
1208 json!({
1209 "sessionID": session_id,
1210 "messageID": user_message_id,
1211 "iteration": iteration,
1212 "finishReason": "required_tool_retry",
1213 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1214 "rejectedToolCalls": 0,
1215 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1216 }),
1217 ));
1218 continue;
1219 }
1220 completion = required_tool_mode_unsatisfied_completion(
1221 latest_required_tool_failure_kind,
1222 );
1223 if !required_tool_unsatisfied_emitted {
1224 required_tool_unsatisfied_emitted = true;
1225 self.event_bus.publish(EngineEvent::new(
1226 "tool.mode.required.unsatisfied",
1227 json!({
1228 "sessionID": session_id,
1229 "messageID": user_message_id,
1230 "iteration": iteration,
1231 "selectedToolCount": allowed_tool_names.len(),
1232 "offeredToolsPreview": offered_tool_preview,
1233 "reason": latest_required_tool_failure_kind.code(),
1234 }),
1235 ));
1236 }
1237 self.event_bus.publish(EngineEvent::new(
1238 "provider.call.iteration.finish",
1239 json!({
1240 "sessionID": session_id,
1241 "messageID": user_message_id,
1242 "iteration": iteration,
1243 "finishReason": "required_tool_unsatisfied",
1244 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1245 "rejectedToolCalls": 0,
1246 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1247 }),
1248 ));
1249 break;
1250 }
1251 let prewrite_gate = evaluate_prewrite_gate(
1252 requested_write_required,
1253 &requested_prewrite_requirements,
1254 PrewriteProgress {
1255 productive_write_tool_calls_total,
1256 productive_workspace_inspection_total,
1257 productive_concrete_read_total,
1258 productive_web_research_total,
1259 successful_web_research_total,
1260 required_write_retry_count,
1261 unmet_prewrite_repair_retry_count,
1262 prewrite_gate_waived,
1263 },
1264 );
1265 let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1266 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1267 if requested_write_required
1268 && productive_tool_calls_total > 0
1269 && productive_write_tool_calls_total == 0
1270 {
1271 if should_start_prewrite_repair_before_first_write(
1272 requested_prewrite_requirements.repair_on_unmet_requirements,
1273 productive_write_tool_calls_total,
1274 prewrite_satisfied,
1275 code_workflow_requested,
1276 ) {
1277 if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1278 unmet_prewrite_repair_retry_count += 1;
1279 let repair_attempt = unmet_prewrite_repair_retry_count;
1280 let repair_attempts_remaining =
1281 prewrite_repair_budget.saturating_sub(repair_attempt);
1282 followup_context = Some(build_prewrite_repair_retry_context(
1283 &offered_tool_preview,
1284 latest_required_tool_failure_kind,
1285 &text,
1286 &requested_prewrite_requirements,
1287 productive_workspace_inspection_total > 0,
1288 productive_concrete_read_total > 0,
1289 productive_web_research_total > 0,
1290 successful_web_research_total > 0,
1291 ));
1292 self.event_bus.publish(EngineEvent::new(
1293 "provider.call.iteration.finish",
1294 json!({
1295 "sessionID": session_id,
1296 "messageID": user_message_id,
1297 "iteration": iteration,
1298 "finishReason": "prewrite_repair_retry",
1299 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1300 "rejectedToolCalls": 0,
1301 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1302 "repair": prewrite_repair_event_payload(
1303 repair_attempt,
1304 repair_attempts_remaining,
1305 &unmet_prewrite_codes,
1306 false,
1307 ),
1308 }),
1309 ));
1310 continue;
1311 }
1312 if !prewrite_gate_waived {
1313 if prewrite_fail_closed {
1314 let repair_attempt = unmet_prewrite_repair_retry_count;
1315 let repair_attempts_remaining =
1316 prewrite_repair_budget.saturating_sub(repair_attempt);
1317 completion = prewrite_requirements_exhausted_completion(
1318 &unmet_prewrite_codes,
1319 repair_attempt,
1320 repair_attempts_remaining,
1321 );
1322 self.event_bus.publish(EngineEvent::new(
1323 "prewrite.gate.strict_mode.blocked",
1324 json!({
1325 "sessionID": session_id,
1326 "messageID": user_message_id,
1327 "iteration": iteration,
1328 "unmetCodes": unmet_prewrite_codes,
1329 }),
1330 ));
1331 self.event_bus.publish(EngineEvent::new(
1332 "provider.call.iteration.finish",
1333 json!({
1334 "sessionID": session_id,
1335 "messageID": user_message_id,
1336 "iteration": iteration,
1337 "finishReason": "prewrite_requirements_exhausted",
1338 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1339 "rejectedToolCalls": 0,
1340 "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1341 "repair": prewrite_repair_event_payload(
1342 repair_attempt,
1343 repair_attempts_remaining,
1344 &unmet_prewrite_codes,
1345 true,
1346 ),
1347 }),
1348 ));
1349 break;
1350 }
1351 prewrite_gate_waived = true;
1352 let repair_attempt = unmet_prewrite_repair_retry_count;
1353 let repair_attempts_remaining =
1354 prewrite_repair_budget.saturating_sub(repair_attempt);
1355 followup_context = Some(build_prewrite_waived_write_context(
1356 &text,
1357 &unmet_prewrite_codes,
1358 ));
1359 self.event_bus.publish(EngineEvent::new(
1360 "prewrite.gate.waived.write_executed",
1361 json!({
1362 "sessionID": session_id,
1363 "messageID": user_message_id,
1364 "unmetCodes": unmet_prewrite_codes,
1365 }),
1366 ));
1367 self.event_bus.publish(EngineEvent::new(
1368 "provider.call.iteration.finish",
1369 json!({
1370 "sessionID": session_id,
1371 "messageID": user_message_id,
1372 "iteration": iteration,
1373 "finishReason": "prewrite_gate_waived",
1374 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1375 "rejectedToolCalls": 0,
1376 "prewriteGateWaived": true,
1377 "repair": prewrite_repair_event_payload(
1378 repair_attempt,
1379 repair_attempts_remaining,
1380 &unmet_prewrite_codes,
1381 true,
1382 ),
1383 }),
1384 ));
1385 continue;
1386 }
1387 }
1388 latest_required_tool_failure_kind =
1389 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1390 if required_write_retry_count + 1 < strict_write_retry_max_attempts {
1391 required_write_retry_count += 1;
1392 followup_context = Some(build_write_required_retry_context(
1393 &offered_tool_preview,
1394 latest_required_tool_failure_kind,
1395 &text,
1396 &requested_prewrite_requirements,
1397 productive_workspace_inspection_total > 0,
1398 productive_concrete_read_total > 0,
1399 productive_web_research_total > 0,
1400 successful_web_research_total > 0,
1401 ));
1402 self.event_bus.publish(EngineEvent::new(
1403 "provider.call.iteration.finish",
1404 json!({
1405 "sessionID": session_id,
1406 "messageID": user_message_id,
1407 "iteration": iteration,
1408 "finishReason": "required_write_retry",
1409 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1410 "rejectedToolCalls": 0,
1411 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1412 }),
1413 ));
1414 continue;
1415 }
1416 completion = required_tool_mode_unsatisfied_completion(
1417 latest_required_tool_failure_kind,
1418 );
1419 if !required_tool_unsatisfied_emitted {
1420 required_tool_unsatisfied_emitted = true;
1421 self.event_bus.publish(EngineEvent::new(
1422 "tool.mode.required.unsatisfied",
1423 json!({
1424 "sessionID": session_id,
1425 "messageID": user_message_id,
1426 "iteration": iteration,
1427 "selectedToolCount": allowed_tool_names.len(),
1428 "offeredToolsPreview": offered_tool_preview,
1429 "reason": latest_required_tool_failure_kind.code(),
1430 }),
1431 ));
1432 }
1433 self.event_bus.publish(EngineEvent::new(
1434 "provider.call.iteration.finish",
1435 json!({
1436 "sessionID": session_id,
1437 "messageID": user_message_id,
1438 "iteration": iteration,
1439 "finishReason": "required_write_unsatisfied",
1440 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1441 "rejectedToolCalls": 0,
1442 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1443 }),
1444 ));
1445 break;
1446 }
1447 if invalid_tool_args_retry_count < invalid_tool_args_retry_max_attempts() {
1448 if let Some(retry_context) =
1449 build_invalid_tool_args_retry_context_from_outputs(
1450 &outputs,
1451 invalid_tool_args_retry_count,
1452 )
1453 {
1454 invalid_tool_args_retry_count += 1;
1455 followup_context = Some(format!(
1456 "Previous tool call arguments were invalid. {}",
1457 retry_context
1458 ));
1459 self.event_bus.publish(EngineEvent::new(
1460 "provider.call.iteration.finish",
1461 json!({
1462 "sessionID": session_id,
1463 "messageID": user_message_id,
1464 "iteration": iteration,
1465 "finishReason": "invalid_tool_args_retry",
1466 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1467 "rejectedToolCalls": 0,
1468 }),
1469 ));
1470 continue;
1471 }
1472 }
1473 let guard_budget_hit =
1474 outputs.iter().any(|o| is_guard_budget_tool_output(o));
1475 if executed_productive_tool {
1476 let prewrite_gate = evaluate_prewrite_gate(
1477 requested_write_required,
1478 &requested_prewrite_requirements,
1479 PrewriteProgress {
1480 productive_write_tool_calls_total,
1481 productive_workspace_inspection_total,
1482 productive_concrete_read_total,
1483 productive_web_research_total,
1484 successful_web_research_total,
1485 required_write_retry_count,
1486 unmet_prewrite_repair_retry_count,
1487 prewrite_gate_waived,
1488 },
1489 );
1490 let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1491 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1492 if requested_write_required
1493 && productive_write_tool_calls_total > 0
1494 && requested_prewrite_requirements.repair_on_unmet_requirements
1495 && unmet_prewrite_repair_retry_count < prewrite_repair_budget
1496 && !prewrite_satisfied
1497 {
1498 unmet_prewrite_repair_retry_count += 1;
1499 let repair_attempt = unmet_prewrite_repair_retry_count;
1500 let repair_attempts_remaining =
1501 prewrite_repair_budget.saturating_sub(repair_attempt);
1502 followup_context = Some(build_prewrite_repair_retry_context(
1503 &offered_tool_preview,
1504 latest_required_tool_failure_kind,
1505 &text,
1506 &requested_prewrite_requirements,
1507 productive_workspace_inspection_total > 0,
1508 productive_concrete_read_total > 0,
1509 productive_web_research_total > 0,
1510 successful_web_research_total > 0,
1511 ));
1512 self.event_bus.publish(EngineEvent::new(
1513 "provider.call.iteration.finish",
1514 json!({
1515 "sessionID": session_id,
1516 "messageID": user_message_id,
1517 "iteration": iteration,
1518 "finishReason": "prewrite_repair_retry",
1519 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1520 "rejectedToolCalls": 0,
1521 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1522 "repair": prewrite_repair_event_payload(
1523 repair_attempt,
1524 repair_attempts_remaining,
1525 &unmet_prewrite_codes,
1526 false,
1527 ),
1528 }),
1529 ));
1530 continue;
1531 }
1532 if requested_write_required
1533 && productive_write_tool_calls_total > 0
1534 && requested_prewrite_requirements.repair_on_unmet_requirements
1535 && !prewrite_satisfied
1536 && prewrite_fail_closed
1537 {
1538 let repair_attempt = unmet_prewrite_repair_retry_count;
1539 let repair_attempts_remaining =
1540 prewrite_repair_budget.saturating_sub(repair_attempt);
1541 completion = prewrite_requirements_exhausted_completion(
1542 &unmet_prewrite_codes,
1543 repair_attempt,
1544 repair_attempts_remaining,
1545 );
1546 self.event_bus.publish(EngineEvent::new(
1547 "prewrite.gate.strict_mode.blocked",
1548 json!({
1549 "sessionID": session_id,
1550 "messageID": user_message_id,
1551 "iteration": iteration,
1552 "unmetCodes": unmet_prewrite_codes,
1553 }),
1554 ));
1555 self.event_bus.publish(EngineEvent::new(
1556 "provider.call.iteration.finish",
1557 json!({
1558 "sessionID": session_id,
1559 "messageID": user_message_id,
1560 "iteration": iteration,
1561 "finishReason": "prewrite_requirements_exhausted",
1562 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1563 "rejectedToolCalls": 0,
1564 "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1565 "repair": prewrite_repair_event_payload(
1566 repair_attempt,
1567 repair_attempts_remaining,
1568 &unmet_prewrite_codes,
1569 true,
1570 ),
1571 }),
1572 ));
1573 break;
1574 }
1575 followup_context = Some(format!(
1576 "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1577 summarize_tool_outputs(&outputs)
1578 ));
1579 self.event_bus.publish(EngineEvent::new(
1580 "provider.call.iteration.finish",
1581 json!({
1582 "sessionID": session_id,
1583 "messageID": user_message_id,
1584 "iteration": iteration,
1585 "finishReason": "tool_followup",
1586 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1587 "rejectedToolCalls": 0,
1588 }),
1589 ));
1590 continue;
1591 }
1592 if guard_budget_hit {
1593 completion = summarize_guard_budget_outputs(&outputs)
1594 .unwrap_or_else(|| {
1595 "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1596 });
1597 } else if duplicate_signature_hit_in_cycle {
1598 completion = summarize_duplicate_signature_outputs(&outputs)
1599 .unwrap_or_else(|| {
1600 "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1601 });
1602 } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1603 completion = summary;
1604 } else {
1605 completion.clear();
1606 }
1607 self.event_bus.publish(EngineEvent::new(
1608 "provider.call.iteration.finish",
1609 json!({
1610 "sessionID": session_id,
1611 "messageID": user_message_id,
1612 "iteration": iteration,
1613 "finishReason": "tool_summary",
1614 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1615 "rejectedToolCalls": 0,
1616 }),
1617 ));
1618 break;
1619 } else if matches!(requested_tool_mode, ToolMode::Required) {
1620 latest_required_tool_failure_kind = classify_required_tool_failure(
1621 &outputs,
1622 saw_tool_call_candidate,
1623 accepted_tool_calls_in_cycle,
1624 provider_tool_parse_failed,
1625 rejected_tool_call_in_cycle,
1626 );
1627 }
1628 }
1629
1630 {
1631 let (prompt_tokens, completion_tokens, total_tokens, usage_source) =
1632 if let Some(usage) = provider_usage {
1633 (
1634 usage.prompt_tokens,
1635 usage.completion_tokens,
1636 usage.total_tokens,
1637 "provider",
1638 )
1639 } else {
1640 let est_prompt = (estimated_prompt_chars / 4) as u64;
1644 let est_completion = (completion.len() / 4) as u64;
1645 tracing::debug!(
1646 session_id = %session_id,
1647 provider_id = %provider_id,
1648 "provider.usage missing from stream; using char-count estimate \
1649 (prompt_chars={estimated_prompt_chars} completion_chars={})",
1650 completion.len()
1651 );
1652 (
1653 est_prompt,
1654 est_completion,
1655 est_prompt.saturating_add(est_completion),
1656 "estimated",
1657 )
1658 };
1659 self.event_bus.publish(EngineEvent::new(
1660 "provider.usage",
1661 json!({
1662 "sessionID": session_id,
1663 "correlationID": correlation_ref,
1664 "messageID": user_message_id,
1665 "promptTokens": prompt_tokens,
1666 "completionTokens": completion_tokens,
1667 "totalTokens": total_tokens,
1668 "usageSource": usage_source,
1669 }),
1670 ));
1671 }
1672
1673 if matches!(requested_tool_mode, ToolMode::Required)
1674 && productive_tool_calls_total == 0
1675 {
1676 if requested_write_required
1677 && required_write_retry_count > 0
1678 && productive_write_tool_calls_total == 0
1679 && !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
1680 {
1681 latest_required_tool_failure_kind =
1682 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1683 }
1684 if requested_write_required
1685 && required_write_retry_count + 1 < strict_write_retry_max_attempts
1686 {
1687 required_write_retry_count += 1;
1688 followup_context = Some(build_write_required_retry_context(
1689 &offered_tool_preview,
1690 latest_required_tool_failure_kind,
1691 &text,
1692 &requested_prewrite_requirements,
1693 productive_workspace_inspection_total > 0,
1694 productive_concrete_read_total > 0,
1695 productive_web_research_total > 0,
1696 successful_web_research_total > 0,
1697 ));
1698 continue;
1699 }
1700 let progress_made_in_cycle = productive_workspace_inspection_total > 0
1701 || productive_concrete_read_total > 0
1702 || productive_web_research_total > 0
1703 || successful_web_research_total > 0;
1704 if should_retry_nonproductive_required_tool_cycle(
1705 requested_write_required,
1706 false,
1707 progress_made_in_cycle,
1708 required_tool_retry_count,
1709 ) {
1710 required_tool_retry_count += 1;
1711 followup_context = Some(build_required_tool_retry_context_for_task(
1712 &offered_tool_preview,
1713 latest_required_tool_failure_kind,
1714 &text,
1715 ));
1716 continue;
1717 }
1718 completion = required_tool_mode_unsatisfied_completion(
1719 latest_required_tool_failure_kind,
1720 );
1721 if !required_tool_unsatisfied_emitted {
1722 required_tool_unsatisfied_emitted = true;
1723 self.event_bus.publish(EngineEvent::new(
1724 "tool.mode.required.unsatisfied",
1725 json!({
1726 "sessionID": session_id,
1727 "messageID": user_message_id,
1728 "iteration": iteration,
1729 "selectedToolCount": allowed_tool_names.len(),
1730 "offeredToolsPreview": offered_tool_preview,
1731 "reason": latest_required_tool_failure_kind.code(),
1732 }),
1733 ));
1734 }
1735 self.event_bus.publish(EngineEvent::new(
1736 "provider.call.iteration.finish",
1737 json!({
1738 "sessionID": session_id,
1739 "messageID": user_message_id,
1740 "iteration": iteration,
1741 "finishReason": "required_tool_unsatisfied",
1742 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1743 "rejectedToolCalls": 0,
1744 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1745 }),
1746 ));
1747 } else {
1748 if completion.trim().is_empty()
1749 && !last_tool_outputs.is_empty()
1750 && requested_write_required
1751 && empty_completion_retry_count == 0
1752 {
1753 empty_completion_retry_count += 1;
1754 followup_context = Some(build_empty_completion_retry_context(
1755 &offered_tool_preview,
1756 &text,
1757 &requested_prewrite_requirements,
1758 productive_workspace_inspection_total > 0,
1759 productive_concrete_read_total > 0,
1760 productive_web_research_total > 0,
1761 successful_web_research_total > 0,
1762 ));
1763 self.event_bus.publish(EngineEvent::new(
1764 "provider.call.iteration.finish",
1765 json!({
1766 "sessionID": session_id,
1767 "messageID": user_message_id,
1768 "iteration": iteration,
1769 "finishReason": "empty_completion_retry",
1770 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1771 "rejectedToolCalls": 0,
1772 }),
1773 ));
1774 continue;
1775 }
1776 let prewrite_gate = evaluate_prewrite_gate(
1777 requested_write_required,
1778 &requested_prewrite_requirements,
1779 PrewriteProgress {
1780 productive_write_tool_calls_total,
1781 productive_workspace_inspection_total,
1782 productive_concrete_read_total,
1783 productive_web_research_total,
1784 successful_web_research_total,
1785 required_write_retry_count,
1786 unmet_prewrite_repair_retry_count,
1787 prewrite_gate_waived,
1788 },
1789 );
1790 if should_start_prewrite_repair_before_first_write(
1791 requested_prewrite_requirements.repair_on_unmet_requirements,
1792 productive_write_tool_calls_total,
1793 prewrite_gate.prewrite_satisfied,
1794 code_workflow_requested,
1795 ) && !prewrite_gate_waived
1796 {
1797 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1798 if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1799 unmet_prewrite_repair_retry_count += 1;
1800 let repair_attempt = unmet_prewrite_repair_retry_count;
1801 let repair_attempts_remaining =
1802 prewrite_repair_budget.saturating_sub(repair_attempt);
1803 followup_context = Some(build_prewrite_repair_retry_context(
1804 &offered_tool_preview,
1805 latest_required_tool_failure_kind,
1806 &text,
1807 &requested_prewrite_requirements,
1808 productive_workspace_inspection_total > 0,
1809 productive_concrete_read_total > 0,
1810 productive_web_research_total > 0,
1811 successful_web_research_total > 0,
1812 ));
1813 self.event_bus.publish(EngineEvent::new(
1814 "provider.call.iteration.finish",
1815 json!({
1816 "sessionID": session_id,
1817 "messageID": user_message_id,
1818 "iteration": iteration,
1819 "finishReason": "prewrite_repair_retry",
1820 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1821 "rejectedToolCalls": 0,
1822 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1823 "repair": prewrite_repair_event_payload(
1824 repair_attempt,
1825 repair_attempts_remaining,
1826 &unmet_prewrite_codes,
1827 false,
1828 ),
1829 }),
1830 ));
1831 continue;
1832 }
1833 if prewrite_fail_closed {
1834 let repair_attempt = unmet_prewrite_repair_retry_count;
1835 let repair_attempts_remaining =
1836 prewrite_repair_budget.saturating_sub(repair_attempt);
1837 completion = prewrite_requirements_exhausted_completion(
1838 &unmet_prewrite_codes,
1839 repair_attempt,
1840 repair_attempts_remaining,
1841 );
1842 self.event_bus.publish(EngineEvent::new(
1843 "prewrite.gate.strict_mode.blocked",
1844 json!({
1845 "sessionID": session_id,
1846 "messageID": user_message_id,
1847 "iteration": iteration,
1848 "unmetCodes": unmet_prewrite_codes,
1849 }),
1850 ));
1851 self.event_bus.publish(EngineEvent::new(
1852 "provider.call.iteration.finish",
1853 json!({
1854 "sessionID": session_id,
1855 "messageID": user_message_id,
1856 "iteration": iteration,
1857 "finishReason": "prewrite_requirements_exhausted",
1858 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1859 "rejectedToolCalls": 0,
1860 "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1861 "repair": prewrite_repair_event_payload(
1862 repair_attempt,
1863 repair_attempts_remaining,
1864 &unmet_prewrite_codes,
1865 true,
1866 ),
1867 }),
1868 ));
1869 break;
1870 }
1871 prewrite_gate_waived = true;
1872 let repair_attempt = unmet_prewrite_repair_retry_count;
1873 let repair_attempts_remaining =
1874 prewrite_repair_budget.saturating_sub(repair_attempt);
1875 followup_context = Some(build_prewrite_waived_write_context(
1876 &text,
1877 &unmet_prewrite_codes,
1878 ));
1879 self.event_bus.publish(EngineEvent::new(
1880 "prewrite.gate.waived.write_executed",
1881 json!({
1882 "sessionID": session_id,
1883 "messageID": user_message_id,
1884 "unmetCodes": unmet_prewrite_codes,
1885 }),
1886 ));
1887 self.event_bus.publish(EngineEvent::new(
1888 "provider.call.iteration.finish",
1889 json!({
1890 "sessionID": session_id,
1891 "messageID": user_message_id,
1892 "iteration": iteration,
1893 "finishReason": "prewrite_gate_waived",
1894 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1895 "rejectedToolCalls": 0,
1896 "prewriteGateWaived": true,
1897 "repair": prewrite_repair_event_payload(
1898 repair_attempt,
1899 repair_attempts_remaining,
1900 &unmet_prewrite_codes,
1901 true,
1902 ),
1903 }),
1904 ));
1905 continue;
1906 }
1907 if prewrite_gate_waived
1908 && requested_write_required
1909 && productive_write_tool_calls_total == 0
1910 && required_write_retry_count + 1 < strict_write_retry_max_attempts
1911 {
1912 required_write_retry_count += 1;
1913 followup_context = Some(build_write_required_retry_context(
1914 &offered_tool_preview,
1915 latest_required_tool_failure_kind,
1916 &text,
1917 &requested_prewrite_requirements,
1918 productive_workspace_inspection_total > 0,
1919 productive_concrete_read_total > 0,
1920 productive_web_research_total > 0,
1921 successful_web_research_total > 0,
1922 ));
1923 self.event_bus.publish(EngineEvent::new(
1924 "provider.call.iteration.finish",
1925 json!({
1926 "sessionID": session_id,
1927 "messageID": user_message_id,
1928 "iteration": iteration,
1929 "finishReason": "waived_write_retry",
1930 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1931 "rejectedToolCalls": 0,
1932 }),
1933 ));
1934 continue;
1935 }
1936 self.event_bus.publish(EngineEvent::new(
1937 "provider.call.iteration.finish",
1938 json!({
1939 "sessionID": session_id,
1940 "messageID": user_message_id,
1941 "iteration": iteration,
1942 "finishReason": "provider_completion",
1943 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1944 "rejectedToolCalls": 0,
1945 }),
1946 ));
1947 }
1948 break;
1949 }
1950 if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
1951 {
1952 completion =
1953 required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
1954 if !required_tool_unsatisfied_emitted {
1955 self.event_bus.publish(EngineEvent::new(
1956 "tool.mode.required.unsatisfied",
1957 json!({
1958 "sessionID": session_id,
1959 "messageID": user_message_id,
1960 "selectedToolCount": tool_call_counts.len(),
1961 "reason": latest_required_tool_failure_kind.code(),
1962 }),
1963 ));
1964 }
1965 }
1966 if completion.trim().is_empty()
1967 && !last_tool_outputs.is_empty()
1968 && requested_write_required
1969 && productive_write_tool_calls_total > 0
1970 {
1971 let final_prewrite_satisfied = evaluate_prewrite_gate(
1972 requested_write_required,
1973 &requested_prewrite_requirements,
1974 PrewriteProgress {
1975 productive_write_tool_calls_total,
1976 productive_workspace_inspection_total,
1977 productive_concrete_read_total,
1978 productive_web_research_total,
1979 successful_web_research_total,
1980 required_write_retry_count,
1981 unmet_prewrite_repair_retry_count,
1982 prewrite_gate_waived,
1983 },
1984 )
1985 .prewrite_satisfied;
1986 if prewrite_fail_closed && !final_prewrite_satisfied {
1987 let unmet_prewrite_codes = evaluate_prewrite_gate(
1988 requested_write_required,
1989 &requested_prewrite_requirements,
1990 PrewriteProgress {
1991 productive_write_tool_calls_total,
1992 productive_workspace_inspection_total,
1993 productive_concrete_read_total,
1994 productive_web_research_total,
1995 successful_web_research_total,
1996 required_write_retry_count,
1997 unmet_prewrite_repair_retry_count,
1998 prewrite_gate_waived,
1999 },
2000 )
2001 .unmet_codes;
2002 completion = prewrite_requirements_exhausted_completion(
2003 &unmet_prewrite_codes,
2004 unmet_prewrite_repair_retry_count,
2005 prewrite_repair_budget.saturating_sub(unmet_prewrite_repair_retry_count),
2006 );
2007 } else {
2008 completion = synthesize_artifact_write_completion_from_tool_state(
2009 &text,
2010 final_prewrite_satisfied,
2011 prewrite_gate_waived,
2012 );
2013 }
2014 }
2015 if completion.trim().is_empty()
2016 && !last_tool_outputs.is_empty()
2017 && should_generate_post_tool_final_narrative(
2018 requested_tool_mode,
2019 productive_tool_calls_total,
2020 )
2021 {
2022 if let Some(narrative) = self
2023 .generate_final_narrative_without_tools(
2024 &session_id,
2025 &active_agent,
2026 Some(provider_id.as_str()),
2027 Some(model_id_value.as_str()),
2028 cancel.clone(),
2029 &last_tool_outputs,
2030 )
2031 .await
2032 {
2033 completion = narrative;
2034 }
2035 }
2036 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
2037 if let Some(summary) = summarize_auth_pending_outputs(&last_tool_outputs) {
2038 completion = summary;
2039 } else if let Some(hint) =
2040 summarize_terminal_tool_failure_for_user(&last_tool_outputs)
2041 {
2042 completion = hint;
2043 } else {
2044 let preview = summarize_user_visible_tool_outputs(&last_tool_outputs);
2045 if preview.trim().is_empty() {
2046 completion = "I used tools for this request, but I couldn't turn the results into a clean final answer. Please retry with the docs page URL, docs path, or exact search query you want me to use.".to_string();
2047 } else {
2048 completion = format!(
2049 "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
2050 preview
2051 );
2052 }
2053 }
2054 }
2055 if completion.trim().is_empty() {
2056 completion =
2057 "I couldn't produce a final response for that run. Please retry your request."
2058 .to_string();
2059 }
2060 if email_delivery_requested && email_tools_ever_offered && !email_action_executed {
2069 let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
2070 .to_string();
2071 if let Some(note) = latest_email_action_note.as_ref() {
2072 fallback.push_str("\n\nLast email tool status: ");
2073 fallback.push_str(note);
2074 }
2075 fallback.push_str(
2076 "\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
2077 );
2078 completion = fallback;
2079 }
2080 completion = strip_model_control_markers(&completion);
2081 truncate_text(&completion, 16_000)
2082 };
2083 emit_event(
2084 Level::INFO,
2085 ProcessKind::Engine,
2086 ObservabilityEvent {
2087 event: "provider.call.finish",
2088 component: "engine.loop",
2089 correlation_id: correlation_ref,
2090 session_id: Some(&session_id),
2091 run_id: None,
2092 message_id: Some(&user_message_id),
2093 provider_id: Some(provider_id.as_str()),
2094 model_id,
2095 status: Some("ok"),
2096 error_code: None,
2097 detail: Some("provider stream complete"),
2098 },
2099 );
2100 if active_agent.name.eq_ignore_ascii_case("plan") {
2101 emit_plan_todo_fallback(
2102 self.storage.clone(),
2103 &self.event_bus,
2104 &session_id,
2105 &user_message_id,
2106 &completion,
2107 )
2108 .await;
2109 let todos_after_fallback = self.storage.get_todos(&session_id).await;
2110 if todos_after_fallback.is_empty() && !question_tool_used {
2111 emit_plan_question_fallback(
2112 self.storage.clone(),
2113 &self.event_bus,
2114 &session_id,
2115 &user_message_id,
2116 &completion,
2117 )
2118 .await;
2119 }
2120 }
2121 if cancel.is_cancelled() {
2122 self.event_bus.publish(EngineEvent::new(
2123 "session.status",
2124 json!({"sessionID": session_id, "status":"cancelled"}),
2125 ));
2126 self.cancellations.remove(&session_id).await;
2127 return Ok(());
2128 }
2129 let assistant = Message::new(
2130 MessageRole::Assistant,
2131 vec![MessagePart::Text {
2132 text: completion.clone(),
2133 }],
2134 );
2135 let assistant_message_id = assistant.id.clone();
2136 self.storage.append_message(&session_id, assistant).await?;
2137 let final_part = WireMessagePart::text(
2138 &session_id,
2139 &assistant_message_id,
2140 truncate_text(&completion, 16_000),
2141 );
2142 self.event_bus.publish(EngineEvent::new(
2143 "message.part.updated",
2144 json!({"part": final_part}),
2145 ));
2146 self.event_bus.publish(EngineEvent::new(
2147 "session.updated",
2148 json!({"sessionID": session_id, "status":"idle"}),
2149 ));
2150 self.event_bus.publish(EngineEvent::new(
2151 "session.status",
2152 json!({"sessionID": session_id, "status":"idle"}),
2153 ));
2154 self.cancellations.remove(&session_id).await;
2155 Ok(())
2156 }
2157}