Skip to main content

tandem_core/engine_loop/
prompt_execution.rs

1use super::*;
2
3impl EngineLoop {
4    pub async fn run_prompt_async_with_context(
5        &self,
6        session_id: String,
7        req: SendMessageRequest,
8        correlation_id: Option<String>,
9    ) -> anyhow::Result<()> {
10        let session_model = self
11            .storage
12            .get_session(&session_id)
13            .await
14            .and_then(|s| s.model);
15        let (provider_id, model_id_value) =
16            resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
17                anyhow::anyhow!(
18                "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
19            )
20            })?;
21        let correlation_ref = correlation_id.as_deref();
22        let model_id = Some(model_id_value.as_str());
23        let cancel = self.cancellations.create(&session_id).await;
24        emit_event(
25            Level::INFO,
26            ProcessKind::Engine,
27            ObservabilityEvent {
28                event: "provider.call.start",
29                component: "engine.loop",
30                correlation_id: correlation_ref,
31                session_id: Some(&session_id),
32                run_id: None,
33                message_id: None,
34                provider_id: Some(provider_id.as_str()),
35                model_id,
36                status: Some("start"),
37                error_code: None,
38                detail: Some("run_prompt_async dispatch"),
39            },
40        );
41        self.event_bus.publish(EngineEvent::new(
42            "session.status",
43            json!({"sessionID": session_id, "status":"running"}),
44        ));
45        let request_parts = req.parts.clone();
46        let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
47        let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
48        let requested_write_required = req.write_required.unwrap_or(false);
49        let requested_prewrite_requirements = req.prewrite_requirements.clone().unwrap_or_default();
50        let prewrite_repair_budget = prewrite_repair_retry_budget(&requested_prewrite_requirements);
51        let prewrite_fail_closed = prewrite_gate_strict_mode(&requested_prewrite_requirements);
52        let request_tool_allowlist = req
53            .tool_allowlist
54            .clone()
55            .unwrap_or_default()
56            .into_iter()
57            .map(|tool| normalize_tool_name(&tool))
58            .filter(|tool| !tool.trim().is_empty())
59            .collect::<HashSet<_>>();
60        // Propagate per-request tool allowlist to session-level enforcement so
61        // that execution-time checks (and mcp_list scoping) also respect it.
62        if !request_tool_allowlist.is_empty() {
63            self.set_session_allowed_tools(
64                &session_id,
65                request_tool_allowlist.iter().cloned().collect(),
66            )
67            .await;
68        }
69        let text = req
70            .parts
71            .iter()
72            .map(|p| match p {
73                MessagePartInput::Text { text } => text.clone(),
74                MessagePartInput::File {
75                    mime,
76                    filename,
77                    url,
78                } => format!(
79                    "[file mime={} name={} url={}]",
80                    mime,
81                    filename.clone().unwrap_or_else(|| "unknown".to_string()),
82                    url
83                ),
84            })
85            .collect::<Vec<_>>()
86            .join("\n");
87        let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
88        self.auto_rename_session_from_user_text(&session_id, &text)
89            .await;
90        let active_agent = self.agents.get(req.agent.as_deref()).await;
91        let mut user_message_id = self
92            .find_recent_matching_user_message_id(&session_id, &text)
93            .await;
94        if user_message_id.is_none() {
95            let user_message = Message::new(
96                MessageRole::User,
97                vec![MessagePart::Text { text: text.clone() }],
98            );
99            let created_message_id = user_message.id.clone();
100            self.storage
101                .append_message(&session_id, user_message)
102                .await?;
103
104            let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
105            self.event_bus.publish(EngineEvent::new(
106                "message.part.updated",
107                json!({
108                    "part": user_part,
109                    "delta": text,
110                    "agent": active_agent.name
111                }),
112            ));
113            user_message_id = Some(created_message_id);
114        }
115        let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
116
117        if cancel.is_cancelled() {
118            self.event_bus.publish(EngineEvent::new(
119                "session.status",
120                json!({"sessionID": session_id, "status":"cancelled"}),
121            ));
122            self.cancellations.remove(&session_id).await;
123            return Ok(());
124        }
125
126        let mut question_tool_used = false;
127        let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
128            if normalize_tool_name(&tool) == "question" {
129                question_tool_used = true;
130            }
131            if !agent_can_use_tool(&active_agent, &tool) {
132                format!(
133                    "Tool `{tool}` is not enabled for agent `{}`.",
134                    active_agent.name
135                )
136            } else {
137                self.execute_tool_with_permission(
138                    &session_id,
139                    &user_message_id,
140                    tool.clone(),
141                    args,
142                    None,
143                    active_agent.skills.as_deref(),
144                    &text,
145                    requested_write_required,
146                    None,
147                    cancel.clone(),
148                )
149                .await?
150                .unwrap_or_default()
151            }
152        } else {
153            let mut completion = String::new();
154            let mut max_iterations = max_tool_iterations();
155            let mut followup_context: Option<String> = None;
156            let mut last_tool_outputs: Vec<String> = Vec::new();
157            let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
158            let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
159            let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
160            let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
161            let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
162            let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
163            let mut websearch_query_blocked = false;
164            let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
165            let mut pack_builder_executed = false;
166            let mut auto_workspace_probe_attempted = false;
167            let mut productive_tool_calls_total = 0usize;
168            let mut productive_write_tool_calls_total = 0usize;
169            let mut productive_workspace_inspection_total = 0usize;
170            let mut productive_web_research_total = 0usize;
171            let mut productive_concrete_read_total = 0usize;
172            let mut successful_web_research_total = 0usize;
173            let mut required_tool_retry_count = 0usize;
174            let mut required_write_retry_count = 0usize;
175            let mut unmet_prewrite_repair_retry_count = 0usize;
176            let mut empty_completion_retry_count = 0usize;
177            let mut prewrite_gate_waived = false;
178            let mut invalid_tool_args_retry_count = 0usize;
179            let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
180            let mut required_tool_unsatisfied_emitted = false;
181            let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
182            let email_delivery_requested = requires_email_delivery_prompt(&text);
183            let web_research_requested = requires_web_research_prompt(&text);
184            let code_workflow_requested = infer_code_workflow_from_text(&text);
185            let mut email_action_executed = false;
186            let mut latest_email_action_note: Option<String> = None;
187            let mut email_tools_ever_offered = false;
188            let intent = classify_intent(&text);
189            let router_enabled = tool_router_enabled();
190            let retrieval_enabled = semantic_tool_retrieval_enabled();
191            let retrieval_k = semantic_tool_retrieval_k();
192            let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
193                self.tools.mcp_server_names().await
194            } else {
195                Vec::new()
196            };
197            let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
198            let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
199                && runtime_attachments.is_empty()
200                && is_short_simple_prompt(&text)
201                && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
202
203            while max_iterations > 0 && !cancel.is_cancelled() {
204                let iteration = 26usize.saturating_sub(max_iterations);
205                max_iterations -= 1;
206                let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
207                    ChatHistoryProfile::Full
208                } else if matches!(requested_context_mode, ContextMode::Compact)
209                    || context_is_auto_compact
210                {
211                    ChatHistoryProfile::Compact
212                } else {
213                    ChatHistoryProfile::Standard
214                };
215                let mut messages =
216                    load_chat_history(self.storage.clone(), &session_id, context_profile).await;
217                if iteration == 1 && !runtime_attachments.is_empty() {
218                    attach_to_last_user_message(&mut messages, &runtime_attachments);
219                }
220                let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
221                self.event_bus.publish(EngineEvent::new(
222                    "context.profile.selected",
223                    json!({
224                        "sessionID": session_id,
225                        "messageID": user_message_id,
226                        "iteration": iteration,
227                        "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
228                        "historyMessageCount": messages.len(),
229                        "historyCharCount": history_char_count,
230                        "memoryInjected": false
231                    }),
232                ));
233                let mut system_parts = vec![tandem_runtime_system_prompt(
234                    &self.host_runtime_context,
235                    &mcp_server_names,
236                )];
237                if let Some(system) = active_agent.system_prompt.as_ref() {
238                    system_parts.push(system.clone());
239                }
240                messages.insert(
241                    0,
242                    ChatMessage {
243                        role: "system".to_string(),
244                        content: system_parts.join("\n\n"),
245                        attachments: Vec::new(),
246                    },
247                );
248                if let Some(extra) = followup_context.take() {
249                    messages.push(ChatMessage {
250                        role: "user".to_string(),
251                        content: extra,
252                        attachments: Vec::new(),
253                    });
254                }
255                if let Some(hook) = self.prompt_context_hook.read().await.clone() {
256                    let ctx = PromptContextHookContext {
257                        session_id: session_id.clone(),
258                        message_id: user_message_id.clone(),
259                        provider_id: provider_id.clone(),
260                        model_id: model_id_value.clone(),
261                        iteration,
262                    };
263                    let hook_timeout =
264                        Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
265                    match tokio::time::timeout(
266                        hook_timeout,
267                        hook.augment_provider_messages(ctx, messages.clone()),
268                    )
269                    .await
270                    {
271                        Ok(Ok(augmented)) => {
272                            messages = augmented;
273                        }
274                        Ok(Err(err)) => {
275                            self.event_bus.publish(EngineEvent::new(
276                                "memory.context.error",
277                                json!({
278                                    "sessionID": session_id,
279                                    "messageID": user_message_id,
280                                    "iteration": iteration,
281                                    "error": truncate_text(&err.to_string(), 500),
282                                }),
283                            ));
284                        }
285                        Err(_) => {
286                            self.event_bus.publish(EngineEvent::new(
287                                "memory.context.error",
288                                json!({
289                                    "sessionID": session_id,
290                                    "messageID": user_message_id,
291                                    "iteration": iteration,
292                                    "error": format!(
293                                        "prompt context hook timeout after {} ms",
294                                        hook_timeout.as_millis()
295                                    ),
296                                }),
297                            ));
298                        }
299                    }
300                }
301                let all_tools = self.tools.list().await;
302                let mut retrieval_fallback_reason: Option<&'static str> = None;
303                let mut candidate_tools = if retrieval_enabled {
304                    self.tools.retrieve(&text, retrieval_k).await
305                } else {
306                    all_tools.clone()
307                };
308                if retrieval_enabled {
309                    if candidate_tools.is_empty() && !all_tools.is_empty() {
310                        candidate_tools = all_tools.clone();
311                        retrieval_fallback_reason = Some("retrieval_empty_result");
312                    } else if web_research_requested
313                        && has_web_research_tools(&all_tools)
314                        && !has_web_research_tools(&candidate_tools)
315                        && required_write_retry_count == 0
316                    {
317                        candidate_tools = all_tools.clone();
318                        retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
319                    } else if email_delivery_requested
320                        && has_email_action_tools(&all_tools)
321                        && !has_email_action_tools(&candidate_tools)
322                    {
323                        candidate_tools = all_tools.clone();
324                        retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
325                    }
326                }
327                let mut tool_schemas = if !router_enabled {
328                    candidate_tools
329                } else {
330                    match requested_tool_mode {
331                        ToolMode::None => Vec::new(),
332                        ToolMode::Required => select_tool_subset(
333                            candidate_tools,
334                            intent,
335                            &request_tool_allowlist,
336                            iteration > 1,
337                        ),
338                        ToolMode::Auto => {
339                            if !auto_tools_escalated {
340                                Vec::new()
341                            } else {
342                                select_tool_subset(
343                                    candidate_tools,
344                                    intent,
345                                    &request_tool_allowlist,
346                                    iteration > 1,
347                                )
348                            }
349                        }
350                    }
351                };
352                let mut policy_patterns =
353                    request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
354                if let Some(agent_tools) = active_agent.tools.as_ref() {
355                    policy_patterns
356                        .extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
357                }
358                let session_allowed_tools = self
359                    .session_allowed_tools
360                    .read()
361                    .await
362                    .get(&session_id)
363                    .cloned()
364                    .unwrap_or_default();
365                policy_patterns.extend(session_allowed_tools.iter().cloned());
366                if !policy_patterns.is_empty() {
367                    let mut included = tool_schemas
368                        .iter()
369                        .map(|schema| normalize_tool_name(&schema.name))
370                        .collect::<HashSet<_>>();
371                    for schema in &all_tools {
372                        let normalized = normalize_tool_name(&schema.name);
373                        if policy_patterns
374                            .iter()
375                            .any(|pattern| tool_name_matches_policy(pattern, &normalized))
376                            && included.insert(normalized)
377                        {
378                            tool_schemas.push(schema.clone());
379                        }
380                    }
381                }
382                if !request_tool_allowlist.is_empty() {
383                    tool_schemas.retain(|schema| {
384                        let tool = normalize_tool_name(&schema.name);
385                        request_tool_allowlist
386                            .iter()
387                            .any(|pattern| tool_name_matches_policy(pattern, &tool))
388                    });
389                }
390                let prewrite_gate = evaluate_prewrite_gate(
391                    requested_write_required,
392                    &requested_prewrite_requirements,
393                    PrewriteProgress {
394                        productive_write_tool_calls_total,
395                        productive_workspace_inspection_total,
396                        productive_concrete_read_total,
397                        productive_web_research_total,
398                        successful_web_research_total,
399                        required_write_retry_count,
400                        unmet_prewrite_repair_retry_count,
401                        prewrite_gate_waived,
402                    },
403                );
404                let _prewrite_satisfied = prewrite_gate.prewrite_satisfied;
405                let prewrite_gate_write = prewrite_gate.gate_write;
406                let force_write_only_retry = prewrite_gate.force_write_only_retry;
407                let allow_repair_tools = prewrite_gate.allow_repair_tools;
408                if prewrite_gate_write {
409                    tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
410                }
411                if requested_prewrite_requirements.repair_on_unmet_requirements
412                    && productive_write_tool_calls_total >= 3
413                {
414                    tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
415                }
416                if allow_repair_tools {
417                    let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
418                    let repair_tools = tool_schemas
419                        .iter()
420                        .filter(|schema| {
421                            tool_matches_unmet_prewrite_repair_requirement(
422                                &schema.name,
423                                &unmet_prewrite_codes,
424                                productive_workspace_inspection_total > 0,
425                            )
426                        })
427                        .cloned()
428                        .collect::<Vec<_>>();
429                    if !repair_tools.is_empty() {
430                        tool_schemas = repair_tools;
431                    }
432                }
433                if force_write_only_retry && !allow_repair_tools {
434                    tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
435                }
436                if active_agent.tools.is_some() {
437                    tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
438                }
439                tool_schemas.retain(|schema| {
440                    let normalized = normalize_tool_name(&schema.name);
441                    if let Some(server) = mcp_server_from_tool_name(&normalized) {
442                        !blocked_mcp_servers.contains(server)
443                    } else {
444                        true
445                    }
446                });
447                if let Some(allowed_tools) = self
448                    .session_allowed_tools
449                    .read()
450                    .await
451                    .get(&session_id)
452                    .cloned()
453                {
454                    if !allowed_tools.is_empty() {
455                        tool_schemas.retain(|schema| {
456                            let normalized = normalize_tool_name(&schema.name);
457                            any_policy_matches(&allowed_tools, &normalized)
458                        });
459                    }
460                }
461                if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
462                    let detail = validation_err.to_string();
463                    emit_event(
464                        Level::ERROR,
465                        ProcessKind::Engine,
466                        ObservabilityEvent {
467                            event: "provider.call.error",
468                            component: "engine.loop",
469                            correlation_id: correlation_ref,
470                            session_id: Some(&session_id),
471                            run_id: None,
472                            message_id: Some(&user_message_id),
473                            provider_id: Some(provider_id.as_str()),
474                            model_id,
475                            status: Some("failed"),
476                            error_code: Some("TOOL_SCHEMA_INVALID"),
477                            detail: Some(&detail),
478                        },
479                    );
480                    anyhow::bail!("{detail}");
481                }
482                let routing_decision = ToolRoutingDecision {
483                    pass: if auto_tools_escalated { 2 } else { 1 },
484                    mode: match requested_tool_mode {
485                        ToolMode::Auto => default_mode_name(),
486                        ToolMode::None => "none",
487                        ToolMode::Required => "required",
488                    },
489                    intent,
490                    selected_count: tool_schemas.len(),
491                    total_available_count: all_tools.len(),
492                    mcp_included: tool_schemas
493                        .iter()
494                        .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
495                };
496                self.event_bus.publish(EngineEvent::new(
497                    "tool.routing.decision",
498                    json!({
499                        "sessionID": session_id,
500                        "messageID": user_message_id,
501                        "iteration": iteration,
502                        "pass": routing_decision.pass,
503                        "mode": routing_decision.mode,
504                        "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
505                        "selectedToolCount": routing_decision.selected_count,
506                        "totalAvailableTools": routing_decision.total_available_count,
507                        "mcpIncluded": routing_decision.mcp_included,
508                        "retrievalEnabled": retrieval_enabled,
509                        "retrievalK": retrieval_k,
510                        "fallbackToFullTools": retrieval_fallback_reason.is_some(),
511                        "fallbackReason": retrieval_fallback_reason
512                    }),
513                ));
514                let allowed_tool_names = tool_schemas
515                    .iter()
516                    .map(|schema| normalize_tool_name(&schema.name))
517                    .collect::<HashSet<_>>();
518                if !email_tools_ever_offered && has_email_action_tools(&tool_schemas) {
519                    email_tools_ever_offered = true;
520                }
521                let offered_tool_preview = tool_schemas
522                    .iter()
523                    .take(8)
524                    .map(|schema| normalize_tool_name(&schema.name))
525                    .collect::<Vec<_>>()
526                    .join(", ");
527                self.event_bus.publish(EngineEvent::new(
528                    "provider.call.iteration.start",
529                    json!({
530                        "sessionID": session_id,
531                        "messageID": user_message_id,
532                        "iteration": iteration,
533                        "selectedToolCount": allowed_tool_names.len(),
534                    }),
535                ));
536                let estimated_prompt_chars: usize = messages.iter().map(|m| m.content.len()).sum();
537                let provider_connect_timeout =
538                    Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
539                let stream_result = tokio::time::timeout(
540                    provider_connect_timeout,
541                    self.providers.stream_for_provider(
542                        Some(provider_id.as_str()),
543                        Some(model_id_value.as_str()),
544                        messages,
545                        requested_tool_mode.clone(),
546                        Some(tool_schemas),
547                        cancel.clone(),
548                    ),
549                )
550                .await
551                .map_err(|_| {
552                    anyhow::anyhow!(
553                        "provider stream connect timeout after {} ms",
554                        provider_connect_timeout.as_millis()
555                    )
556                })
557                .and_then(|result| result);
558                let stream = match stream_result {
559                    Ok(stream) => stream,
560                    Err(err) => {
561                        let error_text = err.to_string();
562                        let error_code = provider_error_code(&error_text);
563                        let detail = truncate_text(&error_text, 500);
564                        emit_event(
565                            Level::ERROR,
566                            ProcessKind::Engine,
567                            ObservabilityEvent {
568                                event: "provider.call.error",
569                                component: "engine.loop",
570                                correlation_id: correlation_ref,
571                                session_id: Some(&session_id),
572                                run_id: None,
573                                message_id: Some(&user_message_id),
574                                provider_id: Some(provider_id.as_str()),
575                                model_id,
576                                status: Some("failed"),
577                                error_code: Some(error_code),
578                                detail: Some(&detail),
579                            },
580                        );
581                        self.event_bus.publish(EngineEvent::new(
582                            "provider.call.iteration.error",
583                            json!({
584                                "sessionID": session_id,
585                                "messageID": user_message_id,
586                                "iteration": iteration,
587                                "error": detail,
588                            }),
589                        ));
590                        return Err(err);
591                    }
592                };
593                tokio::pin!(stream);
594                completion.clear();
595                let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
596                let mut provider_usage: Option<TokenUsage> = None;
597                let mut accepted_tool_calls_in_cycle = 0usize;
598                let provider_idle_timeout =
599                    Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
600                loop {
601                    let next_chunk_result =
602                        tokio::time::timeout(provider_idle_timeout, stream.next())
603                            .await
604                            .map_err(|_| {
605                                anyhow::anyhow!(
606                                    "provider stream idle timeout after {} ms",
607                                    provider_idle_timeout.as_millis()
608                                )
609                            });
610                    let next_chunk = match next_chunk_result {
611                        Ok(next_chunk) => next_chunk,
612                        Err(err) => {
613                            self.event_bus.publish(EngineEvent::new(
614                                "provider.call.iteration.error",
615                                json!({
616                                    "sessionID": session_id,
617                                    "messageID": user_message_id,
618                                    "iteration": iteration,
619                                    "error": truncate_text(&err.to_string(), 500),
620                                }),
621                            ));
622                            return Err(err);
623                        }
624                    };
625                    let Some(chunk) = next_chunk else {
626                        break;
627                    };
628                    let chunk = match chunk {
629                        Ok(chunk) => chunk,
630                        Err(err) => {
631                            let error_text = err.to_string();
632                            let error_code = provider_error_code(&error_text);
633                            let detail = truncate_text(&error_text, 500);
634                            emit_event(
635                                Level::ERROR,
636                                ProcessKind::Engine,
637                                ObservabilityEvent {
638                                    event: "provider.call.error",
639                                    component: "engine.loop",
640                                    correlation_id: correlation_ref,
641                                    session_id: Some(&session_id),
642                                    run_id: None,
643                                    message_id: Some(&user_message_id),
644                                    provider_id: Some(provider_id.as_str()),
645                                    model_id,
646                                    status: Some("failed"),
647                                    error_code: Some(error_code),
648                                    detail: Some(&detail),
649                                },
650                            );
651                            self.event_bus.publish(EngineEvent::new(
652                                "provider.call.iteration.error",
653                                json!({
654                                    "sessionID": session_id,
655                                    "messageID": user_message_id,
656                                    "iteration": iteration,
657                                    "error": detail,
658                                }),
659                            ));
660                            return Err(anyhow::anyhow!(
661                                "provider stream chunk error: {error_text}"
662                            ));
663                        }
664                    };
665                    match chunk {
666                        StreamChunk::TextDelta(delta) => {
667                            let delta = strip_model_control_markers(&delta);
668                            if delta.trim().is_empty() {
669                                continue;
670                            }
671                            if completion.is_empty() {
672                                emit_event(
673                                    Level::INFO,
674                                    ProcessKind::Engine,
675                                    ObservabilityEvent {
676                                        event: "provider.call.first_byte",
677                                        component: "engine.loop",
678                                        correlation_id: correlation_ref,
679                                        session_id: Some(&session_id),
680                                        run_id: None,
681                                        message_id: Some(&user_message_id),
682                                        provider_id: Some(provider_id.as_str()),
683                                        model_id,
684                                        status: Some("streaming"),
685                                        error_code: None,
686                                        detail: Some("first text delta"),
687                                    },
688                                );
689                            }
690                            completion.push_str(&delta);
691                            let delta = truncate_text(&delta, 4_000);
692                            let delta_part =
693                                WireMessagePart::text(&session_id, &user_message_id, delta.clone());
694                            self.event_bus.publish(EngineEvent::new(
695                                "message.part.updated",
696                                json!({"part": delta_part, "delta": delta}),
697                            ));
698                        }
699                        StreamChunk::ReasoningDelta(_reasoning) => {}
700                        StreamChunk::Done {
701                            finish_reason: _,
702                            usage,
703                        } => {
704                            if usage.is_some() {
705                                provider_usage = usage;
706                            }
707                            break;
708                        }
709                        StreamChunk::ToolCallStart { id, name } => {
710                            let entry = streamed_tool_calls.entry(id).or_default();
711                            if entry.name.is_empty() {
712                                entry.name = name;
713                            }
714                        }
715                        StreamChunk::ToolCallDelta { id, args_delta } => {
716                            let entry = streamed_tool_calls.entry(id.clone()).or_default();
717                            entry.args.push_str(&args_delta);
718                            let tool_name = if entry.name.trim().is_empty() {
719                                "tool".to_string()
720                            } else {
721                                normalize_tool_name(&entry.name)
722                            };
723                            let parsed_preview = if entry.name.trim().is_empty() {
724                                Value::String(truncate_text(&entry.args, 1_000))
725                            } else {
726                                parse_streamed_tool_args(&tool_name, &entry.args)
727                            };
728                            let mut tool_part = WireMessagePart::tool_invocation(
729                                &session_id,
730                                &user_message_id,
731                                tool_name.clone(),
732                                parsed_preview.clone(),
733                            );
734                            tool_part.id = Some(id.clone());
735                            if tool_name == "write" {
736                                tracing::info!(
737                                    session_id = %session_id,
738                                    message_id = %user_message_id,
739                                    tool_call_id = %id,
740                                    args_delta_len = args_delta.len(),
741                                    accumulated_args_len = entry.args.len(),
742                                    parsed_preview_empty = parsed_preview.is_null()
743                                        || parsed_preview.as_object().is_some_and(|value| value.is_empty())
744                                        || parsed_preview
745                                            .as_str()
746                                            .map(|value| value.trim().is_empty())
747                                            .unwrap_or(false),
748                                    "streamed write tool args delta received"
749                                );
750                            }
751                            self.event_bus.publish(EngineEvent::new(
752                                "message.part.updated",
753                                json!({
754                                    "part": tool_part,
755                                    "toolCallDelta": {
756                                        "id": id,
757                                        "tool": tool_name,
758                                        "argsDelta": truncate_text(&args_delta, 1_000),
759                                        "rawArgsPreview": truncate_text(&entry.args, 2_000),
760                                        "parsedArgsPreview": parsed_preview
761                                    }
762                                }),
763                            ));
764                        }
765                        StreamChunk::ToolCallEnd { id: _ } => {}
766                    }
767                    if cancel.is_cancelled() {
768                        break;
769                    }
770                }
771
772                let streamed_tool_call_count = streamed_tool_calls.len();
773                let streamed_tool_call_parse_failed = streamed_tool_calls
774                    .values()
775                    .any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
776                let mut tool_calls = streamed_tool_calls
777                    .into_iter()
778                    .filter_map(|(call_id, call)| {
779                        if call.name.trim().is_empty() {
780                            return None;
781                        }
782                        let tool_name = normalize_tool_name(&call.name);
783                        let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
784                        Some(ParsedToolCall {
785                            tool: tool_name,
786                            args: parsed_args,
787                            call_id: Some(call_id),
788                        })
789                    })
790                    .collect::<Vec<_>>();
791                if tool_calls.is_empty() {
792                    tool_calls = parse_tool_invocations_from_response(&completion)
793                        .into_iter()
794                        .map(|(tool, args)| ParsedToolCall {
795                            tool,
796                            args,
797                            call_id: None,
798                        })
799                        .collect::<Vec<_>>();
800                }
801                let provider_tool_parse_failed = tool_calls.is_empty()
802                    && (streamed_tool_call_parse_failed
803                        || (streamed_tool_call_count > 0
804                            && looks_like_unparsed_tool_payload(&completion))
805                        || looks_like_unparsed_tool_payload(&completion));
806                if provider_tool_parse_failed {
807                    latest_required_tool_failure_kind =
808                        RequiredToolFailureKind::ToolCallParseFailed;
809                } else if tool_calls.is_empty() {
810                    latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
811                }
812                if router_enabled
813                    && matches!(requested_tool_mode, ToolMode::Auto)
814                    && !auto_tools_escalated
815                    && iteration == 1
816                    && should_escalate_auto_tools(intent, &text, &completion)
817                {
818                    auto_tools_escalated = true;
819                    followup_context = Some(
820                        "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
821                            .to_string(),
822                    );
823                    self.event_bus.publish(EngineEvent::new(
824                        "provider.call.iteration.finish",
825                        json!({
826                            "sessionID": session_id,
827                            "messageID": user_message_id,
828                            "iteration": iteration,
829                            "finishReason": "auto_escalate",
830                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
831                            "rejectedToolCalls": 0,
832                        }),
833                    ));
834                    continue;
835                }
836                if tool_calls.is_empty()
837                    && !auto_workspace_probe_attempted
838                    && should_force_workspace_probe(&text, &completion)
839                    && allowed_tool_names.contains("glob")
840                {
841                    auto_workspace_probe_attempted = true;
842                    tool_calls = vec![ParsedToolCall {
843                        tool: "glob".to_string(),
844                        args: json!({ "pattern": "*" }),
845                        call_id: None,
846                    }];
847                }
848                if !tool_calls.is_empty() {
849                    let saw_tool_call_candidate = true;
850                    let mut outputs = Vec::new();
851                    let mut executed_productive_tool = false;
852                    let mut write_tool_attempted_in_cycle = false;
853                    let mut auth_required_hit_in_cycle = false;
854                    let mut guard_budget_hit_in_cycle = false;
855                    let mut duplicate_signature_hit_in_cycle = false;
856                    let mut rejected_tool_call_in_cycle = false;
857                    for ParsedToolCall {
858                        tool,
859                        args,
860                        call_id,
861                    } in tool_calls
862                    {
863                        if !agent_can_use_tool(&active_agent, &tool) {
864                            rejected_tool_call_in_cycle = true;
865                            continue;
866                        }
867                        let tool_key = normalize_tool_name(&tool);
868                        if is_workspace_write_tool(&tool_key) {
869                            write_tool_attempted_in_cycle = true;
870                        }
871                        if !allowed_tool_names.contains(&tool_key) {
872                            rejected_tool_call_in_cycle = true;
873                            let note = if offered_tool_preview.is_empty() {
874                                format!(
875                                    "Tool `{}` call skipped: it is not available in this turn.",
876                                    tool_key
877                                )
878                            } else {
879                                format!(
880                                    "Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
881                                    tool_key, offered_tool_preview
882                                )
883                            };
884                            self.event_bus.publish(EngineEvent::new(
885                                "tool.call.rejected_unoffered",
886                                json!({
887                                    "sessionID": session_id,
888                                    "messageID": user_message_id,
889                                    "iteration": iteration,
890                                    "tool": tool_key,
891                                    "offeredToolCount": allowed_tool_names.len()
892                                }),
893                            ));
894                            if tool_name_looks_like_email_action(&tool_key) {
895                                latest_email_action_note = Some(note.clone());
896                            }
897                            outputs.push(note);
898                            continue;
899                        }
900                        if let Some(server) = mcp_server_from_tool_name(&tool_key) {
901                            if blocked_mcp_servers.contains(server) {
902                                rejected_tool_call_in_cycle = true;
903                                outputs.push(format!(
904                                    "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
905                                    tool_key, server
906                                ));
907                                continue;
908                            }
909                        }
910                        if tool_key == "question" {
911                            question_tool_used = true;
912                        }
913                        if tool_key == "pack_builder" && pack_builder_executed {
914                            rejected_tool_call_in_cycle = true;
915                            outputs.push(
916                                "Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
917                                    .to_string(),
918                            );
919                            continue;
920                        }
921                        if websearch_query_blocked && tool_key == "websearch" {
922                            rejected_tool_call_in_cycle = true;
923                            outputs.push(
924                                "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
925                                    .to_string(),
926                            );
927                            continue;
928                        }
929                        let mut effective_args = args.clone();
930                        if tool_key == "todo_write" {
931                            effective_args = normalize_todo_write_args(effective_args, &completion);
932                            if is_empty_todo_write_args(&effective_args) {
933                                rejected_tool_call_in_cycle = true;
934                                outputs.push(
935                                    "Tool `todo_write` call skipped: empty todo payload."
936                                        .to_string(),
937                                );
938                                continue;
939                            }
940                        }
941                        let signature = if tool_key == "batch" {
942                            batch_tool_signature(&args)
943                                .unwrap_or_else(|| tool_signature(&tool_key, &args))
944                        } else {
945                            tool_signature(&tool_key, &args)
946                        };
947                        if is_shell_tool_name(&tool_key)
948                            && shell_mismatch_signatures.contains(&signature)
949                        {
950                            rejected_tool_call_in_cycle = true;
951                            outputs.push(
952                                "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
953                                    .to_string(),
954                            );
955                            continue;
956                        }
957                        let mut signature_count = 1usize;
958                        if is_read_only_tool(&tool_key)
959                            || (tool_key == "batch" && is_read_only_batch_call(&args))
960                        {
961                            let count = readonly_signature_counts
962                                .entry(signature.clone())
963                                .and_modify(|v| *v = v.saturating_add(1))
964                                .or_insert(1);
965                            signature_count = *count;
966                            if tool_key == "websearch" {
967                                if let Some(limit) = websearch_duplicate_signature_limit {
968                                    if *count > limit {
969                                        rejected_tool_call_in_cycle = true;
970                                        self.event_bus.publish(EngineEvent::new(
971                                            "tool.loop_guard.triggered",
972                                            json!({
973                                                "sessionID": session_id,
974                                                "messageID": user_message_id,
975                                                "tool": tool_key,
976                                                "reason": "duplicate_signature_retry_exhausted",
977                                                "duplicateLimit": limit,
978                                                "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
979                                                "loop_guard_triggered": true
980                                            }),
981                                        ));
982                                        outputs.push(
983                                            "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
984                                                .to_string(),
985                                        );
986                                        continue;
987                                    }
988                                }
989                            }
990                            if tool_key != "websearch" && *count > 1 {
991                                rejected_tool_call_in_cycle = true;
992                                if let Some(cached) = readonly_tool_cache.get(&signature) {
993                                    outputs.push(cached.clone());
994                                } else {
995                                    outputs.push(format!(
996                                        "Tool `{}` call skipped: duplicate call signature detected.",
997                                        tool_key
998                                    ));
999                                }
1000                                continue;
1001                            }
1002                        }
1003                        let is_read_only_signature = is_read_only_tool(&tool_key)
1004                            || (tool_key == "batch" && is_read_only_batch_call(&args));
1005                        if !is_read_only_signature {
1006                            let duplicate_limit = duplicate_signature_limit_for(&tool_key);
1007                            let seen = mutable_signature_counts
1008                                .entry(signature.clone())
1009                                .and_modify(|v| *v = v.saturating_add(1))
1010                                .or_insert(1);
1011                            if *seen > duplicate_limit {
1012                                rejected_tool_call_in_cycle = true;
1013                                self.event_bus.publish(EngineEvent::new(
1014                                    "tool.loop_guard.triggered",
1015                                    json!({
1016                                        "sessionID": session_id,
1017                                        "messageID": user_message_id,
1018                                        "tool": tool_key,
1019                                        "reason": "duplicate_signature_retry_exhausted",
1020                                        "signatureHash": stable_hash(&signature),
1021                                        "duplicateLimit": duplicate_limit,
1022                                        "loop_guard_triggered": true
1023                                    }),
1024                                ));
1025                                outputs.push(format!(
1026                                    "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
1027                                    tool_key, duplicate_limit
1028                                ));
1029                                duplicate_signature_hit_in_cycle = true;
1030                                continue;
1031                            }
1032                        }
1033                        let budget = tool_budget_for(&tool_key);
1034                        let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
1035                        if *entry >= budget {
1036                            rejected_tool_call_in_cycle = true;
1037                            outputs.push(format!(
1038                                "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
1039                                tool_key, budget
1040                            ));
1041                            guard_budget_hit_in_cycle = true;
1042                            continue;
1043                        }
1044                        let mut finalized_part = WireMessagePart::tool_invocation(
1045                            &session_id,
1046                            &user_message_id,
1047                            tool.clone(),
1048                            effective_args.clone(),
1049                        );
1050                        if let Some(call_id) = call_id.clone() {
1051                            finalized_part.id = Some(call_id);
1052                        }
1053                        finalized_part.state = Some("pending".to_string());
1054                        self.event_bus.publish(EngineEvent::new(
1055                            "message.part.updated",
1056                            json!({"part": finalized_part}),
1057                        ));
1058                        *entry += 1;
1059                        accepted_tool_calls_in_cycle =
1060                            accepted_tool_calls_in_cycle.saturating_add(1);
1061                        if let Some(output) = self
1062                            .execute_tool_with_permission(
1063                                &session_id,
1064                                &user_message_id,
1065                                tool,
1066                                effective_args,
1067                                call_id,
1068                                active_agent.skills.as_deref(),
1069                                &text,
1070                                requested_write_required,
1071                                Some(&completion),
1072                                cancel.clone(),
1073                            )
1074                            .await?
1075                        {
1076                            let productive = is_productive_tool_output(&tool_key, &output);
1077                            if output.contains("WEBSEARCH_QUERY_MISSING") {
1078                                websearch_query_blocked = true;
1079                            }
1080                            if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
1081                            {
1082                                shell_mismatch_signatures.insert(signature.clone());
1083                            }
1084                            if is_read_only_tool(&tool_key)
1085                                && tool_key != "websearch"
1086                                && signature_count == 1
1087                            {
1088                                readonly_tool_cache.insert(signature, output.clone());
1089                            }
1090                            if productive {
1091                                productive_tool_calls_total =
1092                                    productive_tool_calls_total.saturating_add(1);
1093                                if is_workspace_write_tool(&tool_key) {
1094                                    productive_write_tool_calls_total =
1095                                        productive_write_tool_calls_total.saturating_add(1);
1096                                }
1097                                if is_workspace_inspection_tool(&tool_key) {
1098                                    productive_workspace_inspection_total =
1099                                        productive_workspace_inspection_total.saturating_add(1);
1100                                }
1101                                if tool_key == "read" {
1102                                    productive_concrete_read_total =
1103                                        productive_concrete_read_total.saturating_add(1);
1104                                }
1105                                if is_web_research_tool(&tool_key) {
1106                                    productive_web_research_total =
1107                                        productive_web_research_total.saturating_add(1);
1108                                    if is_successful_web_research_output(&tool_key, &output) {
1109                                        successful_web_research_total =
1110                                            successful_web_research_total.saturating_add(1);
1111                                    }
1112                                }
1113                                executed_productive_tool = true;
1114                                if tool_key == "pack_builder" {
1115                                    pack_builder_executed = true;
1116                                }
1117                            }
1118                            if tool_name_looks_like_email_action(&tool_key) {
1119                                if productive {
1120                                    email_action_executed = true;
1121                                } else {
1122                                    latest_email_action_note =
1123                                        Some(truncate_text(&output, 280).replace('\n', " "));
1124                                }
1125                            }
1126                            if is_auth_required_tool_output(&output) {
1127                                if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1128                                    blocked_mcp_servers.insert(server.to_string());
1129                                }
1130                                auth_required_hit_in_cycle = true;
1131                            }
1132                            outputs.push(output);
1133                            if auth_required_hit_in_cycle {
1134                                break;
1135                            }
1136                            if guard_budget_hit_in_cycle {
1137                                break;
1138                            }
1139                        }
1140                    }
1141                    if !outputs.is_empty() {
1142                        last_tool_outputs = outputs.clone();
1143                        if matches!(requested_tool_mode, ToolMode::Required)
1144                            && productive_tool_calls_total == 0
1145                        {
1146                            latest_required_tool_failure_kind = classify_required_tool_failure(
1147                                &outputs,
1148                                saw_tool_call_candidate,
1149                                accepted_tool_calls_in_cycle,
1150                                provider_tool_parse_failed,
1151                                rejected_tool_call_in_cycle,
1152                            );
1153                            if requested_write_required
1154                                && write_tool_attempted_in_cycle
1155                                && productive_write_tool_calls_total == 0
1156                                && is_write_invalid_args_failure_kind(
1157                                    latest_required_tool_failure_kind,
1158                                )
1159                            {
1160                                if required_write_retry_count + 1 < strict_write_retry_max_attempts
1161                                {
1162                                    required_write_retry_count += 1;
1163                                    required_tool_retry_count += 1;
1164                                    followup_context = Some(build_write_required_retry_context(
1165                                        &offered_tool_preview,
1166                                        latest_required_tool_failure_kind,
1167                                        &text,
1168                                        &requested_prewrite_requirements,
1169                                        productive_workspace_inspection_total > 0,
1170                                        productive_concrete_read_total > 0,
1171                                        productive_web_research_total > 0,
1172                                        successful_web_research_total > 0,
1173                                    ));
1174                                    self.event_bus.publish(EngineEvent::new(
1175                                        "provider.call.iteration.finish",
1176                                        json!({
1177                                            "sessionID": session_id,
1178                                            "messageID": user_message_id,
1179                                            "iteration": iteration,
1180                                            "finishReason": "required_write_invalid_retry",
1181                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1182                                            "rejectedToolCalls": 0,
1183                                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1184                                        }),
1185                                    ));
1186                                    continue;
1187                                }
1188                            }
1189                            let progress_made_in_cycle = productive_workspace_inspection_total > 0
1190                                || productive_concrete_read_total > 0
1191                                || productive_web_research_total > 0
1192                                || successful_web_research_total > 0;
1193                            if should_retry_nonproductive_required_tool_cycle(
1194                                requested_write_required,
1195                                write_tool_attempted_in_cycle,
1196                                progress_made_in_cycle,
1197                                required_tool_retry_count,
1198                            ) {
1199                                required_tool_retry_count += 1;
1200                                followup_context =
1201                                    Some(build_required_tool_retry_context_for_task(
1202                                        &offered_tool_preview,
1203                                        latest_required_tool_failure_kind,
1204                                        &text,
1205                                    ));
1206                                self.event_bus.publish(EngineEvent::new(
1207                                    "provider.call.iteration.finish",
1208                                    json!({
1209                                        "sessionID": session_id,
1210                                        "messageID": user_message_id,
1211                                        "iteration": iteration,
1212                                        "finishReason": "required_tool_retry",
1213                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1214                                        "rejectedToolCalls": 0,
1215                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1216                                    }),
1217                                ));
1218                                continue;
1219                            }
1220                            completion = required_tool_mode_unsatisfied_completion(
1221                                latest_required_tool_failure_kind,
1222                            );
1223                            if !required_tool_unsatisfied_emitted {
1224                                required_tool_unsatisfied_emitted = true;
1225                                self.event_bus.publish(EngineEvent::new(
1226                                    "tool.mode.required.unsatisfied",
1227                                    json!({
1228                                        "sessionID": session_id,
1229                                        "messageID": user_message_id,
1230                                        "iteration": iteration,
1231                                        "selectedToolCount": allowed_tool_names.len(),
1232                                        "offeredToolsPreview": offered_tool_preview,
1233                                        "reason": latest_required_tool_failure_kind.code(),
1234                                    }),
1235                                ));
1236                            }
1237                            self.event_bus.publish(EngineEvent::new(
1238                                "provider.call.iteration.finish",
1239                                json!({
1240                                    "sessionID": session_id,
1241                                    "messageID": user_message_id,
1242                                    "iteration": iteration,
1243                                    "finishReason": "required_tool_unsatisfied",
1244                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1245                                    "rejectedToolCalls": 0,
1246                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1247                                }),
1248                            ));
1249                            break;
1250                        }
1251                        let prewrite_gate = evaluate_prewrite_gate(
1252                            requested_write_required,
1253                            &requested_prewrite_requirements,
1254                            PrewriteProgress {
1255                                productive_write_tool_calls_total,
1256                                productive_workspace_inspection_total,
1257                                productive_concrete_read_total,
1258                                productive_web_research_total,
1259                                successful_web_research_total,
1260                                required_write_retry_count,
1261                                unmet_prewrite_repair_retry_count,
1262                                prewrite_gate_waived,
1263                            },
1264                        );
1265                        let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1266                        let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1267                        if requested_write_required
1268                            && productive_tool_calls_total > 0
1269                            && productive_write_tool_calls_total == 0
1270                        {
1271                            if should_start_prewrite_repair_before_first_write(
1272                                requested_prewrite_requirements.repair_on_unmet_requirements,
1273                                productive_write_tool_calls_total,
1274                                prewrite_satisfied,
1275                                code_workflow_requested,
1276                            ) {
1277                                if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1278                                    unmet_prewrite_repair_retry_count += 1;
1279                                    let repair_attempt = unmet_prewrite_repair_retry_count;
1280                                    let repair_attempts_remaining =
1281                                        prewrite_repair_budget.saturating_sub(repair_attempt);
1282                                    followup_context = Some(build_prewrite_repair_retry_context(
1283                                        &offered_tool_preview,
1284                                        latest_required_tool_failure_kind,
1285                                        &text,
1286                                        &requested_prewrite_requirements,
1287                                        productive_workspace_inspection_total > 0,
1288                                        productive_concrete_read_total > 0,
1289                                        productive_web_research_total > 0,
1290                                        successful_web_research_total > 0,
1291                                    ));
1292                                    self.event_bus.publish(EngineEvent::new(
1293                                        "provider.call.iteration.finish",
1294                                        json!({
1295                                            "sessionID": session_id,
1296                                            "messageID": user_message_id,
1297                                            "iteration": iteration,
1298                                            "finishReason": "prewrite_repair_retry",
1299                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1300                                            "rejectedToolCalls": 0,
1301                                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1302                                            "repair": prewrite_repair_event_payload(
1303                                                repair_attempt,
1304                                                repair_attempts_remaining,
1305                                                &unmet_prewrite_codes,
1306                                                false,
1307                                            ),
1308                                        }),
1309                                    ));
1310                                    continue;
1311                                }
1312                                if !prewrite_gate_waived {
1313                                    if prewrite_fail_closed {
1314                                        let repair_attempt = unmet_prewrite_repair_retry_count;
1315                                        let repair_attempts_remaining =
1316                                            prewrite_repair_budget.saturating_sub(repair_attempt);
1317                                        completion = prewrite_requirements_exhausted_completion(
1318                                            &unmet_prewrite_codes,
1319                                            repair_attempt,
1320                                            repair_attempts_remaining,
1321                                        );
1322                                        self.event_bus.publish(EngineEvent::new(
1323                                            "prewrite.gate.strict_mode.blocked",
1324                                            json!({
1325                                                "sessionID": session_id,
1326                                                "messageID": user_message_id,
1327                                                "iteration": iteration,
1328                                                "unmetCodes": unmet_prewrite_codes,
1329                                            }),
1330                                        ));
1331                                        self.event_bus.publish(EngineEvent::new(
1332                                            "provider.call.iteration.finish",
1333                                            json!({
1334                                                "sessionID": session_id,
1335                                                "messageID": user_message_id,
1336                                                "iteration": iteration,
1337                                                "finishReason": "prewrite_requirements_exhausted",
1338                                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1339                                                "rejectedToolCalls": 0,
1340                                                "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1341                                                "repair": prewrite_repair_event_payload(
1342                                                    repair_attempt,
1343                                                    repair_attempts_remaining,
1344                                                    &unmet_prewrite_codes,
1345                                                    true,
1346                                                ),
1347                                            }),
1348                                        ));
1349                                        break;
1350                                    }
1351                                    prewrite_gate_waived = true;
1352                                    let repair_attempt = unmet_prewrite_repair_retry_count;
1353                                    let repair_attempts_remaining =
1354                                        prewrite_repair_budget.saturating_sub(repair_attempt);
1355                                    followup_context = Some(build_prewrite_waived_write_context(
1356                                        &text,
1357                                        &unmet_prewrite_codes,
1358                                    ));
1359                                    self.event_bus.publish(EngineEvent::new(
1360                                        "prewrite.gate.waived.write_executed",
1361                                        json!({
1362                                            "sessionID": session_id,
1363                                            "messageID": user_message_id,
1364                                            "unmetCodes": unmet_prewrite_codes,
1365                                        }),
1366                                    ));
1367                                    self.event_bus.publish(EngineEvent::new(
1368                                        "provider.call.iteration.finish",
1369                                        json!({
1370                                            "sessionID": session_id,
1371                                            "messageID": user_message_id,
1372                                            "iteration": iteration,
1373                                            "finishReason": "prewrite_gate_waived",
1374                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1375                                            "rejectedToolCalls": 0,
1376                                            "prewriteGateWaived": true,
1377                                            "repair": prewrite_repair_event_payload(
1378                                                repair_attempt,
1379                                                repair_attempts_remaining,
1380                                                &unmet_prewrite_codes,
1381                                                true,
1382                                            ),
1383                                        }),
1384                                    ));
1385                                    continue;
1386                                }
1387                            }
1388                            latest_required_tool_failure_kind =
1389                                RequiredToolFailureKind::WriteRequiredNotSatisfied;
1390                            if required_write_retry_count + 1 < strict_write_retry_max_attempts {
1391                                required_write_retry_count += 1;
1392                                followup_context = Some(build_write_required_retry_context(
1393                                    &offered_tool_preview,
1394                                    latest_required_tool_failure_kind,
1395                                    &text,
1396                                    &requested_prewrite_requirements,
1397                                    productive_workspace_inspection_total > 0,
1398                                    productive_concrete_read_total > 0,
1399                                    productive_web_research_total > 0,
1400                                    successful_web_research_total > 0,
1401                                ));
1402                                self.event_bus.publish(EngineEvent::new(
1403                                    "provider.call.iteration.finish",
1404                                    json!({
1405                                        "sessionID": session_id,
1406                                        "messageID": user_message_id,
1407                                        "iteration": iteration,
1408                                        "finishReason": "required_write_retry",
1409                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1410                                        "rejectedToolCalls": 0,
1411                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1412                                    }),
1413                                ));
1414                                continue;
1415                            }
1416                            completion = required_tool_mode_unsatisfied_completion(
1417                                latest_required_tool_failure_kind,
1418                            );
1419                            if !required_tool_unsatisfied_emitted {
1420                                required_tool_unsatisfied_emitted = true;
1421                                self.event_bus.publish(EngineEvent::new(
1422                                    "tool.mode.required.unsatisfied",
1423                                    json!({
1424                                        "sessionID": session_id,
1425                                        "messageID": user_message_id,
1426                                        "iteration": iteration,
1427                                        "selectedToolCount": allowed_tool_names.len(),
1428                                        "offeredToolsPreview": offered_tool_preview,
1429                                        "reason": latest_required_tool_failure_kind.code(),
1430                                    }),
1431                                ));
1432                            }
1433                            self.event_bus.publish(EngineEvent::new(
1434                                "provider.call.iteration.finish",
1435                                json!({
1436                                    "sessionID": session_id,
1437                                    "messageID": user_message_id,
1438                                    "iteration": iteration,
1439                                    "finishReason": "required_write_unsatisfied",
1440                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1441                                    "rejectedToolCalls": 0,
1442                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1443                                }),
1444                            ));
1445                            break;
1446                        }
1447                        if invalid_tool_args_retry_count < invalid_tool_args_retry_max_attempts() {
1448                            if let Some(retry_context) =
1449                                build_invalid_tool_args_retry_context_from_outputs(
1450                                    &outputs,
1451                                    invalid_tool_args_retry_count,
1452                                )
1453                            {
1454                                invalid_tool_args_retry_count += 1;
1455                                followup_context = Some(format!(
1456                                    "Previous tool call arguments were invalid. {}",
1457                                    retry_context
1458                                ));
1459                                self.event_bus.publish(EngineEvent::new(
1460                                    "provider.call.iteration.finish",
1461                                    json!({
1462                                        "sessionID": session_id,
1463                                        "messageID": user_message_id,
1464                                        "iteration": iteration,
1465                                        "finishReason": "invalid_tool_args_retry",
1466                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1467                                        "rejectedToolCalls": 0,
1468                                    }),
1469                                ));
1470                                continue;
1471                            }
1472                        }
1473                        let guard_budget_hit =
1474                            outputs.iter().any(|o| is_guard_budget_tool_output(o));
1475                        if executed_productive_tool {
1476                            let prewrite_gate = evaluate_prewrite_gate(
1477                                requested_write_required,
1478                                &requested_prewrite_requirements,
1479                                PrewriteProgress {
1480                                    productive_write_tool_calls_total,
1481                                    productive_workspace_inspection_total,
1482                                    productive_concrete_read_total,
1483                                    productive_web_research_total,
1484                                    successful_web_research_total,
1485                                    required_write_retry_count,
1486                                    unmet_prewrite_repair_retry_count,
1487                                    prewrite_gate_waived,
1488                                },
1489                            );
1490                            let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1491                            let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1492                            if requested_write_required
1493                                && productive_write_tool_calls_total > 0
1494                                && requested_prewrite_requirements.repair_on_unmet_requirements
1495                                && unmet_prewrite_repair_retry_count < prewrite_repair_budget
1496                                && !prewrite_satisfied
1497                            {
1498                                unmet_prewrite_repair_retry_count += 1;
1499                                let repair_attempt = unmet_prewrite_repair_retry_count;
1500                                let repair_attempts_remaining =
1501                                    prewrite_repair_budget.saturating_sub(repair_attempt);
1502                                followup_context = Some(build_prewrite_repair_retry_context(
1503                                    &offered_tool_preview,
1504                                    latest_required_tool_failure_kind,
1505                                    &text,
1506                                    &requested_prewrite_requirements,
1507                                    productive_workspace_inspection_total > 0,
1508                                    productive_concrete_read_total > 0,
1509                                    productive_web_research_total > 0,
1510                                    successful_web_research_total > 0,
1511                                ));
1512                                self.event_bus.publish(EngineEvent::new(
1513                                    "provider.call.iteration.finish",
1514                                    json!({
1515                                        "sessionID": session_id,
1516                                        "messageID": user_message_id,
1517                                        "iteration": iteration,
1518                                        "finishReason": "prewrite_repair_retry",
1519                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1520                                        "rejectedToolCalls": 0,
1521                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1522                                        "repair": prewrite_repair_event_payload(
1523                                            repair_attempt,
1524                                            repair_attempts_remaining,
1525                                            &unmet_prewrite_codes,
1526                                            false,
1527                                        ),
1528                                    }),
1529                                ));
1530                                continue;
1531                            }
1532                            if requested_write_required
1533                                && productive_write_tool_calls_total > 0
1534                                && requested_prewrite_requirements.repair_on_unmet_requirements
1535                                && !prewrite_satisfied
1536                                && prewrite_fail_closed
1537                            {
1538                                let repair_attempt = unmet_prewrite_repair_retry_count;
1539                                let repair_attempts_remaining =
1540                                    prewrite_repair_budget.saturating_sub(repair_attempt);
1541                                completion = prewrite_requirements_exhausted_completion(
1542                                    &unmet_prewrite_codes,
1543                                    repair_attempt,
1544                                    repair_attempts_remaining,
1545                                );
1546                                self.event_bus.publish(EngineEvent::new(
1547                                    "prewrite.gate.strict_mode.blocked",
1548                                    json!({
1549                                        "sessionID": session_id,
1550                                        "messageID": user_message_id,
1551                                        "iteration": iteration,
1552                                        "unmetCodes": unmet_prewrite_codes,
1553                                    }),
1554                                ));
1555                                self.event_bus.publish(EngineEvent::new(
1556                                    "provider.call.iteration.finish",
1557                                    json!({
1558                                        "sessionID": session_id,
1559                                        "messageID": user_message_id,
1560                                        "iteration": iteration,
1561                                        "finishReason": "prewrite_requirements_exhausted",
1562                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1563                                        "rejectedToolCalls": 0,
1564                                        "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1565                                        "repair": prewrite_repair_event_payload(
1566                                            repair_attempt,
1567                                            repair_attempts_remaining,
1568                                            &unmet_prewrite_codes,
1569                                            true,
1570                                        ),
1571                                    }),
1572                                ));
1573                                break;
1574                            }
1575                            followup_context = Some(format!(
1576                                "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1577                                summarize_tool_outputs(&outputs)
1578                            ));
1579                            self.event_bus.publish(EngineEvent::new(
1580                                "provider.call.iteration.finish",
1581                                json!({
1582                                    "sessionID": session_id,
1583                                    "messageID": user_message_id,
1584                                    "iteration": iteration,
1585                                    "finishReason": "tool_followup",
1586                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1587                                    "rejectedToolCalls": 0,
1588                                }),
1589                            ));
1590                            continue;
1591                        }
1592                        if guard_budget_hit {
1593                            completion = summarize_guard_budget_outputs(&outputs)
1594                                .unwrap_or_else(|| {
1595                                    "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1596                                });
1597                        } else if duplicate_signature_hit_in_cycle {
1598                            completion = summarize_duplicate_signature_outputs(&outputs)
1599                                .unwrap_or_else(|| {
1600                                    "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1601                                });
1602                        } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1603                            completion = summary;
1604                        } else {
1605                            completion.clear();
1606                        }
1607                        self.event_bus.publish(EngineEvent::new(
1608                            "provider.call.iteration.finish",
1609                            json!({
1610                                "sessionID": session_id,
1611                                "messageID": user_message_id,
1612                                "iteration": iteration,
1613                                "finishReason": "tool_summary",
1614                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1615                                "rejectedToolCalls": 0,
1616                            }),
1617                        ));
1618                        break;
1619                    } else if matches!(requested_tool_mode, ToolMode::Required) {
1620                        latest_required_tool_failure_kind = classify_required_tool_failure(
1621                            &outputs,
1622                            saw_tool_call_candidate,
1623                            accepted_tool_calls_in_cycle,
1624                            provider_tool_parse_failed,
1625                            rejected_tool_call_in_cycle,
1626                        );
1627                    }
1628                }
1629
1630                {
1631                    let (prompt_tokens, completion_tokens, total_tokens, usage_source) =
1632                        if let Some(usage) = provider_usage {
1633                            (
1634                                usage.prompt_tokens,
1635                                usage.completion_tokens,
1636                                usage.total_tokens,
1637                                "provider",
1638                            )
1639                        } else {
1640                            // Provider did not return usage (e.g. streaming without
1641                            // include_usage, or a backend that omits it). Estimate from
1642                            // accumulated char counts using the ~4 chars-per-token heuristic.
1643                            let est_prompt = (estimated_prompt_chars / 4) as u64;
1644                            let est_completion = (completion.len() / 4) as u64;
1645                            tracing::debug!(
1646                                session_id = %session_id,
1647                                provider_id = %provider_id,
1648                                "provider.usage missing from stream; using char-count estimate \
1649                                 (prompt_chars={estimated_prompt_chars} completion_chars={})",
1650                                completion.len()
1651                            );
1652                            (
1653                                est_prompt,
1654                                est_completion,
1655                                est_prompt.saturating_add(est_completion),
1656                                "estimated",
1657                            )
1658                        };
1659                    self.event_bus.publish(EngineEvent::new(
1660                        "provider.usage",
1661                        json!({
1662                            "sessionID": session_id,
1663                            "correlationID": correlation_ref,
1664                            "messageID": user_message_id,
1665                            "promptTokens": prompt_tokens,
1666                            "completionTokens": completion_tokens,
1667                            "totalTokens": total_tokens,
1668                            "usageSource": usage_source,
1669                        }),
1670                    ));
1671                }
1672
1673                if matches!(requested_tool_mode, ToolMode::Required)
1674                    && productive_tool_calls_total == 0
1675                {
1676                    if requested_write_required
1677                        && required_write_retry_count > 0
1678                        && productive_write_tool_calls_total == 0
1679                        && !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
1680                    {
1681                        latest_required_tool_failure_kind =
1682                            RequiredToolFailureKind::WriteRequiredNotSatisfied;
1683                    }
1684                    if requested_write_required
1685                        && required_write_retry_count + 1 < strict_write_retry_max_attempts
1686                    {
1687                        required_write_retry_count += 1;
1688                        followup_context = Some(build_write_required_retry_context(
1689                            &offered_tool_preview,
1690                            latest_required_tool_failure_kind,
1691                            &text,
1692                            &requested_prewrite_requirements,
1693                            productive_workspace_inspection_total > 0,
1694                            productive_concrete_read_total > 0,
1695                            productive_web_research_total > 0,
1696                            successful_web_research_total > 0,
1697                        ));
1698                        continue;
1699                    }
1700                    let progress_made_in_cycle = productive_workspace_inspection_total > 0
1701                        || productive_concrete_read_total > 0
1702                        || productive_web_research_total > 0
1703                        || successful_web_research_total > 0;
1704                    if should_retry_nonproductive_required_tool_cycle(
1705                        requested_write_required,
1706                        false,
1707                        progress_made_in_cycle,
1708                        required_tool_retry_count,
1709                    ) {
1710                        required_tool_retry_count += 1;
1711                        followup_context = Some(build_required_tool_retry_context_for_task(
1712                            &offered_tool_preview,
1713                            latest_required_tool_failure_kind,
1714                            &text,
1715                        ));
1716                        continue;
1717                    }
1718                    completion = required_tool_mode_unsatisfied_completion(
1719                        latest_required_tool_failure_kind,
1720                    );
1721                    if !required_tool_unsatisfied_emitted {
1722                        required_tool_unsatisfied_emitted = true;
1723                        self.event_bus.publish(EngineEvent::new(
1724                            "tool.mode.required.unsatisfied",
1725                            json!({
1726                                "sessionID": session_id,
1727                                "messageID": user_message_id,
1728                                "iteration": iteration,
1729                                "selectedToolCount": allowed_tool_names.len(),
1730                                "offeredToolsPreview": offered_tool_preview,
1731                                "reason": latest_required_tool_failure_kind.code(),
1732                            }),
1733                        ));
1734                    }
1735                    self.event_bus.publish(EngineEvent::new(
1736                        "provider.call.iteration.finish",
1737                        json!({
1738                            "sessionID": session_id,
1739                            "messageID": user_message_id,
1740                            "iteration": iteration,
1741                            "finishReason": "required_tool_unsatisfied",
1742                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1743                            "rejectedToolCalls": 0,
1744                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1745                        }),
1746                    ));
1747                } else {
1748                    if completion.trim().is_empty()
1749                        && !last_tool_outputs.is_empty()
1750                        && requested_write_required
1751                        && empty_completion_retry_count == 0
1752                    {
1753                        empty_completion_retry_count += 1;
1754                        followup_context = Some(build_empty_completion_retry_context(
1755                            &offered_tool_preview,
1756                            &text,
1757                            &requested_prewrite_requirements,
1758                            productive_workspace_inspection_total > 0,
1759                            productive_concrete_read_total > 0,
1760                            productive_web_research_total > 0,
1761                            successful_web_research_total > 0,
1762                        ));
1763                        self.event_bus.publish(EngineEvent::new(
1764                            "provider.call.iteration.finish",
1765                            json!({
1766                                "sessionID": session_id,
1767                                "messageID": user_message_id,
1768                                "iteration": iteration,
1769                                "finishReason": "empty_completion_retry",
1770                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1771                                "rejectedToolCalls": 0,
1772                            }),
1773                        ));
1774                        continue;
1775                    }
1776                    let prewrite_gate = evaluate_prewrite_gate(
1777                        requested_write_required,
1778                        &requested_prewrite_requirements,
1779                        PrewriteProgress {
1780                            productive_write_tool_calls_total,
1781                            productive_workspace_inspection_total,
1782                            productive_concrete_read_total,
1783                            productive_web_research_total,
1784                            successful_web_research_total,
1785                            required_write_retry_count,
1786                            unmet_prewrite_repair_retry_count,
1787                            prewrite_gate_waived,
1788                        },
1789                    );
1790                    if should_start_prewrite_repair_before_first_write(
1791                        requested_prewrite_requirements.repair_on_unmet_requirements,
1792                        productive_write_tool_calls_total,
1793                        prewrite_gate.prewrite_satisfied,
1794                        code_workflow_requested,
1795                    ) && !prewrite_gate_waived
1796                    {
1797                        let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1798                        if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1799                            unmet_prewrite_repair_retry_count += 1;
1800                            let repair_attempt = unmet_prewrite_repair_retry_count;
1801                            let repair_attempts_remaining =
1802                                prewrite_repair_budget.saturating_sub(repair_attempt);
1803                            followup_context = Some(build_prewrite_repair_retry_context(
1804                                &offered_tool_preview,
1805                                latest_required_tool_failure_kind,
1806                                &text,
1807                                &requested_prewrite_requirements,
1808                                productive_workspace_inspection_total > 0,
1809                                productive_concrete_read_total > 0,
1810                                productive_web_research_total > 0,
1811                                successful_web_research_total > 0,
1812                            ));
1813                            self.event_bus.publish(EngineEvent::new(
1814                                "provider.call.iteration.finish",
1815                                json!({
1816                                    "sessionID": session_id,
1817                                    "messageID": user_message_id,
1818                                    "iteration": iteration,
1819                                    "finishReason": "prewrite_repair_retry",
1820                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1821                                    "rejectedToolCalls": 0,
1822                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1823                                    "repair": prewrite_repair_event_payload(
1824                                        repair_attempt,
1825                                        repair_attempts_remaining,
1826                                        &unmet_prewrite_codes,
1827                                        false,
1828                                    ),
1829                                }),
1830                            ));
1831                            continue;
1832                        }
1833                        if prewrite_fail_closed {
1834                            let repair_attempt = unmet_prewrite_repair_retry_count;
1835                            let repair_attempts_remaining =
1836                                prewrite_repair_budget.saturating_sub(repair_attempt);
1837                            completion = prewrite_requirements_exhausted_completion(
1838                                &unmet_prewrite_codes,
1839                                repair_attempt,
1840                                repair_attempts_remaining,
1841                            );
1842                            self.event_bus.publish(EngineEvent::new(
1843                                "prewrite.gate.strict_mode.blocked",
1844                                json!({
1845                                    "sessionID": session_id,
1846                                    "messageID": user_message_id,
1847                                    "iteration": iteration,
1848                                    "unmetCodes": unmet_prewrite_codes,
1849                                }),
1850                            ));
1851                            self.event_bus.publish(EngineEvent::new(
1852                                "provider.call.iteration.finish",
1853                                json!({
1854                                    "sessionID": session_id,
1855                                    "messageID": user_message_id,
1856                                    "iteration": iteration,
1857                                    "finishReason": "prewrite_requirements_exhausted",
1858                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1859                                    "rejectedToolCalls": 0,
1860                                    "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1861                                    "repair": prewrite_repair_event_payload(
1862                                        repair_attempt,
1863                                        repair_attempts_remaining,
1864                                        &unmet_prewrite_codes,
1865                                        true,
1866                                    ),
1867                                }),
1868                            ));
1869                            break;
1870                        }
1871                        prewrite_gate_waived = true;
1872                        let repair_attempt = unmet_prewrite_repair_retry_count;
1873                        let repair_attempts_remaining =
1874                            prewrite_repair_budget.saturating_sub(repair_attempt);
1875                        followup_context = Some(build_prewrite_waived_write_context(
1876                            &text,
1877                            &unmet_prewrite_codes,
1878                        ));
1879                        self.event_bus.publish(EngineEvent::new(
1880                            "prewrite.gate.waived.write_executed",
1881                            json!({
1882                                "sessionID": session_id,
1883                                "messageID": user_message_id,
1884                                "unmetCodes": unmet_prewrite_codes,
1885                            }),
1886                        ));
1887                        self.event_bus.publish(EngineEvent::new(
1888                            "provider.call.iteration.finish",
1889                            json!({
1890                                "sessionID": session_id,
1891                                "messageID": user_message_id,
1892                                "iteration": iteration,
1893                                "finishReason": "prewrite_gate_waived",
1894                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1895                                "rejectedToolCalls": 0,
1896                                "prewriteGateWaived": true,
1897                                "repair": prewrite_repair_event_payload(
1898                                    repair_attempt,
1899                                    repair_attempts_remaining,
1900                                    &unmet_prewrite_codes,
1901                                    true,
1902                                ),
1903                            }),
1904                        ));
1905                        continue;
1906                    }
1907                    if prewrite_gate_waived
1908                        && requested_write_required
1909                        && productive_write_tool_calls_total == 0
1910                        && required_write_retry_count + 1 < strict_write_retry_max_attempts
1911                    {
1912                        required_write_retry_count += 1;
1913                        followup_context = Some(build_write_required_retry_context(
1914                            &offered_tool_preview,
1915                            latest_required_tool_failure_kind,
1916                            &text,
1917                            &requested_prewrite_requirements,
1918                            productive_workspace_inspection_total > 0,
1919                            productive_concrete_read_total > 0,
1920                            productive_web_research_total > 0,
1921                            successful_web_research_total > 0,
1922                        ));
1923                        self.event_bus.publish(EngineEvent::new(
1924                            "provider.call.iteration.finish",
1925                            json!({
1926                                "sessionID": session_id,
1927                                "messageID": user_message_id,
1928                                "iteration": iteration,
1929                                "finishReason": "waived_write_retry",
1930                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1931                                "rejectedToolCalls": 0,
1932                            }),
1933                        ));
1934                        continue;
1935                    }
1936                    self.event_bus.publish(EngineEvent::new(
1937                        "provider.call.iteration.finish",
1938                        json!({
1939                            "sessionID": session_id,
1940                            "messageID": user_message_id,
1941                            "iteration": iteration,
1942                            "finishReason": "provider_completion",
1943                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1944                            "rejectedToolCalls": 0,
1945                        }),
1946                    ));
1947                }
1948                break;
1949            }
1950            if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
1951            {
1952                completion =
1953                    required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
1954                if !required_tool_unsatisfied_emitted {
1955                    self.event_bus.publish(EngineEvent::new(
1956                        "tool.mode.required.unsatisfied",
1957                        json!({
1958                            "sessionID": session_id,
1959                            "messageID": user_message_id,
1960                            "selectedToolCount": tool_call_counts.len(),
1961                            "reason": latest_required_tool_failure_kind.code(),
1962                        }),
1963                    ));
1964                }
1965            }
1966            if completion.trim().is_empty()
1967                && !last_tool_outputs.is_empty()
1968                && requested_write_required
1969                && productive_write_tool_calls_total > 0
1970            {
1971                let final_prewrite_satisfied = evaluate_prewrite_gate(
1972                    requested_write_required,
1973                    &requested_prewrite_requirements,
1974                    PrewriteProgress {
1975                        productive_write_tool_calls_total,
1976                        productive_workspace_inspection_total,
1977                        productive_concrete_read_total,
1978                        productive_web_research_total,
1979                        successful_web_research_total,
1980                        required_write_retry_count,
1981                        unmet_prewrite_repair_retry_count,
1982                        prewrite_gate_waived,
1983                    },
1984                )
1985                .prewrite_satisfied;
1986                if prewrite_fail_closed && !final_prewrite_satisfied {
1987                    let unmet_prewrite_codes = evaluate_prewrite_gate(
1988                        requested_write_required,
1989                        &requested_prewrite_requirements,
1990                        PrewriteProgress {
1991                            productive_write_tool_calls_total,
1992                            productive_workspace_inspection_total,
1993                            productive_concrete_read_total,
1994                            productive_web_research_total,
1995                            successful_web_research_total,
1996                            required_write_retry_count,
1997                            unmet_prewrite_repair_retry_count,
1998                            prewrite_gate_waived,
1999                        },
2000                    )
2001                    .unmet_codes;
2002                    completion = prewrite_requirements_exhausted_completion(
2003                        &unmet_prewrite_codes,
2004                        unmet_prewrite_repair_retry_count,
2005                        prewrite_repair_budget.saturating_sub(unmet_prewrite_repair_retry_count),
2006                    );
2007                } else {
2008                    completion = synthesize_artifact_write_completion_from_tool_state(
2009                        &text,
2010                        final_prewrite_satisfied,
2011                        prewrite_gate_waived,
2012                    );
2013                }
2014            }
2015            if completion.trim().is_empty()
2016                && !last_tool_outputs.is_empty()
2017                && should_generate_post_tool_final_narrative(
2018                    requested_tool_mode,
2019                    productive_tool_calls_total,
2020                )
2021            {
2022                if let Some(narrative) = self
2023                    .generate_final_narrative_without_tools(
2024                        &session_id,
2025                        &active_agent,
2026                        Some(provider_id.as_str()),
2027                        Some(model_id_value.as_str()),
2028                        cancel.clone(),
2029                        &last_tool_outputs,
2030                    )
2031                    .await
2032                {
2033                    completion = narrative;
2034                }
2035            }
2036            if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
2037                if let Some(summary) = summarize_auth_pending_outputs(&last_tool_outputs) {
2038                    completion = summary;
2039                } else if let Some(hint) =
2040                    summarize_terminal_tool_failure_for_user(&last_tool_outputs)
2041                {
2042                    completion = hint;
2043                } else {
2044                    let preview = summarize_user_visible_tool_outputs(&last_tool_outputs);
2045                    if preview.trim().is_empty() {
2046                        completion = "I used tools for this request, but I couldn't turn the results into a clean final answer. Please retry with the docs page URL, docs path, or exact search query you want me to use.".to_string();
2047                    } else {
2048                        completion = format!(
2049                            "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
2050                            preview
2051                        );
2052                    }
2053                }
2054            }
2055            if completion.trim().is_empty() {
2056                completion =
2057                    "I couldn't produce a final response for that run. Please retry your request."
2058                        .to_string();
2059            }
2060            // M-3: Gate fires when email was requested AND email-action tools were
2061            // actually offered to the agent during at least one iteration but no
2062            // email action tool was executed. The completion text is NOT consulted —
2063            // this prevents the model from bypassing the gate by rephrasing, and
2064            // prevents false positives on legitimate text containing email keywords.
2065            // Skipping when no email tools were ever offered avoids clobbering
2066            // legitimate output with a delivery-failure message the agent could not
2067            // have avoided (e.g. prompts that mention gmail tool names as context).
2068            if email_delivery_requested && email_tools_ever_offered && !email_action_executed {
2069                let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
2070                    .to_string();
2071                if let Some(note) = latest_email_action_note.as_ref() {
2072                    fallback.push_str("\n\nLast email tool status: ");
2073                    fallback.push_str(note);
2074                }
2075                fallback.push_str(
2076                    "\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
2077                );
2078                completion = fallback;
2079            }
2080            completion = strip_model_control_markers(&completion);
2081            truncate_text(&completion, 16_000)
2082        };
2083        emit_event(
2084            Level::INFO,
2085            ProcessKind::Engine,
2086            ObservabilityEvent {
2087                event: "provider.call.finish",
2088                component: "engine.loop",
2089                correlation_id: correlation_ref,
2090                session_id: Some(&session_id),
2091                run_id: None,
2092                message_id: Some(&user_message_id),
2093                provider_id: Some(provider_id.as_str()),
2094                model_id,
2095                status: Some("ok"),
2096                error_code: None,
2097                detail: Some("provider stream complete"),
2098            },
2099        );
2100        if active_agent.name.eq_ignore_ascii_case("plan") {
2101            emit_plan_todo_fallback(
2102                self.storage.clone(),
2103                &self.event_bus,
2104                &session_id,
2105                &user_message_id,
2106                &completion,
2107            )
2108            .await;
2109            let todos_after_fallback = self.storage.get_todos(&session_id).await;
2110            if todos_after_fallback.is_empty() && !question_tool_used {
2111                emit_plan_question_fallback(
2112                    self.storage.clone(),
2113                    &self.event_bus,
2114                    &session_id,
2115                    &user_message_id,
2116                    &completion,
2117                )
2118                .await;
2119            }
2120        }
2121        if cancel.is_cancelled() {
2122            self.event_bus.publish(EngineEvent::new(
2123                "session.status",
2124                json!({"sessionID": session_id, "status":"cancelled"}),
2125            ));
2126            self.cancellations.remove(&session_id).await;
2127            return Ok(());
2128        }
2129        let assistant = Message::new(
2130            MessageRole::Assistant,
2131            vec![MessagePart::Text {
2132                text: completion.clone(),
2133            }],
2134        );
2135        let assistant_message_id = assistant.id.clone();
2136        self.storage.append_message(&session_id, assistant).await?;
2137        let final_part = WireMessagePart::text(
2138            &session_id,
2139            &assistant_message_id,
2140            truncate_text(&completion, 16_000),
2141        );
2142        self.event_bus.publish(EngineEvent::new(
2143            "message.part.updated",
2144            json!({"part": final_part}),
2145        ));
2146        self.event_bus.publish(EngineEvent::new(
2147            "session.updated",
2148            json!({"sessionID": session_id, "status":"idle"}),
2149        ));
2150        self.event_bus.publish(EngineEvent::new(
2151            "session.status",
2152            json!({"sessionID": session_id, "status":"idle"}),
2153        ));
2154        self.cancellations.remove(&session_id).await;
2155        Ok(())
2156    }
2157}