Skip to main content

tandem_core/engine_loop/
prompt_execution.rs

1use super::*;
2
3impl EngineLoop {
4    pub async fn run_prompt_async_with_context(
5        &self,
6        session_id: String,
7        req: SendMessageRequest,
8        correlation_id: Option<String>,
9    ) -> anyhow::Result<()> {
10        let session_model = self
11            .storage
12            .get_session(&session_id)
13            .await
14            .and_then(|s| s.model);
15        let (provider_id, model_id_value) =
16            resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
17                anyhow::anyhow!(
18                "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
19            )
20            })?;
21        let correlation_ref = correlation_id.as_deref();
22        let model_id = Some(model_id_value.as_str());
23        let cancel = self.cancellations.create(&session_id).await;
24        emit_event(
25            Level::INFO,
26            ProcessKind::Engine,
27            ObservabilityEvent {
28                event: "provider.call.start",
29                component: "engine.loop",
30                correlation_id: correlation_ref,
31                session_id: Some(&session_id),
32                run_id: None,
33                message_id: None,
34                provider_id: Some(provider_id.as_str()),
35                model_id,
36                status: Some("start"),
37                error_code: None,
38                detail: Some("run_prompt_async dispatch"),
39            },
40        );
41        self.event_bus.publish(EngineEvent::new(
42            "session.status",
43            json!({"sessionID": session_id, "status":"running"}),
44        ));
45        let request_parts = req.parts.clone();
46        let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
47        let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
48        let requested_write_required = req.write_required.unwrap_or(false);
49        let requested_prewrite_requirements = req.prewrite_requirements.clone().unwrap_or_default();
50        let prewrite_repair_budget = prewrite_repair_retry_budget(&requested_prewrite_requirements);
51        let prewrite_fail_closed = prewrite_gate_strict_mode(&requested_prewrite_requirements);
52        let request_tool_allowlist = req
53            .tool_allowlist
54            .clone()
55            .unwrap_or_default()
56            .into_iter()
57            .map(|tool| normalize_tool_name(&tool))
58            .filter(|tool| !tool.trim().is_empty())
59            .collect::<HashSet<_>>();
60        let required_mcp_tools_before_write =
61            concrete_mcp_tools_required_before_write(&request_tool_allowlist);
62        // Propagate per-request tool allowlist to session-level enforcement so
63        // that execution-time checks (and mcp_list scoping) also respect it.
64        if !request_tool_allowlist.is_empty() {
65            self.set_session_allowed_tools(
66                &session_id,
67                request_tool_allowlist.iter().cloned().collect(),
68            )
69            .await;
70        }
71        let text = req
72            .parts
73            .iter()
74            .map(|p| match p {
75                MessagePartInput::Text { text } => text.clone(),
76                MessagePartInput::File {
77                    mime,
78                    filename,
79                    url,
80                } => format!(
81                    "[file mime={} name={} url={}]",
82                    mime,
83                    filename.clone().unwrap_or_else(|| "unknown".to_string()),
84                    url
85                ),
86            })
87            .collect::<Vec<_>>()
88            .join("\n");
89        let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
90        self.auto_rename_session_from_user_text(&session_id, &text)
91            .await;
92        let active_agent = self.agents.get(req.agent.as_deref()).await;
93        let mut user_message_id = self
94            .find_recent_matching_user_message_id(&session_id, &text)
95            .await;
96        if user_message_id.is_none() {
97            let user_message = Message::new(
98                MessageRole::User,
99                vec![MessagePart::Text { text: text.clone() }],
100            );
101            let created_message_id = user_message.id.clone();
102            self.storage
103                .append_message(&session_id, user_message)
104                .await?;
105
106            let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
107            self.event_bus.publish(EngineEvent::new(
108                "message.part.updated",
109                json!({
110                    "part": user_part,
111                    "delta": text,
112                    "agent": active_agent.name
113                }),
114            ));
115            user_message_id = Some(created_message_id);
116        }
117        let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
118
119        if cancel.is_cancelled() {
120            self.event_bus.publish(EngineEvent::new(
121                "session.status",
122                json!({"sessionID": session_id, "status":"cancelled"}),
123            ));
124            self.cancellations.remove(&session_id).await;
125            return Ok(());
126        }
127
128        let mut question_tool_used = false;
129        let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
130            if normalize_tool_name(&tool) == "question" {
131                question_tool_used = true;
132            }
133            if !agent_can_use_tool(&active_agent, &tool) {
134                format!(
135                    "Tool `{tool}` is not enabled for agent `{}`.",
136                    active_agent.name
137                )
138            } else {
139                match self
140                    .execute_tool_with_permission(
141                        &session_id,
142                        &user_message_id,
143                        tool.clone(),
144                        args,
145                        None,
146                        active_agent.skills.as_deref(),
147                        &text,
148                        requested_write_required,
149                        None,
150                        cancel.clone(),
151                    )
152                    .await
153                {
154                    Ok(output) => output.unwrap_or_default(),
155                    Err(err) => {
156                        self.mark_session_run_failed(&session_id, &err.to_string())
157                            .await;
158                        return Err(err);
159                    }
160                }
161            }
162        } else {
163            let mut completion = String::new();
164            let mut max_iterations = max_tool_iterations();
165            let mut followup_context: Option<String> = None;
166            let mut last_tool_outputs: Vec<String> = Vec::new();
167            let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
168            let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
169            let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
170            let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
171            let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
172            let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
173            let mut websearch_query_blocked = false;
174            let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
175            let mut pack_builder_executed = false;
176            let mut auto_workspace_probe_attempted = false;
177            let mut productive_tool_calls_total = 0usize;
178            let mut productive_write_tool_calls_total = 0usize;
179            let mut productive_workspace_inspection_total = 0usize;
180            let mut productive_web_research_total = 0usize;
181            let mut productive_concrete_read_total = 0usize;
182            let mut successful_web_research_total = 0usize;
183            let mut required_tool_retry_count = 0usize;
184            let mut required_write_retry_count = 0usize;
185            let mut unmet_prewrite_repair_retry_count = 0usize;
186            let mut empty_completion_retry_count = 0usize;
187            let mut prewrite_gate_waived = false;
188            let mut invalid_tool_args_retry_count = 0usize;
189            let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
190            let mut required_tool_unsatisfied_emitted = false;
191            let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
192            let email_delivery_requested = requires_email_delivery_prompt(&text);
193            let web_research_requested = requires_web_research_prompt(&text);
194            let code_workflow_requested = infer_code_workflow_from_text(&text);
195            let mut email_action_executed = false;
196            let mut latest_email_action_note: Option<String> = None;
197            let mut email_tools_ever_offered = false;
198            let intent = classify_intent(&text);
199            let router_enabled = tool_router_enabled();
200            let retrieval_enabled = semantic_tool_retrieval_enabled();
201            let retrieval_k = semantic_tool_retrieval_k();
202            let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
203                self.tools.mcp_server_names().await
204            } else {
205                Vec::new()
206            };
207            let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
208            let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
209                && runtime_attachments.is_empty()
210                && is_short_simple_prompt(&text)
211                && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
212
213            while max_iterations > 0 && !cancel.is_cancelled() {
214                let iteration = 26usize.saturating_sub(max_iterations);
215                max_iterations -= 1;
216                let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
217                    ChatHistoryProfile::Full
218                } else if matches!(requested_context_mode, ContextMode::Compact)
219                    || context_is_auto_compact
220                {
221                    ChatHistoryProfile::Compact
222                } else {
223                    ChatHistoryProfile::Standard
224                };
225                let mut messages =
226                    load_chat_history(self.storage.clone(), &session_id, context_profile).await;
227                if iteration == 1 && !runtime_attachments.is_empty() {
228                    attach_to_last_user_message(&mut messages, &runtime_attachments);
229                }
230                let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
231                self.event_bus.publish(EngineEvent::new(
232                    "context.profile.selected",
233                    json!({
234                        "sessionID": session_id,
235                        "messageID": user_message_id,
236                        "iteration": iteration,
237                        "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
238                        "historyMessageCount": messages.len(),
239                        "historyCharCount": history_char_count,
240                        "memoryInjected": false
241                    }),
242                ));
243                let mut system_parts = vec![tandem_runtime_system_prompt(
244                    &self.host_runtime_context,
245                    &mcp_server_names,
246                )];
247                if let Some(system) = active_agent.system_prompt.as_ref() {
248                    system_parts.push(system.clone());
249                }
250                messages.insert(
251                    0,
252                    ChatMessage {
253                        role: "system".to_string(),
254                        content: system_parts.join("\n\n"),
255                        attachments: Vec::new(),
256                    },
257                );
258                if let Some(extra) = followup_context.take() {
259                    messages.push(ChatMessage {
260                        role: "user".to_string(),
261                        content: extra,
262                        attachments: Vec::new(),
263                    });
264                }
265                if let Some(hook) = self.prompt_context_hook.read().await.clone() {
266                    let ctx = PromptContextHookContext {
267                        session_id: session_id.clone(),
268                        message_id: user_message_id.clone(),
269                        provider_id: provider_id.clone(),
270                        model_id: model_id_value.clone(),
271                        iteration,
272                    };
273                    let hook_timeout =
274                        Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
275                    match tokio::time::timeout(
276                        hook_timeout,
277                        hook.augment_provider_messages(ctx, messages.clone()),
278                    )
279                    .await
280                    {
281                        Ok(Ok(augmented)) => {
282                            messages = augmented;
283                        }
284                        Ok(Err(err)) => {
285                            self.event_bus.publish(EngineEvent::new(
286                                "memory.context.error",
287                                json!({
288                                    "sessionID": session_id,
289                                    "messageID": user_message_id,
290                                    "iteration": iteration,
291                                    "error": truncate_text(&err.to_string(), 500),
292                                }),
293                            ));
294                        }
295                        Err(_) => {
296                            self.event_bus.publish(EngineEvent::new(
297                                "memory.context.error",
298                                json!({
299                                    "sessionID": session_id,
300                                    "messageID": user_message_id,
301                                    "iteration": iteration,
302                                    "error": format!(
303                                        "prompt context hook timeout after {} ms",
304                                        hook_timeout.as_millis()
305                                    ),
306                                }),
307                            ));
308                        }
309                    }
310                }
311                let all_tools = self.tools.list().await;
312                let mut retrieval_fallback_reason: Option<&'static str> = None;
313                let mut candidate_tools = if retrieval_enabled {
314                    self.tools.retrieve(&text, retrieval_k).await
315                } else {
316                    all_tools.clone()
317                };
318                if retrieval_enabled {
319                    if candidate_tools.is_empty() && !all_tools.is_empty() {
320                        candidate_tools = all_tools.clone();
321                        retrieval_fallback_reason = Some("retrieval_empty_result");
322                    } else if web_research_requested
323                        && has_web_research_tools(&all_tools)
324                        && !has_web_research_tools(&candidate_tools)
325                        && required_write_retry_count == 0
326                    {
327                        candidate_tools = all_tools.clone();
328                        retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
329                    } else if email_delivery_requested
330                        && has_email_action_tools(&all_tools)
331                        && !has_email_action_tools(&candidate_tools)
332                    {
333                        candidate_tools = all_tools.clone();
334                        retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
335                    }
336                }
337                let mut tool_schemas = if !router_enabled {
338                    candidate_tools
339                } else {
340                    match requested_tool_mode {
341                        ToolMode::None => Vec::new(),
342                        ToolMode::Required => select_tool_subset(
343                            candidate_tools,
344                            intent,
345                            &request_tool_allowlist,
346                            iteration > 1,
347                        ),
348                        ToolMode::Auto => {
349                            if !auto_tools_escalated {
350                                Vec::new()
351                            } else {
352                                select_tool_subset(
353                                    candidate_tools,
354                                    intent,
355                                    &request_tool_allowlist,
356                                    iteration > 1,
357                                )
358                            }
359                        }
360                    }
361                };
362                let mut policy_patterns =
363                    request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
364                if let Some(agent_tools) = active_agent.tools.as_ref() {
365                    policy_patterns
366                        .extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
367                }
368                let session_allowed_tools = self
369                    .session_allowed_tools
370                    .read()
371                    .await
372                    .get(&session_id)
373                    .cloned()
374                    .unwrap_or_default();
375                policy_patterns.extend(session_allowed_tools.iter().cloned());
376                if !policy_patterns.is_empty() {
377                    let mut included = tool_schemas
378                        .iter()
379                        .map(|schema| normalize_tool_name(&schema.name))
380                        .collect::<HashSet<_>>();
381                    for schema in &all_tools {
382                        let normalized = normalize_tool_name(&schema.name);
383                        if policy_patterns
384                            .iter()
385                            .any(|pattern| tool_name_matches_policy(pattern, &normalized))
386                            && included.insert(normalized)
387                        {
388                            tool_schemas.push(schema.clone());
389                        }
390                    }
391                }
392                if !request_tool_allowlist.is_empty() {
393                    tool_schemas.retain(|schema| {
394                        let tool = normalize_tool_name(&schema.name);
395                        request_tool_allowlist
396                            .iter()
397                            .any(|pattern| tool_name_matches_policy(pattern, &tool))
398                    });
399                }
400                let prewrite_gate = evaluate_prewrite_gate(
401                    requested_write_required,
402                    &requested_prewrite_requirements,
403                    PrewriteProgress {
404                        productive_write_tool_calls_total,
405                        productive_workspace_inspection_total,
406                        productive_concrete_read_total,
407                        productive_web_research_total,
408                        successful_web_research_total,
409                        required_write_retry_count,
410                        unmet_prewrite_repair_retry_count,
411                        prewrite_gate_waived,
412                    },
413                );
414                let _prewrite_satisfied = prewrite_gate.prewrite_satisfied;
415                let prewrite_gate_write = prewrite_gate.gate_write;
416                let force_write_only_retry = prewrite_gate.force_write_only_retry;
417                let allow_repair_tools = prewrite_gate.allow_repair_tools;
418                let required_mcp_tool_pending = has_unattempted_required_mcp_tool(
419                    &required_mcp_tools_before_write,
420                    &tool_call_counts,
421                );
422                if prewrite_gate_write {
423                    tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
424                }
425                if requested_prewrite_requirements.repair_on_unmet_requirements
426                    && productive_write_tool_calls_total >= 3
427                {
428                    tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
429                }
430                if allow_repair_tools {
431                    let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
432                    let repair_tools = tool_schemas
433                        .iter()
434                        .filter(|schema| {
435                            tool_matches_unmet_prewrite_repair_requirement(
436                                &schema.name,
437                                &unmet_prewrite_codes,
438                                productive_workspace_inspection_total > 0,
439                            )
440                        })
441                        .cloned()
442                        .collect::<Vec<_>>();
443                    if !repair_tools.is_empty() {
444                        tool_schemas = repair_tools;
445                    }
446                }
447                if force_write_only_retry && !allow_repair_tools && !required_mcp_tool_pending {
448                    tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
449                }
450                if active_agent.tools.is_some() {
451                    tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
452                }
453                tool_schemas.retain(|schema| {
454                    let normalized = normalize_tool_name(&schema.name);
455                    if let Some(server) = mcp_server_from_tool_name(&normalized) {
456                        !blocked_mcp_servers.contains(server)
457                    } else {
458                        true
459                    }
460                });
461                if let Some(allowed_tools) = self
462                    .session_allowed_tools
463                    .read()
464                    .await
465                    .get(&session_id)
466                    .cloned()
467                {
468                    if !allowed_tools.is_empty() {
469                        tool_schemas.retain(|schema| {
470                            let normalized = normalize_tool_name(&schema.name);
471                            any_policy_matches(&allowed_tools, &normalized)
472                        });
473                    }
474                }
475                if required_mcp_tool_pending {
476                    tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
477                }
478                if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
479                    let detail = validation_err.to_string();
480                    emit_event(
481                        Level::ERROR,
482                        ProcessKind::Engine,
483                        ObservabilityEvent {
484                            event: "provider.call.error",
485                            component: "engine.loop",
486                            correlation_id: correlation_ref,
487                            session_id: Some(&session_id),
488                            run_id: None,
489                            message_id: Some(&user_message_id),
490                            provider_id: Some(provider_id.as_str()),
491                            model_id,
492                            status: Some("failed"),
493                            error_code: Some("TOOL_SCHEMA_INVALID"),
494                            detail: Some(&detail),
495                        },
496                    );
497                    anyhow::bail!("{detail}");
498                }
499                let routing_decision = ToolRoutingDecision {
500                    pass: if auto_tools_escalated { 2 } else { 1 },
501                    mode: match requested_tool_mode {
502                        ToolMode::Auto => default_mode_name(),
503                        ToolMode::None => "none",
504                        ToolMode::Required => "required",
505                    },
506                    intent,
507                    selected_count: tool_schemas.len(),
508                    total_available_count: all_tools.len(),
509                    mcp_included: tool_schemas
510                        .iter()
511                        .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
512                };
513                self.event_bus.publish(EngineEvent::new(
514                    "tool.routing.decision",
515                    json!({
516                        "sessionID": session_id,
517                        "messageID": user_message_id,
518                        "iteration": iteration,
519                        "pass": routing_decision.pass,
520                        "mode": routing_decision.mode,
521                        "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
522                        "selectedToolCount": routing_decision.selected_count,
523                        "totalAvailableTools": routing_decision.total_available_count,
524                        "mcpIncluded": routing_decision.mcp_included,
525                        "retrievalEnabled": retrieval_enabled,
526                        "retrievalK": retrieval_k,
527                        "fallbackToFullTools": retrieval_fallback_reason.is_some(),
528                        "fallbackReason": retrieval_fallback_reason
529                    }),
530                ));
531                let allowed_tool_names = tool_schemas
532                    .iter()
533                    .map(|schema| normalize_tool_name(&schema.name))
534                    .collect::<HashSet<_>>();
535                if !email_tools_ever_offered && has_email_action_tools(&tool_schemas) {
536                    email_tools_ever_offered = true;
537                }
538                let offered_tool_preview = tool_schemas
539                    .iter()
540                    .take(8)
541                    .map(|schema| normalize_tool_name(&schema.name))
542                    .collect::<Vec<_>>()
543                    .join(", ");
544                self.event_bus.publish(EngineEvent::new(
545                    "provider.call.iteration.start",
546                    json!({
547                        "sessionID": session_id,
548                        "messageID": user_message_id,
549                        "iteration": iteration,
550                        "selectedToolCount": allowed_tool_names.len(),
551                    }),
552                ));
553                let estimated_prompt_chars: usize = messages.iter().map(|m| m.content.len()).sum();
554                let provider_connect_timeout =
555                    Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
556                let provider_idle_timeout =
557                    Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
558                let provider_stream_retry_budget = provider_stream_decode_retry_attempts();
559                let mut provider_stream_retry_count = 0usize;
560                let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
561                let mut provider_usage: Option<TokenUsage>;
562                let mut accepted_tool_calls_in_cycle: usize;
563                'provider_stream_attempt: loop {
564                    completion.clear();
565                    streamed_tool_calls.clear();
566                    provider_usage = None;
567                    accepted_tool_calls_in_cycle = 0;
568                    let stream_result = tokio::time::timeout(
569                        provider_connect_timeout,
570                        self.providers.stream_for_provider(
571                            Some(provider_id.as_str()),
572                            Some(model_id_value.as_str()),
573                            messages.clone(),
574                            requested_tool_mode.clone(),
575                            Some(tool_schemas.clone()),
576                            cancel.clone(),
577                        ),
578                    )
579                    .await
580                    .map_err(|_| {
581                        anyhow::anyhow!(
582                            "provider stream connect timeout after {} ms",
583                            provider_connect_timeout.as_millis()
584                        )
585                    })
586                    .and_then(|result| result);
587                    let stream = match stream_result {
588                        Ok(stream) => stream,
589                        Err(err) => {
590                            let error_text = err.to_string();
591                            if is_transient_provider_stream_error(&error_text)
592                                && provider_stream_retry_count < provider_stream_retry_budget
593                            {
594                                provider_stream_retry_count =
595                                    provider_stream_retry_count.saturating_add(1);
596                                let detail = truncate_text(&error_text, 500);
597                                self.event_bus.publish(EngineEvent::new(
598                                    "provider.call.iteration.retry",
599                                    json!({
600                                        "sessionID": session_id,
601                                        "messageID": user_message_id,
602                                        "providerID": provider_id,
603                                        "modelID": model_id_value,
604                                        "iteration": iteration,
605                                        "error": detail,
606                                        "retry": provider_stream_retry_count,
607                                        "maxRetries": provider_stream_retry_budget,
608                                    }),
609                                ));
610                                continue 'provider_stream_attempt;
611                            }
612                            let error_code = provider_error_code(&error_text);
613                            let detail = truncate_text(&error_text, 500);
614                            emit_event(
615                                Level::ERROR,
616                                ProcessKind::Engine,
617                                ObservabilityEvent {
618                                    event: "provider.call.error",
619                                    component: "engine.loop",
620                                    correlation_id: correlation_ref,
621                                    session_id: Some(&session_id),
622                                    run_id: None,
623                                    message_id: Some(&user_message_id),
624                                    provider_id: Some(provider_id.as_str()),
625                                    model_id,
626                                    status: Some("failed"),
627                                    error_code: Some(error_code),
628                                    detail: Some(&detail),
629                                },
630                            );
631                            self.event_bus.publish(EngineEvent::new(
632                                "provider.call.iteration.error",
633                                json!({
634                                    "sessionID": session_id,
635                                    "messageID": user_message_id,
636                                    "iteration": iteration,
637                                    "error": detail,
638                                }),
639                            ));
640                            self.mark_session_run_failed(&session_id, &err.to_string())
641                                .await;
642                            return Err(err);
643                        }
644                    };
645                    tokio::pin!(stream);
646                    loop {
647                        let next_chunk_result =
648                            tokio::time::timeout(provider_idle_timeout, stream.next())
649                                .await
650                                .map_err(|_| {
651                                    anyhow::anyhow!(
652                                        "provider stream idle timeout after {} ms",
653                                        provider_idle_timeout.as_millis()
654                                    )
655                                });
656                        let next_chunk = match next_chunk_result {
657                            Ok(next_chunk) => next_chunk,
658                            Err(err) => {
659                                let error_text = err.to_string();
660                                self.event_bus.publish(EngineEvent::new(
661                                    "provider.call.iteration.error",
662                                    json!({
663                                        "sessionID": session_id,
664                                        "messageID": user_message_id,
665                                        "iteration": iteration,
666                                        "error": truncate_text(&error_text, 500),
667                                    }),
668                                ));
669                                self.mark_session_run_failed(&session_id, &error_text).await;
670                                return Err(err);
671                            }
672                        };
673                        let Some(chunk) = next_chunk else {
674                            break 'provider_stream_attempt;
675                        };
676                        let chunk = match chunk {
677                            Ok(chunk) => chunk,
678                            Err(err) => {
679                                let error_text = err.to_string();
680                                let stream_error_text =
681                                    format!("provider stream chunk error: {error_text}");
682                                if is_transient_provider_stream_error(&stream_error_text)
683                                    && provider_stream_retry_count < provider_stream_retry_budget
684                                {
685                                    provider_stream_retry_count =
686                                        provider_stream_retry_count.saturating_add(1);
687                                    let detail = truncate_text(&stream_error_text, 500);
688                                    self.event_bus.publish(EngineEvent::new(
689                                        "provider.call.iteration.retry",
690                                        json!({
691                                            "sessionID": session_id,
692                                            "messageID": user_message_id,
693                                            "providerID": provider_id,
694                                            "modelID": model_id_value,
695                                            "iteration": iteration,
696                                            "error": detail,
697                                            "retry": provider_stream_retry_count,
698                                            "maxRetries": provider_stream_retry_budget,
699                                        }),
700                                    ));
701                                    continue 'provider_stream_attempt;
702                                }
703                                let error_code = provider_error_code(&stream_error_text);
704                                let detail = truncate_text(&stream_error_text, 500);
705                                emit_event(
706                                    Level::ERROR,
707                                    ProcessKind::Engine,
708                                    ObservabilityEvent {
709                                        event: "provider.call.error",
710                                        component: "engine.loop",
711                                        correlation_id: correlation_ref,
712                                        session_id: Some(&session_id),
713                                        run_id: None,
714                                        message_id: Some(&user_message_id),
715                                        provider_id: Some(provider_id.as_str()),
716                                        model_id,
717                                        status: Some("failed"),
718                                        error_code: Some(error_code),
719                                        detail: Some(&detail),
720                                    },
721                                );
722                                self.event_bus.publish(EngineEvent::new(
723                                    "provider.call.iteration.error",
724                                    json!({
725                                        "sessionID": session_id,
726                                        "messageID": user_message_id,
727                                        "iteration": iteration,
728                                        "error": detail,
729                                    }),
730                                ));
731                                let err = anyhow::anyhow!("{stream_error_text}");
732                                self.mark_session_run_failed(&session_id, &err.to_string())
733                                    .await;
734                                return Err(err);
735                            }
736                        };
737                        match chunk {
738                            StreamChunk::TextDelta(delta) => {
739                                let delta = strip_model_control_markers(&delta);
740                                if delta.trim().is_empty() {
741                                    continue;
742                                }
743                                if completion.is_empty() {
744                                    emit_event(
745                                        Level::INFO,
746                                        ProcessKind::Engine,
747                                        ObservabilityEvent {
748                                            event: "provider.call.first_byte",
749                                            component: "engine.loop",
750                                            correlation_id: correlation_ref,
751                                            session_id: Some(&session_id),
752                                            run_id: None,
753                                            message_id: Some(&user_message_id),
754                                            provider_id: Some(provider_id.as_str()),
755                                            model_id,
756                                            status: Some("streaming"),
757                                            error_code: None,
758                                            detail: Some("first text delta"),
759                                        },
760                                    );
761                                }
762                                completion.push_str(&delta);
763                                let delta = truncate_text(&delta, 4_000);
764                                let delta_part = WireMessagePart::text(
765                                    &session_id,
766                                    &user_message_id,
767                                    delta.clone(),
768                                );
769                                self.event_bus.publish(EngineEvent::new(
770                                    "message.part.updated",
771                                    json!({"part": delta_part, "delta": delta}),
772                                ));
773                            }
774                            StreamChunk::ReasoningDelta(_reasoning) => {}
775                            StreamChunk::Done {
776                                finish_reason: _,
777                                usage,
778                            } => {
779                                if usage.is_some() {
780                                    provider_usage = usage;
781                                }
782                                break 'provider_stream_attempt;
783                            }
784                            StreamChunk::ToolCallStart { id, name } => {
785                                let entry = streamed_tool_calls.entry(id).or_default();
786                                if entry.name.is_empty() {
787                                    entry.name = name;
788                                }
789                            }
790                            StreamChunk::ToolCallDelta { id, args_delta } => {
791                                let entry = streamed_tool_calls.entry(id.clone()).or_default();
792                                entry.args.push_str(&args_delta);
793                                let tool_name = if entry.name.trim().is_empty() {
794                                    "tool".to_string()
795                                } else {
796                                    normalize_tool_name(&entry.name)
797                                };
798                                let parsed_preview = if entry.name.trim().is_empty() {
799                                    Value::String(truncate_text(&entry.args, 1_000))
800                                } else {
801                                    parse_streamed_tool_args(&tool_name, &entry.args)
802                                };
803                                let mut tool_part = WireMessagePart::tool_invocation(
804                                    &session_id,
805                                    &user_message_id,
806                                    tool_name.clone(),
807                                    parsed_preview.clone(),
808                                );
809                                tool_part.id = Some(id.clone());
810                                if tool_name == "write" {
811                                    tracing::info!(
812                                        session_id = %session_id,
813                                        message_id = %user_message_id,
814                                        tool_call_id = %id,
815                                        args_delta_len = args_delta.len(),
816                                        accumulated_args_len = entry.args.len(),
817                                        parsed_preview_empty = parsed_preview.is_null()
818                                            || parsed_preview.as_object().is_some_and(|value| value.is_empty())
819                                            || parsed_preview
820                                                .as_str()
821                                                .map(|value| value.trim().is_empty())
822                                                .unwrap_or(false),
823                                        "streamed write tool args delta received"
824                                    );
825                                }
826                                self.event_bus.publish(EngineEvent::new(
827                                    "message.part.updated",
828                                    json!({
829                                        "part": tool_part,
830                                        "toolCallDelta": {
831                                            "id": id,
832                                            "tool": tool_name,
833                                            "argsDelta": truncate_text(&args_delta, 1_000),
834                                            "rawArgsPreview": truncate_text(&entry.args, 2_000),
835                                            "parsedArgsPreview": parsed_preview
836                                        }
837                                    }),
838                                ));
839                            }
840                            StreamChunk::ToolCallEnd { id: _ } => {}
841                        }
842                        if cancel.is_cancelled() {
843                            break 'provider_stream_attempt;
844                        }
845                    }
846                }
847
848                let streamed_tool_call_count = streamed_tool_calls.len();
849                let streamed_tool_call_parse_failed = streamed_tool_calls
850                    .values()
851                    .any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
852                let mut tool_calls = streamed_tool_calls
853                    .into_iter()
854                    .filter_map(|(call_id, call)| {
855                        if call.name.trim().is_empty() {
856                            return None;
857                        }
858                        let tool_name = normalize_tool_name(&call.name);
859                        let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
860                        Some(ParsedToolCall {
861                            tool: tool_name,
862                            args: parsed_args,
863                            call_id: Some(call_id),
864                        })
865                    })
866                    .collect::<Vec<_>>();
867                if tool_calls.is_empty() {
868                    tool_calls = parse_tool_invocations_from_response(&completion)
869                        .into_iter()
870                        .map(|(tool, args)| ParsedToolCall {
871                            tool,
872                            args,
873                            call_id: None,
874                        })
875                        .collect::<Vec<_>>();
876                }
877                let provider_tool_parse_failed = tool_calls.is_empty()
878                    && (streamed_tool_call_parse_failed
879                        || (streamed_tool_call_count > 0
880                            && looks_like_unparsed_tool_payload(&completion))
881                        || looks_like_unparsed_tool_payload(&completion));
882                if provider_tool_parse_failed {
883                    latest_required_tool_failure_kind =
884                        RequiredToolFailureKind::ToolCallParseFailed;
885                } else if tool_calls.is_empty() {
886                    latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
887                }
888                if router_enabled
889                    && matches!(requested_tool_mode, ToolMode::Auto)
890                    && !auto_tools_escalated
891                    && iteration == 1
892                    && should_escalate_auto_tools(intent, &text, &completion)
893                {
894                    auto_tools_escalated = true;
895                    followup_context = Some(
896                        "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
897                            .to_string(),
898                    );
899                    self.event_bus.publish(EngineEvent::new(
900                        "provider.call.iteration.finish",
901                        json!({
902                            "sessionID": session_id,
903                            "messageID": user_message_id,
904                            "iteration": iteration,
905                            "finishReason": "auto_escalate",
906                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
907                            "rejectedToolCalls": 0,
908                        }),
909                    ));
910                    continue;
911                }
912                if tool_calls.is_empty()
913                    && !auto_workspace_probe_attempted
914                    && should_force_workspace_probe(&text, &completion)
915                    && allowed_tool_names.contains("glob")
916                {
917                    auto_workspace_probe_attempted = true;
918                    tool_calls = vec![ParsedToolCall {
919                        tool: "glob".to_string(),
920                        args: json!({ "pattern": "*" }),
921                        call_id: None,
922                    }];
923                }
924                if !tool_calls.is_empty() {
925                    let saw_tool_call_candidate = true;
926                    let mut outputs = Vec::new();
927                    let mut executed_productive_tool = false;
928                    let mut write_tool_attempted_in_cycle = false;
929                    let mut auth_required_hit_in_cycle = false;
930                    let mut guard_budget_hit_in_cycle = false;
931                    let mut duplicate_signature_hit_in_cycle = false;
932                    let mut rejected_tool_call_in_cycle = false;
933                    for ParsedToolCall {
934                        tool,
935                        args,
936                        call_id,
937                    } in tool_calls
938                    {
939                        if !agent_can_use_tool(&active_agent, &tool) {
940                            rejected_tool_call_in_cycle = true;
941                            continue;
942                        }
943                        let tool_key = normalize_tool_name(&tool);
944                        if is_workspace_write_tool(&tool_key) {
945                            write_tool_attempted_in_cycle = true;
946                        }
947                        if !allowed_tool_names.contains(&tool_key) {
948                            rejected_tool_call_in_cycle = true;
949                            let note = if offered_tool_preview.is_empty() {
950                                format!(
951                                    "Tool `{}` call skipped: it is not available in this turn.",
952                                    tool_key
953                                )
954                            } else {
955                                format!(
956                                    "Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
957                                    tool_key, offered_tool_preview
958                                )
959                            };
960                            self.event_bus.publish(EngineEvent::new(
961                                "tool.call.rejected_unoffered",
962                                json!({
963                                    "sessionID": session_id,
964                                    "messageID": user_message_id,
965                                    "iteration": iteration,
966                                    "tool": tool_key,
967                                    "offeredToolCount": allowed_tool_names.len()
968                                }),
969                            ));
970                            if tool_name_looks_like_email_action(&tool_key) {
971                                latest_email_action_note = Some(note.clone());
972                            }
973                            outputs.push(note);
974                            continue;
975                        }
976                        if let Some(server) = mcp_server_from_tool_name(&tool_key) {
977                            if blocked_mcp_servers.contains(server) {
978                                rejected_tool_call_in_cycle = true;
979                                outputs.push(format!(
980                                    "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
981                                    tool_key, server
982                                ));
983                                continue;
984                            }
985                        }
986                        if tool_key == "question" {
987                            question_tool_used = true;
988                        }
989                        if tool_key == "pack_builder" && pack_builder_executed {
990                            rejected_tool_call_in_cycle = true;
991                            outputs.push(
992                                "Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
993                                    .to_string(),
994                            );
995                            continue;
996                        }
997                        if websearch_query_blocked && tool_key == "websearch" {
998                            rejected_tool_call_in_cycle = true;
999                            outputs.push(
1000                                "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
1001                                    .to_string(),
1002                            );
1003                            continue;
1004                        }
1005                        let mut effective_args = args.clone();
1006                        if tool_key == "todo_write" {
1007                            effective_args = normalize_todo_write_args(effective_args, &completion);
1008                            if is_empty_todo_write_args(&effective_args) {
1009                                rejected_tool_call_in_cycle = true;
1010                                outputs.push(
1011                                    "Tool `todo_write` call skipped: empty todo payload."
1012                                        .to_string(),
1013                                );
1014                                continue;
1015                            }
1016                        }
1017                        let signature = if tool_key == "batch" {
1018                            batch_tool_signature(&args)
1019                                .unwrap_or_else(|| tool_signature(&tool_key, &args))
1020                        } else {
1021                            tool_signature(&tool_key, &args)
1022                        };
1023                        if is_shell_tool_name(&tool_key)
1024                            && shell_mismatch_signatures.contains(&signature)
1025                        {
1026                            rejected_tool_call_in_cycle = true;
1027                            outputs.push(
1028                                "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
1029                                    .to_string(),
1030                            );
1031                            continue;
1032                        }
1033                        let mut signature_count = 1usize;
1034                        if is_read_only_tool(&tool_key)
1035                            || (tool_key == "batch" && is_read_only_batch_call(&args))
1036                        {
1037                            let count = readonly_signature_counts
1038                                .entry(signature.clone())
1039                                .and_modify(|v| *v = v.saturating_add(1))
1040                                .or_insert(1);
1041                            signature_count = *count;
1042                            if tool_key == "websearch" {
1043                                if let Some(limit) = websearch_duplicate_signature_limit {
1044                                    if *count > limit {
1045                                        rejected_tool_call_in_cycle = true;
1046                                        self.event_bus.publish(EngineEvent::new(
1047                                            "tool.loop_guard.triggered",
1048                                            json!({
1049                                                "sessionID": session_id,
1050                                                "messageID": user_message_id,
1051                                                "tool": tool_key,
1052                                                "reason": "duplicate_signature_retry_exhausted",
1053                                                "duplicateLimit": limit,
1054                                                "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
1055                                                "loop_guard_triggered": true
1056                                            }),
1057                                        ));
1058                                        outputs.push(
1059                                            "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
1060                                                .to_string(),
1061                                        );
1062                                        continue;
1063                                    }
1064                                }
1065                            }
1066                            if tool_key != "websearch" && *count > 1 {
1067                                rejected_tool_call_in_cycle = true;
1068                                if let Some(cached) = readonly_tool_cache.get(&signature) {
1069                                    outputs.push(cached.clone());
1070                                } else {
1071                                    outputs.push(format!(
1072                                        "Tool `{}` call skipped: duplicate call signature detected.",
1073                                        tool_key
1074                                    ));
1075                                }
1076                                continue;
1077                            }
1078                        }
1079                        let is_read_only_signature = is_read_only_tool(&tool_key)
1080                            || (tool_key == "batch" && is_read_only_batch_call(&args));
1081                        if !is_read_only_signature {
1082                            let duplicate_limit = duplicate_signature_limit_for(&tool_key);
1083                            let seen = mutable_signature_counts
1084                                .entry(signature.clone())
1085                                .and_modify(|v| *v = v.saturating_add(1))
1086                                .or_insert(1);
1087                            if *seen > duplicate_limit {
1088                                rejected_tool_call_in_cycle = true;
1089                                self.event_bus.publish(EngineEvent::new(
1090                                    "tool.loop_guard.triggered",
1091                                    json!({
1092                                        "sessionID": session_id,
1093                                        "messageID": user_message_id,
1094                                        "tool": tool_key,
1095                                        "reason": "duplicate_signature_retry_exhausted",
1096                                        "signatureHash": stable_hash(&signature),
1097                                        "duplicateLimit": duplicate_limit,
1098                                        "loop_guard_triggered": true
1099                                    }),
1100                                ));
1101                                outputs.push(format!(
1102                                    "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
1103                                    tool_key, duplicate_limit
1104                                ));
1105                                duplicate_signature_hit_in_cycle = true;
1106                                continue;
1107                            }
1108                        }
1109                        let budget = tool_budget_for(&tool_key);
1110                        let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
1111                        if *entry >= budget {
1112                            rejected_tool_call_in_cycle = true;
1113                            outputs.push(format!(
1114                                "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
1115                                tool_key, budget
1116                            ));
1117                            guard_budget_hit_in_cycle = true;
1118                            continue;
1119                        }
1120                        let mut finalized_part = WireMessagePart::tool_invocation(
1121                            &session_id,
1122                            &user_message_id,
1123                            tool.clone(),
1124                            effective_args.clone(),
1125                        );
1126                        if let Some(call_id) = call_id.clone() {
1127                            finalized_part.id = Some(call_id);
1128                        }
1129                        finalized_part.state = Some("pending".to_string());
1130                        self.event_bus.publish(EngineEvent::new(
1131                            "message.part.updated",
1132                            json!({"part": finalized_part}),
1133                        ));
1134                        *entry += 1;
1135                        accepted_tool_calls_in_cycle =
1136                            accepted_tool_calls_in_cycle.saturating_add(1);
1137                        let tool_output_result = self
1138                            .execute_tool_with_permission(
1139                                &session_id,
1140                                &user_message_id,
1141                                tool,
1142                                effective_args,
1143                                call_id,
1144                                active_agent.skills.as_deref(),
1145                                &text,
1146                                requested_write_required,
1147                                Some(&completion),
1148                                cancel.clone(),
1149                            )
1150                            .await;
1151                        let Some(output) = (match tool_output_result {
1152                            Ok(output) => output,
1153                            Err(err) => {
1154                                self.mark_session_run_failed(&session_id, &err.to_string())
1155                                    .await;
1156                                return Err(err);
1157                            }
1158                        }) else {
1159                            continue;
1160                        };
1161                        {
1162                            let productive = is_productive_tool_output(&tool_key, &output);
1163                            if output.contains("WEBSEARCH_QUERY_MISSING") {
1164                                websearch_query_blocked = true;
1165                            }
1166                            if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
1167                            {
1168                                shell_mismatch_signatures.insert(signature.clone());
1169                            }
1170                            if is_read_only_tool(&tool_key)
1171                                && tool_key != "websearch"
1172                                && signature_count == 1
1173                            {
1174                                readonly_tool_cache.insert(signature, output.clone());
1175                            }
1176                            if productive {
1177                                productive_tool_calls_total =
1178                                    productive_tool_calls_total.saturating_add(1);
1179                                if is_workspace_write_tool(&tool_key) {
1180                                    productive_write_tool_calls_total =
1181                                        productive_write_tool_calls_total.saturating_add(1);
1182                                }
1183                                if is_workspace_inspection_tool(&tool_key) {
1184                                    productive_workspace_inspection_total =
1185                                        productive_workspace_inspection_total.saturating_add(1);
1186                                }
1187                                if tool_key == "read" {
1188                                    productive_concrete_read_total =
1189                                        productive_concrete_read_total.saturating_add(1);
1190                                }
1191                                if is_web_research_tool(&tool_key) {
1192                                    productive_web_research_total =
1193                                        productive_web_research_total.saturating_add(1);
1194                                    if is_successful_web_research_output(&tool_key, &output) {
1195                                        successful_web_research_total =
1196                                            successful_web_research_total.saturating_add(1);
1197                                    }
1198                                }
1199                                executed_productive_tool = true;
1200                                if tool_key == "pack_builder" {
1201                                    pack_builder_executed = true;
1202                                }
1203                            }
1204                            if tool_name_looks_like_email_action(&tool_key) {
1205                                if productive {
1206                                    email_action_executed = true;
1207                                } else {
1208                                    latest_email_action_note =
1209                                        Some(truncate_text(&output, 280).replace('\n', " "));
1210                                }
1211                            }
1212                            if is_auth_required_tool_output(&output) {
1213                                if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1214                                    blocked_mcp_servers.insert(server.to_string());
1215                                }
1216                                auth_required_hit_in_cycle = true;
1217                            }
1218                            outputs.push(output);
1219                            if auth_required_hit_in_cycle {
1220                                break;
1221                            }
1222                            if guard_budget_hit_in_cycle {
1223                                break;
1224                            }
1225                        }
1226                    }
1227                    if !outputs.is_empty() {
1228                        last_tool_outputs = outputs.clone();
1229                        if matches!(requested_tool_mode, ToolMode::Required)
1230                            && productive_tool_calls_total == 0
1231                        {
1232                            latest_required_tool_failure_kind = classify_required_tool_failure(
1233                                &outputs,
1234                                saw_tool_call_candidate,
1235                                accepted_tool_calls_in_cycle,
1236                                provider_tool_parse_failed,
1237                                rejected_tool_call_in_cycle,
1238                            );
1239                            if requested_write_required
1240                                && write_tool_attempted_in_cycle
1241                                && productive_write_tool_calls_total == 0
1242                                && is_write_invalid_args_failure_kind(
1243                                    latest_required_tool_failure_kind,
1244                                )
1245                            {
1246                                if required_write_retry_count + 1 < strict_write_retry_max_attempts
1247                                {
1248                                    required_write_retry_count += 1;
1249                                    required_tool_retry_count += 1;
1250                                    followup_context = Some(build_write_required_retry_context(
1251                                        &offered_tool_preview,
1252                                        latest_required_tool_failure_kind,
1253                                        &text,
1254                                        &requested_prewrite_requirements,
1255                                        productive_workspace_inspection_total > 0,
1256                                        productive_concrete_read_total > 0,
1257                                        productive_web_research_total > 0,
1258                                        successful_web_research_total > 0,
1259                                    ));
1260                                    self.event_bus.publish(EngineEvent::new(
1261                                        "provider.call.iteration.finish",
1262                                        json!({
1263                                            "sessionID": session_id,
1264                                            "messageID": user_message_id,
1265                                            "iteration": iteration,
1266                                            "finishReason": "required_write_invalid_retry",
1267                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1268                                            "rejectedToolCalls": 0,
1269                                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1270                                        }),
1271                                    ));
1272                                    continue;
1273                                }
1274                            }
1275                            let progress_made_in_cycle = productive_workspace_inspection_total > 0
1276                                || productive_concrete_read_total > 0
1277                                || productive_web_research_total > 0
1278                                || successful_web_research_total > 0;
1279                            if should_retry_nonproductive_required_tool_cycle(
1280                                requested_write_required,
1281                                write_tool_attempted_in_cycle,
1282                                progress_made_in_cycle,
1283                                required_tool_retry_count,
1284                            ) {
1285                                required_tool_retry_count += 1;
1286                                followup_context =
1287                                    Some(build_required_tool_retry_context_for_task(
1288                                        &offered_tool_preview,
1289                                        latest_required_tool_failure_kind,
1290                                        &text,
1291                                    ));
1292                                self.event_bus.publish(EngineEvent::new(
1293                                    "provider.call.iteration.finish",
1294                                    json!({
1295                                        "sessionID": session_id,
1296                                        "messageID": user_message_id,
1297                                        "iteration": iteration,
1298                                        "finishReason": "required_tool_retry",
1299                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1300                                        "rejectedToolCalls": 0,
1301                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1302                                    }),
1303                                ));
1304                                continue;
1305                            }
1306                            completion = required_tool_mode_unsatisfied_completion(
1307                                latest_required_tool_failure_kind,
1308                            );
1309                            if !required_tool_unsatisfied_emitted {
1310                                required_tool_unsatisfied_emitted = true;
1311                                self.event_bus.publish(EngineEvent::new(
1312                                    "tool.mode.required.unsatisfied",
1313                                    json!({
1314                                        "sessionID": session_id,
1315                                        "messageID": user_message_id,
1316                                        "iteration": iteration,
1317                                        "selectedToolCount": allowed_tool_names.len(),
1318                                        "offeredToolsPreview": offered_tool_preview,
1319                                        "reason": latest_required_tool_failure_kind.code(),
1320                                    }),
1321                                ));
1322                            }
1323                            self.event_bus.publish(EngineEvent::new(
1324                                "provider.call.iteration.finish",
1325                                json!({
1326                                    "sessionID": session_id,
1327                                    "messageID": user_message_id,
1328                                    "iteration": iteration,
1329                                    "finishReason": "required_tool_unsatisfied",
1330                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1331                                    "rejectedToolCalls": 0,
1332                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1333                                }),
1334                            ));
1335                            break;
1336                        }
1337                        let prewrite_gate = evaluate_prewrite_gate(
1338                            requested_write_required,
1339                            &requested_prewrite_requirements,
1340                            PrewriteProgress {
1341                                productive_write_tool_calls_total,
1342                                productive_workspace_inspection_total,
1343                                productive_concrete_read_total,
1344                                productive_web_research_total,
1345                                successful_web_research_total,
1346                                required_write_retry_count,
1347                                unmet_prewrite_repair_retry_count,
1348                                prewrite_gate_waived,
1349                            },
1350                        );
1351                        let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1352                        let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1353                        if requested_write_required
1354                            && productive_tool_calls_total > 0
1355                            && productive_write_tool_calls_total == 0
1356                        {
1357                            if should_start_prewrite_repair_before_first_write(
1358                                requested_prewrite_requirements.repair_on_unmet_requirements,
1359                                productive_write_tool_calls_total,
1360                                prewrite_satisfied,
1361                                code_workflow_requested,
1362                            ) {
1363                                if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1364                                    unmet_prewrite_repair_retry_count += 1;
1365                                    let repair_attempt = unmet_prewrite_repair_retry_count;
1366                                    let repair_attempts_remaining =
1367                                        prewrite_repair_budget.saturating_sub(repair_attempt);
1368                                    followup_context = Some(build_prewrite_repair_retry_context(
1369                                        &offered_tool_preview,
1370                                        latest_required_tool_failure_kind,
1371                                        &text,
1372                                        &requested_prewrite_requirements,
1373                                        productive_workspace_inspection_total > 0,
1374                                        productive_concrete_read_total > 0,
1375                                        productive_web_research_total > 0,
1376                                        successful_web_research_total > 0,
1377                                    ));
1378                                    self.event_bus.publish(EngineEvent::new(
1379                                        "provider.call.iteration.finish",
1380                                        json!({
1381                                            "sessionID": session_id,
1382                                            "messageID": user_message_id,
1383                                            "iteration": iteration,
1384                                            "finishReason": "prewrite_repair_retry",
1385                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1386                                            "rejectedToolCalls": 0,
1387                                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1388                                            "repair": prewrite_repair_event_payload(
1389                                                repair_attempt,
1390                                                repair_attempts_remaining,
1391                                                &unmet_prewrite_codes,
1392                                                false,
1393                                            ),
1394                                        }),
1395                                    ));
1396                                    continue;
1397                                }
1398                                if !prewrite_gate_waived {
1399                                    if prewrite_fail_closed {
1400                                        let repair_attempt = unmet_prewrite_repair_retry_count;
1401                                        let repair_attempts_remaining =
1402                                            prewrite_repair_budget.saturating_sub(repair_attempt);
1403                                        completion = prewrite_requirements_exhausted_completion(
1404                                            &unmet_prewrite_codes,
1405                                            repair_attempt,
1406                                            repair_attempts_remaining,
1407                                        );
1408                                        self.event_bus.publish(EngineEvent::new(
1409                                            "prewrite.gate.strict_mode.blocked",
1410                                            json!({
1411                                                "sessionID": session_id,
1412                                                "messageID": user_message_id,
1413                                                "iteration": iteration,
1414                                                "unmetCodes": unmet_prewrite_codes,
1415                                            }),
1416                                        ));
1417                                        self.event_bus.publish(EngineEvent::new(
1418                                            "provider.call.iteration.finish",
1419                                            json!({
1420                                                "sessionID": session_id,
1421                                                "messageID": user_message_id,
1422                                                "iteration": iteration,
1423                                                "finishReason": "prewrite_requirements_exhausted",
1424                                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1425                                                "rejectedToolCalls": 0,
1426                                                "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1427                                                "repair": prewrite_repair_event_payload(
1428                                                    repair_attempt,
1429                                                    repair_attempts_remaining,
1430                                                    &unmet_prewrite_codes,
1431                                                    true,
1432                                                ),
1433                                            }),
1434                                        ));
1435                                        break;
1436                                    }
1437                                    prewrite_gate_waived = true;
1438                                    let repair_attempt = unmet_prewrite_repair_retry_count;
1439                                    let repair_attempts_remaining =
1440                                        prewrite_repair_budget.saturating_sub(repair_attempt);
1441                                    followup_context = Some(build_prewrite_waived_write_context(
1442                                        &text,
1443                                        &unmet_prewrite_codes,
1444                                    ));
1445                                    self.event_bus.publish(EngineEvent::new(
1446                                        "prewrite.gate.waived.write_executed",
1447                                        json!({
1448                                            "sessionID": session_id,
1449                                            "messageID": user_message_id,
1450                                            "unmetCodes": unmet_prewrite_codes,
1451                                        }),
1452                                    ));
1453                                    self.event_bus.publish(EngineEvent::new(
1454                                        "provider.call.iteration.finish",
1455                                        json!({
1456                                            "sessionID": session_id,
1457                                            "messageID": user_message_id,
1458                                            "iteration": iteration,
1459                                            "finishReason": "prewrite_gate_waived",
1460                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1461                                            "rejectedToolCalls": 0,
1462                                            "prewriteGateWaived": true,
1463                                            "repair": prewrite_repair_event_payload(
1464                                                repair_attempt,
1465                                                repair_attempts_remaining,
1466                                                &unmet_prewrite_codes,
1467                                                true,
1468                                            ),
1469                                        }),
1470                                    ));
1471                                    continue;
1472                                }
1473                            }
1474                            latest_required_tool_failure_kind =
1475                                RequiredToolFailureKind::WriteRequiredNotSatisfied;
1476                            if required_write_retry_count + 1 < strict_write_retry_max_attempts {
1477                                required_write_retry_count += 1;
1478                                followup_context = Some(build_write_required_retry_context(
1479                                    &offered_tool_preview,
1480                                    latest_required_tool_failure_kind,
1481                                    &text,
1482                                    &requested_prewrite_requirements,
1483                                    productive_workspace_inspection_total > 0,
1484                                    productive_concrete_read_total > 0,
1485                                    productive_web_research_total > 0,
1486                                    successful_web_research_total > 0,
1487                                ));
1488                                self.event_bus.publish(EngineEvent::new(
1489                                    "provider.call.iteration.finish",
1490                                    json!({
1491                                        "sessionID": session_id,
1492                                        "messageID": user_message_id,
1493                                        "iteration": iteration,
1494                                        "finishReason": "required_write_retry",
1495                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1496                                        "rejectedToolCalls": 0,
1497                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1498                                    }),
1499                                ));
1500                                continue;
1501                            }
1502                            completion = required_tool_mode_unsatisfied_completion(
1503                                latest_required_tool_failure_kind,
1504                            );
1505                            if !required_tool_unsatisfied_emitted {
1506                                required_tool_unsatisfied_emitted = true;
1507                                self.event_bus.publish(EngineEvent::new(
1508                                    "tool.mode.required.unsatisfied",
1509                                    json!({
1510                                        "sessionID": session_id,
1511                                        "messageID": user_message_id,
1512                                        "iteration": iteration,
1513                                        "selectedToolCount": allowed_tool_names.len(),
1514                                        "offeredToolsPreview": offered_tool_preview,
1515                                        "reason": latest_required_tool_failure_kind.code(),
1516                                    }),
1517                                ));
1518                            }
1519                            self.event_bus.publish(EngineEvent::new(
1520                                "provider.call.iteration.finish",
1521                                json!({
1522                                    "sessionID": session_id,
1523                                    "messageID": user_message_id,
1524                                    "iteration": iteration,
1525                                    "finishReason": "required_write_unsatisfied",
1526                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1527                                    "rejectedToolCalls": 0,
1528                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1529                                }),
1530                            ));
1531                            break;
1532                        }
1533                        if invalid_tool_args_retry_count < invalid_tool_args_retry_max_attempts() {
1534                            if let Some(retry_context) =
1535                                build_invalid_tool_args_retry_context_from_outputs(
1536                                    &outputs,
1537                                    invalid_tool_args_retry_count,
1538                                )
1539                            {
1540                                invalid_tool_args_retry_count += 1;
1541                                followup_context = Some(format!(
1542                                    "Previous tool call arguments were invalid. {}",
1543                                    retry_context
1544                                ));
1545                                self.event_bus.publish(EngineEvent::new(
1546                                    "provider.call.iteration.finish",
1547                                    json!({
1548                                        "sessionID": session_id,
1549                                        "messageID": user_message_id,
1550                                        "iteration": iteration,
1551                                        "finishReason": "invalid_tool_args_retry",
1552                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1553                                        "rejectedToolCalls": 0,
1554                                    }),
1555                                ));
1556                                continue;
1557                            }
1558                        }
1559                        let guard_budget_hit =
1560                            outputs.iter().any(|o| is_guard_budget_tool_output(o));
1561                        if executed_productive_tool {
1562                            let prewrite_gate = evaluate_prewrite_gate(
1563                                requested_write_required,
1564                                &requested_prewrite_requirements,
1565                                PrewriteProgress {
1566                                    productive_write_tool_calls_total,
1567                                    productive_workspace_inspection_total,
1568                                    productive_concrete_read_total,
1569                                    productive_web_research_total,
1570                                    successful_web_research_total,
1571                                    required_write_retry_count,
1572                                    unmet_prewrite_repair_retry_count,
1573                                    prewrite_gate_waived,
1574                                },
1575                            );
1576                            let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1577                            let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1578                            if requested_write_required
1579                                && productive_write_tool_calls_total > 0
1580                                && requested_prewrite_requirements.repair_on_unmet_requirements
1581                                && unmet_prewrite_repair_retry_count < prewrite_repair_budget
1582                                && !prewrite_satisfied
1583                            {
1584                                unmet_prewrite_repair_retry_count += 1;
1585                                let repair_attempt = unmet_prewrite_repair_retry_count;
1586                                let repair_attempts_remaining =
1587                                    prewrite_repair_budget.saturating_sub(repair_attempt);
1588                                followup_context = Some(build_prewrite_repair_retry_context(
1589                                    &offered_tool_preview,
1590                                    latest_required_tool_failure_kind,
1591                                    &text,
1592                                    &requested_prewrite_requirements,
1593                                    productive_workspace_inspection_total > 0,
1594                                    productive_concrete_read_total > 0,
1595                                    productive_web_research_total > 0,
1596                                    successful_web_research_total > 0,
1597                                ));
1598                                self.event_bus.publish(EngineEvent::new(
1599                                    "provider.call.iteration.finish",
1600                                    json!({
1601                                        "sessionID": session_id,
1602                                        "messageID": user_message_id,
1603                                        "iteration": iteration,
1604                                        "finishReason": "prewrite_repair_retry",
1605                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1606                                        "rejectedToolCalls": 0,
1607                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1608                                        "repair": prewrite_repair_event_payload(
1609                                            repair_attempt,
1610                                            repair_attempts_remaining,
1611                                            &unmet_prewrite_codes,
1612                                            false,
1613                                        ),
1614                                    }),
1615                                ));
1616                                continue;
1617                            }
1618                            if requested_write_required
1619                                && productive_write_tool_calls_total > 0
1620                                && requested_prewrite_requirements.repair_on_unmet_requirements
1621                                && !prewrite_satisfied
1622                                && prewrite_fail_closed
1623                            {
1624                                let repair_attempt = unmet_prewrite_repair_retry_count;
1625                                let repair_attempts_remaining =
1626                                    prewrite_repair_budget.saturating_sub(repair_attempt);
1627                                completion = prewrite_requirements_exhausted_completion(
1628                                    &unmet_prewrite_codes,
1629                                    repair_attempt,
1630                                    repair_attempts_remaining,
1631                                );
1632                                self.event_bus.publish(EngineEvent::new(
1633                                    "prewrite.gate.strict_mode.blocked",
1634                                    json!({
1635                                        "sessionID": session_id,
1636                                        "messageID": user_message_id,
1637                                        "iteration": iteration,
1638                                        "unmetCodes": unmet_prewrite_codes,
1639                                    }),
1640                                ));
1641                                self.event_bus.publish(EngineEvent::new(
1642                                    "provider.call.iteration.finish",
1643                                    json!({
1644                                        "sessionID": session_id,
1645                                        "messageID": user_message_id,
1646                                        "iteration": iteration,
1647                                        "finishReason": "prewrite_requirements_exhausted",
1648                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1649                                        "rejectedToolCalls": 0,
1650                                        "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1651                                        "repair": prewrite_repair_event_payload(
1652                                            repair_attempt,
1653                                            repair_attempts_remaining,
1654                                            &unmet_prewrite_codes,
1655                                            true,
1656                                        ),
1657                                    }),
1658                                ));
1659                                break;
1660                            }
1661                            followup_context = Some(format!(
1662                                "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1663                                summarize_tool_outputs(&outputs)
1664                            ));
1665                            self.event_bus.publish(EngineEvent::new(
1666                                "provider.call.iteration.finish",
1667                                json!({
1668                                    "sessionID": session_id,
1669                                    "messageID": user_message_id,
1670                                    "iteration": iteration,
1671                                    "finishReason": "tool_followup",
1672                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1673                                    "rejectedToolCalls": 0,
1674                                }),
1675                            ));
1676                            continue;
1677                        }
1678                        if guard_budget_hit {
1679                            completion = summarize_guard_budget_outputs(&outputs)
1680                                .unwrap_or_else(|| {
1681                                    "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1682                                });
1683                        } else if duplicate_signature_hit_in_cycle {
1684                            completion = summarize_duplicate_signature_outputs(&outputs)
1685                                .unwrap_or_else(|| {
1686                                    "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1687                                });
1688                        } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1689                            completion = summary;
1690                        } else {
1691                            completion.clear();
1692                        }
1693                        self.event_bus.publish(EngineEvent::new(
1694                            "provider.call.iteration.finish",
1695                            json!({
1696                                "sessionID": session_id,
1697                                "messageID": user_message_id,
1698                                "iteration": iteration,
1699                                "finishReason": "tool_summary",
1700                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1701                                "rejectedToolCalls": 0,
1702                            }),
1703                        ));
1704                        break;
1705                    } else if matches!(requested_tool_mode, ToolMode::Required) {
1706                        latest_required_tool_failure_kind = classify_required_tool_failure(
1707                            &outputs,
1708                            saw_tool_call_candidate,
1709                            accepted_tool_calls_in_cycle,
1710                            provider_tool_parse_failed,
1711                            rejected_tool_call_in_cycle,
1712                        );
1713                    }
1714                }
1715
1716                {
1717                    let (prompt_tokens, completion_tokens, total_tokens, usage_source) =
1718                        if let Some(usage) = provider_usage {
1719                            (
1720                                usage.prompt_tokens,
1721                                usage.completion_tokens,
1722                                usage.total_tokens,
1723                                "provider",
1724                            )
1725                        } else {
1726                            // Provider did not return usage (e.g. streaming without
1727                            // include_usage, or a backend that omits it). Estimate from
1728                            // accumulated char counts using the ~4 chars-per-token heuristic.
1729                            let est_prompt = (estimated_prompt_chars / 4) as u64;
1730                            let est_completion = (completion.len() / 4) as u64;
1731                            tracing::debug!(
1732                                session_id = %session_id,
1733                                provider_id = %provider_id,
1734                                "provider.usage missing from stream; using char-count estimate \
1735                                 (prompt_chars={estimated_prompt_chars} completion_chars={})",
1736                                completion.len()
1737                            );
1738                            (
1739                                est_prompt,
1740                                est_completion,
1741                                est_prompt.saturating_add(est_completion),
1742                                "estimated",
1743                            )
1744                        };
1745                    self.event_bus.publish(EngineEvent::new(
1746                        "provider.usage",
1747                        json!({
1748                            "sessionID": session_id,
1749                            "correlationID": correlation_ref,
1750                            "messageID": user_message_id,
1751                            "promptTokens": prompt_tokens,
1752                            "completionTokens": completion_tokens,
1753                            "totalTokens": total_tokens,
1754                            "usageSource": usage_source,
1755                        }),
1756                    ));
1757                }
1758
1759                if matches!(requested_tool_mode, ToolMode::Required)
1760                    && productive_tool_calls_total == 0
1761                {
1762                    if requested_write_required
1763                        && required_write_retry_count > 0
1764                        && productive_write_tool_calls_total == 0
1765                        && !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
1766                    {
1767                        latest_required_tool_failure_kind =
1768                            RequiredToolFailureKind::WriteRequiredNotSatisfied;
1769                    }
1770                    if requested_write_required
1771                        && required_write_retry_count + 1 < strict_write_retry_max_attempts
1772                    {
1773                        required_write_retry_count += 1;
1774                        followup_context = Some(build_write_required_retry_context(
1775                            &offered_tool_preview,
1776                            latest_required_tool_failure_kind,
1777                            &text,
1778                            &requested_prewrite_requirements,
1779                            productive_workspace_inspection_total > 0,
1780                            productive_concrete_read_total > 0,
1781                            productive_web_research_total > 0,
1782                            successful_web_research_total > 0,
1783                        ));
1784                        continue;
1785                    }
1786                    let progress_made_in_cycle = productive_workspace_inspection_total > 0
1787                        || productive_concrete_read_total > 0
1788                        || productive_web_research_total > 0
1789                        || successful_web_research_total > 0;
1790                    if should_retry_nonproductive_required_tool_cycle(
1791                        requested_write_required,
1792                        false,
1793                        progress_made_in_cycle,
1794                        required_tool_retry_count,
1795                    ) {
1796                        required_tool_retry_count += 1;
1797                        followup_context = Some(build_required_tool_retry_context_for_task(
1798                            &offered_tool_preview,
1799                            latest_required_tool_failure_kind,
1800                            &text,
1801                        ));
1802                        continue;
1803                    }
1804                    completion = required_tool_mode_unsatisfied_completion(
1805                        latest_required_tool_failure_kind,
1806                    );
1807                    if !required_tool_unsatisfied_emitted {
1808                        required_tool_unsatisfied_emitted = true;
1809                        self.event_bus.publish(EngineEvent::new(
1810                            "tool.mode.required.unsatisfied",
1811                            json!({
1812                                "sessionID": session_id,
1813                                "messageID": user_message_id,
1814                                "iteration": iteration,
1815                                "selectedToolCount": allowed_tool_names.len(),
1816                                "offeredToolsPreview": offered_tool_preview,
1817                                "reason": latest_required_tool_failure_kind.code(),
1818                            }),
1819                        ));
1820                    }
1821                    self.event_bus.publish(EngineEvent::new(
1822                        "provider.call.iteration.finish",
1823                        json!({
1824                            "sessionID": session_id,
1825                            "messageID": user_message_id,
1826                            "iteration": iteration,
1827                            "finishReason": "required_tool_unsatisfied",
1828                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1829                            "rejectedToolCalls": 0,
1830                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1831                        }),
1832                    ));
1833                } else {
1834                    if completion.trim().is_empty()
1835                        && !last_tool_outputs.is_empty()
1836                        && requested_write_required
1837                        && empty_completion_retry_count == 0
1838                    {
1839                        empty_completion_retry_count += 1;
1840                        followup_context = Some(build_empty_completion_retry_context(
1841                            &offered_tool_preview,
1842                            &text,
1843                            &requested_prewrite_requirements,
1844                            productive_workspace_inspection_total > 0,
1845                            productive_concrete_read_total > 0,
1846                            productive_web_research_total > 0,
1847                            successful_web_research_total > 0,
1848                        ));
1849                        self.event_bus.publish(EngineEvent::new(
1850                            "provider.call.iteration.finish",
1851                            json!({
1852                                "sessionID": session_id,
1853                                "messageID": user_message_id,
1854                                "iteration": iteration,
1855                                "finishReason": "empty_completion_retry",
1856                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1857                                "rejectedToolCalls": 0,
1858                            }),
1859                        ));
1860                        continue;
1861                    }
1862                    let prewrite_gate = evaluate_prewrite_gate(
1863                        requested_write_required,
1864                        &requested_prewrite_requirements,
1865                        PrewriteProgress {
1866                            productive_write_tool_calls_total,
1867                            productive_workspace_inspection_total,
1868                            productive_concrete_read_total,
1869                            productive_web_research_total,
1870                            successful_web_research_total,
1871                            required_write_retry_count,
1872                            unmet_prewrite_repair_retry_count,
1873                            prewrite_gate_waived,
1874                        },
1875                    );
1876                    if should_start_prewrite_repair_before_first_write(
1877                        requested_prewrite_requirements.repair_on_unmet_requirements,
1878                        productive_write_tool_calls_total,
1879                        prewrite_gate.prewrite_satisfied,
1880                        code_workflow_requested,
1881                    ) && !prewrite_gate_waived
1882                    {
1883                        let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1884                        if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1885                            unmet_prewrite_repair_retry_count += 1;
1886                            let repair_attempt = unmet_prewrite_repair_retry_count;
1887                            let repair_attempts_remaining =
1888                                prewrite_repair_budget.saturating_sub(repair_attempt);
1889                            followup_context = Some(build_prewrite_repair_retry_context(
1890                                &offered_tool_preview,
1891                                latest_required_tool_failure_kind,
1892                                &text,
1893                                &requested_prewrite_requirements,
1894                                productive_workspace_inspection_total > 0,
1895                                productive_concrete_read_total > 0,
1896                                productive_web_research_total > 0,
1897                                successful_web_research_total > 0,
1898                            ));
1899                            self.event_bus.publish(EngineEvent::new(
1900                                "provider.call.iteration.finish",
1901                                json!({
1902                                    "sessionID": session_id,
1903                                    "messageID": user_message_id,
1904                                    "iteration": iteration,
1905                                    "finishReason": "prewrite_repair_retry",
1906                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1907                                    "rejectedToolCalls": 0,
1908                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1909                                    "repair": prewrite_repair_event_payload(
1910                                        repair_attempt,
1911                                        repair_attempts_remaining,
1912                                        &unmet_prewrite_codes,
1913                                        false,
1914                                    ),
1915                                }),
1916                            ));
1917                            continue;
1918                        }
1919                        if prewrite_fail_closed {
1920                            let repair_attempt = unmet_prewrite_repair_retry_count;
1921                            let repair_attempts_remaining =
1922                                prewrite_repair_budget.saturating_sub(repair_attempt);
1923                            completion = prewrite_requirements_exhausted_completion(
1924                                &unmet_prewrite_codes,
1925                                repair_attempt,
1926                                repair_attempts_remaining,
1927                            );
1928                            self.event_bus.publish(EngineEvent::new(
1929                                "prewrite.gate.strict_mode.blocked",
1930                                json!({
1931                                    "sessionID": session_id,
1932                                    "messageID": user_message_id,
1933                                    "iteration": iteration,
1934                                    "unmetCodes": unmet_prewrite_codes,
1935                                }),
1936                            ));
1937                            self.event_bus.publish(EngineEvent::new(
1938                                "provider.call.iteration.finish",
1939                                json!({
1940                                    "sessionID": session_id,
1941                                    "messageID": user_message_id,
1942                                    "iteration": iteration,
1943                                    "finishReason": "prewrite_requirements_exhausted",
1944                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1945                                    "rejectedToolCalls": 0,
1946                                    "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1947                                    "repair": prewrite_repair_event_payload(
1948                                        repair_attempt,
1949                                        repair_attempts_remaining,
1950                                        &unmet_prewrite_codes,
1951                                        true,
1952                                    ),
1953                                }),
1954                            ));
1955                            break;
1956                        }
1957                        prewrite_gate_waived = true;
1958                        let repair_attempt = unmet_prewrite_repair_retry_count;
1959                        let repair_attempts_remaining =
1960                            prewrite_repair_budget.saturating_sub(repair_attempt);
1961                        followup_context = Some(build_prewrite_waived_write_context(
1962                            &text,
1963                            &unmet_prewrite_codes,
1964                        ));
1965                        self.event_bus.publish(EngineEvent::new(
1966                            "prewrite.gate.waived.write_executed",
1967                            json!({
1968                                "sessionID": session_id,
1969                                "messageID": user_message_id,
1970                                "unmetCodes": unmet_prewrite_codes,
1971                            }),
1972                        ));
1973                        self.event_bus.publish(EngineEvent::new(
1974                            "provider.call.iteration.finish",
1975                            json!({
1976                                "sessionID": session_id,
1977                                "messageID": user_message_id,
1978                                "iteration": iteration,
1979                                "finishReason": "prewrite_gate_waived",
1980                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1981                                "rejectedToolCalls": 0,
1982                                "prewriteGateWaived": true,
1983                                "repair": prewrite_repair_event_payload(
1984                                    repair_attempt,
1985                                    repair_attempts_remaining,
1986                                    &unmet_prewrite_codes,
1987                                    true,
1988                                ),
1989                            }),
1990                        ));
1991                        continue;
1992                    }
1993                    if prewrite_gate_waived
1994                        && requested_write_required
1995                        && productive_write_tool_calls_total == 0
1996                        && required_write_retry_count + 1 < strict_write_retry_max_attempts
1997                    {
1998                        required_write_retry_count += 1;
1999                        followup_context = Some(build_write_required_retry_context(
2000                            &offered_tool_preview,
2001                            latest_required_tool_failure_kind,
2002                            &text,
2003                            &requested_prewrite_requirements,
2004                            productive_workspace_inspection_total > 0,
2005                            productive_concrete_read_total > 0,
2006                            productive_web_research_total > 0,
2007                            successful_web_research_total > 0,
2008                        ));
2009                        self.event_bus.publish(EngineEvent::new(
2010                            "provider.call.iteration.finish",
2011                            json!({
2012                                "sessionID": session_id,
2013                                "messageID": user_message_id,
2014                                "iteration": iteration,
2015                                "finishReason": "waived_write_retry",
2016                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
2017                                "rejectedToolCalls": 0,
2018                            }),
2019                        ));
2020                        continue;
2021                    }
2022                    self.event_bus.publish(EngineEvent::new(
2023                        "provider.call.iteration.finish",
2024                        json!({
2025                            "sessionID": session_id,
2026                            "messageID": user_message_id,
2027                            "iteration": iteration,
2028                            "finishReason": "provider_completion",
2029                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
2030                            "rejectedToolCalls": 0,
2031                        }),
2032                    ));
2033                }
2034                break;
2035            }
2036            if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
2037            {
2038                completion =
2039                    required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
2040                if !required_tool_unsatisfied_emitted {
2041                    self.event_bus.publish(EngineEvent::new(
2042                        "tool.mode.required.unsatisfied",
2043                        json!({
2044                            "sessionID": session_id,
2045                            "messageID": user_message_id,
2046                            "selectedToolCount": tool_call_counts.len(),
2047                            "reason": latest_required_tool_failure_kind.code(),
2048                        }),
2049                    ));
2050                }
2051            }
2052            if completion.trim().is_empty()
2053                && !last_tool_outputs.is_empty()
2054                && requested_write_required
2055                && productive_write_tool_calls_total > 0
2056            {
2057                let final_prewrite_satisfied = evaluate_prewrite_gate(
2058                    requested_write_required,
2059                    &requested_prewrite_requirements,
2060                    PrewriteProgress {
2061                        productive_write_tool_calls_total,
2062                        productive_workspace_inspection_total,
2063                        productive_concrete_read_total,
2064                        productive_web_research_total,
2065                        successful_web_research_total,
2066                        required_write_retry_count,
2067                        unmet_prewrite_repair_retry_count,
2068                        prewrite_gate_waived,
2069                    },
2070                )
2071                .prewrite_satisfied;
2072                if prewrite_fail_closed && !final_prewrite_satisfied {
2073                    let unmet_prewrite_codes = evaluate_prewrite_gate(
2074                        requested_write_required,
2075                        &requested_prewrite_requirements,
2076                        PrewriteProgress {
2077                            productive_write_tool_calls_total,
2078                            productive_workspace_inspection_total,
2079                            productive_concrete_read_total,
2080                            productive_web_research_total,
2081                            successful_web_research_total,
2082                            required_write_retry_count,
2083                            unmet_prewrite_repair_retry_count,
2084                            prewrite_gate_waived,
2085                        },
2086                    )
2087                    .unmet_codes;
2088                    completion = prewrite_requirements_exhausted_completion(
2089                        &unmet_prewrite_codes,
2090                        unmet_prewrite_repair_retry_count,
2091                        prewrite_repair_budget.saturating_sub(unmet_prewrite_repair_retry_count),
2092                    );
2093                } else {
2094                    completion = synthesize_artifact_write_completion_from_tool_state(
2095                        &text,
2096                        final_prewrite_satisfied,
2097                        prewrite_gate_waived,
2098                    );
2099                }
2100            }
2101            if completion.trim().is_empty()
2102                && !last_tool_outputs.is_empty()
2103                && should_generate_post_tool_final_narrative(
2104                    requested_tool_mode,
2105                    productive_tool_calls_total,
2106                )
2107            {
2108                if let Some(narrative) = self
2109                    .generate_final_narrative_without_tools(
2110                        &session_id,
2111                        &active_agent,
2112                        Some(provider_id.as_str()),
2113                        Some(model_id_value.as_str()),
2114                        cancel.clone(),
2115                        &last_tool_outputs,
2116                    )
2117                    .await
2118                {
2119                    completion = narrative;
2120                }
2121            }
2122            if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
2123                if let Some(summary) = summarize_auth_pending_outputs(&last_tool_outputs) {
2124                    completion = summary;
2125                } else if let Some(hint) =
2126                    summarize_terminal_tool_failure_for_user(&last_tool_outputs)
2127                {
2128                    completion = hint;
2129                } else {
2130                    let preview = summarize_user_visible_tool_outputs(&last_tool_outputs);
2131                    if preview.trim().is_empty() {
2132                        completion = "I used tools for this request, but I couldn't turn the results into a clean final answer. Please retry with the docs page URL, docs path, or exact search query you want me to use.".to_string();
2133                    } else {
2134                        completion = format!(
2135                            "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
2136                            preview
2137                        );
2138                    }
2139                }
2140            }
2141            if completion.trim().is_empty() {
2142                completion =
2143                    "I couldn't produce a final response for that run. Please retry your request."
2144                        .to_string();
2145            }
2146            // M-3: Gate fires when email was requested AND email-action tools were
2147            // actually offered to the agent during at least one iteration but no
2148            // email action tool was executed. The completion text is NOT consulted —
2149            // this prevents the model from bypassing the gate by rephrasing, and
2150            // prevents false positives on legitimate text containing email keywords.
2151            // Skipping when no email tools were ever offered avoids clobbering
2152            // legitimate output with a delivery-failure message the agent could not
2153            // have avoided (e.g. prompts that mention gmail tool names as context).
2154            if email_delivery_requested && email_tools_ever_offered && !email_action_executed {
2155                let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
2156                    .to_string();
2157                if let Some(note) = latest_email_action_note.as_ref() {
2158                    fallback.push_str("\n\nLast email tool status: ");
2159                    fallback.push_str(note);
2160                }
2161                fallback.push_str(
2162                    "\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
2163                );
2164                completion = fallback;
2165            }
2166            completion = strip_model_control_markers(&completion);
2167            truncate_text(&completion, 16_000)
2168        };
2169        emit_event(
2170            Level::INFO,
2171            ProcessKind::Engine,
2172            ObservabilityEvent {
2173                event: "provider.call.finish",
2174                component: "engine.loop",
2175                correlation_id: correlation_ref,
2176                session_id: Some(&session_id),
2177                run_id: None,
2178                message_id: Some(&user_message_id),
2179                provider_id: Some(provider_id.as_str()),
2180                model_id,
2181                status: Some("ok"),
2182                error_code: None,
2183                detail: Some("provider stream complete"),
2184            },
2185        );
2186        if active_agent.name.eq_ignore_ascii_case("plan") {
2187            emit_plan_todo_fallback(
2188                self.storage.clone(),
2189                &self.event_bus,
2190                &session_id,
2191                &user_message_id,
2192                &completion,
2193            )
2194            .await;
2195            let todos_after_fallback = self.storage.get_todos(&session_id).await;
2196            if todos_after_fallback.is_empty() && !question_tool_used {
2197                emit_plan_question_fallback(
2198                    self.storage.clone(),
2199                    &self.event_bus,
2200                    &session_id,
2201                    &user_message_id,
2202                    &completion,
2203                )
2204                .await;
2205            }
2206        }
2207        if cancel.is_cancelled() {
2208            self.event_bus.publish(EngineEvent::new(
2209                "session.status",
2210                json!({"sessionID": session_id, "status":"cancelled"}),
2211            ));
2212            self.cancellations.remove(&session_id).await;
2213            return Ok(());
2214        }
2215        let assistant = Message::new(
2216            MessageRole::Assistant,
2217            vec![MessagePart::Text {
2218                text: completion.clone(),
2219            }],
2220        );
2221        let assistant_message_id = assistant.id.clone();
2222        self.storage.append_message(&session_id, assistant).await?;
2223        let final_part = WireMessagePart::text(
2224            &session_id,
2225            &assistant_message_id,
2226            truncate_text(&completion, 16_000),
2227        );
2228        self.event_bus.publish(EngineEvent::new(
2229            "message.part.updated",
2230            json!({"part": final_part}),
2231        ));
2232        self.event_bus.publish(EngineEvent::new(
2233            "session.updated",
2234            json!({"sessionID": session_id, "status":"idle"}),
2235        ));
2236        self.event_bus.publish(EngineEvent::new(
2237            "session.status",
2238            json!({"sessionID": session_id, "status":"idle"}),
2239        ));
2240        self.cancellations.remove(&session_id).await;
2241        Ok(())
2242    }
2243}