Skip to main content

tandem_core/engine_loop/
prompt_execution.rs

1use super::*;
2
3impl EngineLoop {
4    pub async fn run_prompt_async_with_context(
5        &self,
6        session_id: String,
7        req: SendMessageRequest,
8        correlation_id: Option<String>,
9    ) -> anyhow::Result<()> {
10        let session_model = self
11            .storage
12            .get_session(&session_id)
13            .await
14            .and_then(|s| s.model);
15        let (provider_id, model_id_value) =
16            resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
17                anyhow::anyhow!(
18                "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
19            )
20            })?;
21        let correlation_ref = correlation_id.as_deref();
22        let model_id = Some(model_id_value.as_str());
23        let cancel = self.cancellations.create(&session_id).await;
24        emit_event(
25            Level::INFO,
26            ProcessKind::Engine,
27            ObservabilityEvent {
28                event: "provider.call.start",
29                component: "engine.loop",
30                correlation_id: correlation_ref,
31                session_id: Some(&session_id),
32                run_id: None,
33                message_id: None,
34                provider_id: Some(provider_id.as_str()),
35                model_id,
36                status: Some("start"),
37                error_code: None,
38                detail: Some("run_prompt_async dispatch"),
39            },
40        );
41        self.event_bus.publish(EngineEvent::new(
42            "session.status",
43            json!({"sessionID": session_id, "status":"running"}),
44        ));
45        let request_parts = req.parts.clone();
46        let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
47        let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
48        let requested_write_required = req.write_required.unwrap_or(false);
49        let requested_prewrite_requirements = req.prewrite_requirements.clone().unwrap_or_default();
50        let request_tool_allowlist = req
51            .tool_allowlist
52            .clone()
53            .unwrap_or_default()
54            .into_iter()
55            .map(|tool| normalize_tool_name(&tool))
56            .filter(|tool| !tool.trim().is_empty())
57            .collect::<HashSet<_>>();
58        // Propagate per-request tool allowlist to session-level enforcement so
59        // that execution-time checks (and mcp_list scoping) also respect it.
60        if !request_tool_allowlist.is_empty() {
61            self.set_session_allowed_tools(
62                &session_id,
63                request_tool_allowlist.iter().cloned().collect(),
64            )
65            .await;
66        }
67        let text = req
68            .parts
69            .iter()
70            .map(|p| match p {
71                MessagePartInput::Text { text } => text.clone(),
72                MessagePartInput::File {
73                    mime,
74                    filename,
75                    url,
76                } => format!(
77                    "[file mime={} name={} url={}]",
78                    mime,
79                    filename.clone().unwrap_or_else(|| "unknown".to_string()),
80                    url
81                ),
82            })
83            .collect::<Vec<_>>()
84            .join("\n");
85        let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
86        self.auto_rename_session_from_user_text(&session_id, &text)
87            .await;
88        let active_agent = self.agents.get(req.agent.as_deref()).await;
89        let mut user_message_id = self
90            .find_recent_matching_user_message_id(&session_id, &text)
91            .await;
92        if user_message_id.is_none() {
93            let user_message = Message::new(
94                MessageRole::User,
95                vec![MessagePart::Text { text: text.clone() }],
96            );
97            let created_message_id = user_message.id.clone();
98            self.storage
99                .append_message(&session_id, user_message)
100                .await?;
101
102            let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
103            self.event_bus.publish(EngineEvent::new(
104                "message.part.updated",
105                json!({
106                    "part": user_part,
107                    "delta": text,
108                    "agent": active_agent.name
109                }),
110            ));
111            user_message_id = Some(created_message_id);
112        }
113        let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
114
115        if cancel.is_cancelled() {
116            self.event_bus.publish(EngineEvent::new(
117                "session.status",
118                json!({"sessionID": session_id, "status":"cancelled"}),
119            ));
120            self.cancellations.remove(&session_id).await;
121            return Ok(());
122        }
123
124        let mut question_tool_used = false;
125        let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
126            if normalize_tool_name(&tool) == "question" {
127                question_tool_used = true;
128            }
129            if !agent_can_use_tool(&active_agent, &tool) {
130                format!(
131                    "Tool `{tool}` is not enabled for agent `{}`.",
132                    active_agent.name
133                )
134            } else {
135                self.execute_tool_with_permission(
136                    &session_id,
137                    &user_message_id,
138                    tool.clone(),
139                    args,
140                    None,
141                    active_agent.skills.as_deref(),
142                    &text,
143                    requested_write_required,
144                    None,
145                    cancel.clone(),
146                )
147                .await?
148                .unwrap_or_default()
149            }
150        } else {
151            let mut completion = String::new();
152            let mut max_iterations = max_tool_iterations();
153            let mut followup_context: Option<String> = None;
154            let mut last_tool_outputs: Vec<String> = Vec::new();
155            let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
156            let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
157            let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
158            let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
159            let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
160            let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
161            let mut websearch_query_blocked = false;
162            let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
163            let mut pack_builder_executed = false;
164            let mut auto_workspace_probe_attempted = false;
165            let mut productive_tool_calls_total = 0usize;
166            let mut productive_write_tool_calls_total = 0usize;
167            let mut productive_workspace_inspection_total = 0usize;
168            let mut productive_web_research_total = 0usize;
169            let mut productive_concrete_read_total = 0usize;
170            let mut successful_web_research_total = 0usize;
171            let mut required_tool_retry_count = 0usize;
172            let mut required_write_retry_count = 0usize;
173            let mut unmet_prewrite_repair_retry_count = 0usize;
174            let mut empty_completion_retry_count = 0usize;
175            let mut prewrite_gate_waived = false;
176            let mut invalid_tool_args_retry_count = 0usize;
177            let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
178            let mut required_tool_unsatisfied_emitted = false;
179            let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
180            let email_delivery_requested = requires_email_delivery_prompt(&text);
181            let web_research_requested = requires_web_research_prompt(&text);
182            let code_workflow_requested = infer_code_workflow_from_text(&text);
183            let mut email_action_executed = false;
184            let mut latest_email_action_note: Option<String> = None;
185            let mut email_tools_ever_offered = false;
186            let intent = classify_intent(&text);
187            let router_enabled = tool_router_enabled();
188            let retrieval_enabled = semantic_tool_retrieval_enabled();
189            let retrieval_k = semantic_tool_retrieval_k();
190            let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
191                self.tools.mcp_server_names().await
192            } else {
193                Vec::new()
194            };
195            let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
196            let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
197                && runtime_attachments.is_empty()
198                && is_short_simple_prompt(&text)
199                && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
200
201            while max_iterations > 0 && !cancel.is_cancelled() {
202                let iteration = 26usize.saturating_sub(max_iterations);
203                max_iterations -= 1;
204                let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
205                    ChatHistoryProfile::Full
206                } else if matches!(requested_context_mode, ContextMode::Compact)
207                    || context_is_auto_compact
208                {
209                    ChatHistoryProfile::Compact
210                } else {
211                    ChatHistoryProfile::Standard
212                };
213                let mut messages =
214                    load_chat_history(self.storage.clone(), &session_id, context_profile).await;
215                if iteration == 1 && !runtime_attachments.is_empty() {
216                    attach_to_last_user_message(&mut messages, &runtime_attachments);
217                }
218                let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
219                self.event_bus.publish(EngineEvent::new(
220                    "context.profile.selected",
221                    json!({
222                        "sessionID": session_id,
223                        "messageID": user_message_id,
224                        "iteration": iteration,
225                        "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
226                        "historyMessageCount": messages.len(),
227                        "historyCharCount": history_char_count,
228                        "memoryInjected": false
229                    }),
230                ));
231                let mut system_parts = vec![tandem_runtime_system_prompt(
232                    &self.host_runtime_context,
233                    &mcp_server_names,
234                )];
235                if let Some(system) = active_agent.system_prompt.as_ref() {
236                    system_parts.push(system.clone());
237                }
238                messages.insert(
239                    0,
240                    ChatMessage {
241                        role: "system".to_string(),
242                        content: system_parts.join("\n\n"),
243                        attachments: Vec::new(),
244                    },
245                );
246                if let Some(extra) = followup_context.take() {
247                    messages.push(ChatMessage {
248                        role: "user".to_string(),
249                        content: extra,
250                        attachments: Vec::new(),
251                    });
252                }
253                if let Some(hook) = self.prompt_context_hook.read().await.clone() {
254                    let ctx = PromptContextHookContext {
255                        session_id: session_id.clone(),
256                        message_id: user_message_id.clone(),
257                        provider_id: provider_id.clone(),
258                        model_id: model_id_value.clone(),
259                        iteration,
260                    };
261                    let hook_timeout =
262                        Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
263                    match tokio::time::timeout(
264                        hook_timeout,
265                        hook.augment_provider_messages(ctx, messages.clone()),
266                    )
267                    .await
268                    {
269                        Ok(Ok(augmented)) => {
270                            messages = augmented;
271                        }
272                        Ok(Err(err)) => {
273                            self.event_bus.publish(EngineEvent::new(
274                                "memory.context.error",
275                                json!({
276                                    "sessionID": session_id,
277                                    "messageID": user_message_id,
278                                    "iteration": iteration,
279                                    "error": truncate_text(&err.to_string(), 500),
280                                }),
281                            ));
282                        }
283                        Err(_) => {
284                            self.event_bus.publish(EngineEvent::new(
285                                "memory.context.error",
286                                json!({
287                                    "sessionID": session_id,
288                                    "messageID": user_message_id,
289                                    "iteration": iteration,
290                                    "error": format!(
291                                        "prompt context hook timeout after {} ms",
292                                        hook_timeout.as_millis()
293                                    ),
294                                }),
295                            ));
296                        }
297                    }
298                }
299                let all_tools = self.tools.list().await;
300                let mut retrieval_fallback_reason: Option<&'static str> = None;
301                let mut candidate_tools = if retrieval_enabled {
302                    self.tools.retrieve(&text, retrieval_k).await
303                } else {
304                    all_tools.clone()
305                };
306                if retrieval_enabled {
307                    if candidate_tools.is_empty() && !all_tools.is_empty() {
308                        candidate_tools = all_tools.clone();
309                        retrieval_fallback_reason = Some("retrieval_empty_result");
310                    } else if web_research_requested
311                        && has_web_research_tools(&all_tools)
312                        && !has_web_research_tools(&candidate_tools)
313                        && required_write_retry_count == 0
314                    {
315                        candidate_tools = all_tools.clone();
316                        retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
317                    } else if email_delivery_requested
318                        && has_email_action_tools(&all_tools)
319                        && !has_email_action_tools(&candidate_tools)
320                    {
321                        candidate_tools = all_tools.clone();
322                        retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
323                    }
324                }
325                let mut tool_schemas = if !router_enabled {
326                    candidate_tools
327                } else {
328                    match requested_tool_mode {
329                        ToolMode::None => Vec::new(),
330                        ToolMode::Required => select_tool_subset(
331                            candidate_tools,
332                            intent,
333                            &request_tool_allowlist,
334                            iteration > 1,
335                        ),
336                        ToolMode::Auto => {
337                            if !auto_tools_escalated {
338                                Vec::new()
339                            } else {
340                                select_tool_subset(
341                                    candidate_tools,
342                                    intent,
343                                    &request_tool_allowlist,
344                                    iteration > 1,
345                                )
346                            }
347                        }
348                    }
349                };
350                let mut policy_patterns =
351                    request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
352                if let Some(agent_tools) = active_agent.tools.as_ref() {
353                    policy_patterns
354                        .extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
355                }
356                let session_allowed_tools = self
357                    .session_allowed_tools
358                    .read()
359                    .await
360                    .get(&session_id)
361                    .cloned()
362                    .unwrap_or_default();
363                policy_patterns.extend(session_allowed_tools.iter().cloned());
364                if !policy_patterns.is_empty() {
365                    let mut included = tool_schemas
366                        .iter()
367                        .map(|schema| normalize_tool_name(&schema.name))
368                        .collect::<HashSet<_>>();
369                    for schema in &all_tools {
370                        let normalized = normalize_tool_name(&schema.name);
371                        if policy_patterns
372                            .iter()
373                            .any(|pattern| tool_name_matches_policy(pattern, &normalized))
374                            && included.insert(normalized)
375                        {
376                            tool_schemas.push(schema.clone());
377                        }
378                    }
379                }
380                if !request_tool_allowlist.is_empty() {
381                    tool_schemas.retain(|schema| {
382                        let tool = normalize_tool_name(&schema.name);
383                        request_tool_allowlist
384                            .iter()
385                            .any(|pattern| tool_name_matches_policy(pattern, &tool))
386                    });
387                }
388                let prewrite_gate = evaluate_prewrite_gate(
389                    requested_write_required,
390                    &requested_prewrite_requirements,
391                    PrewriteProgress {
392                        productive_write_tool_calls_total,
393                        productive_workspace_inspection_total,
394                        productive_concrete_read_total,
395                        productive_web_research_total,
396                        successful_web_research_total,
397                        required_write_retry_count,
398                        unmet_prewrite_repair_retry_count,
399                        prewrite_gate_waived,
400                    },
401                );
402                let _prewrite_satisfied = prewrite_gate.prewrite_satisfied;
403                let prewrite_gate_write = prewrite_gate.gate_write;
404                let force_write_only_retry = prewrite_gate.force_write_only_retry;
405                let allow_repair_tools = prewrite_gate.allow_repair_tools;
406                if prewrite_gate_write {
407                    tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
408                }
409                if requested_prewrite_requirements.repair_on_unmet_requirements
410                    && productive_write_tool_calls_total >= 3
411                {
412                    tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
413                }
414                if allow_repair_tools {
415                    let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
416                    let repair_tools = tool_schemas
417                        .iter()
418                        .filter(|schema| {
419                            tool_matches_unmet_prewrite_repair_requirement(
420                                &schema.name,
421                                &unmet_prewrite_codes,
422                            )
423                        })
424                        .cloned()
425                        .collect::<Vec<_>>();
426                    if !repair_tools.is_empty() {
427                        tool_schemas = repair_tools;
428                    }
429                }
430                if force_write_only_retry && !allow_repair_tools {
431                    tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
432                }
433                if active_agent.tools.is_some() {
434                    tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
435                }
436                tool_schemas.retain(|schema| {
437                    let normalized = normalize_tool_name(&schema.name);
438                    if let Some(server) = mcp_server_from_tool_name(&normalized) {
439                        !blocked_mcp_servers.contains(server)
440                    } else {
441                        true
442                    }
443                });
444                if let Some(allowed_tools) = self
445                    .session_allowed_tools
446                    .read()
447                    .await
448                    .get(&session_id)
449                    .cloned()
450                {
451                    if !allowed_tools.is_empty() {
452                        tool_schemas.retain(|schema| {
453                            let normalized = normalize_tool_name(&schema.name);
454                            any_policy_matches(&allowed_tools, &normalized)
455                        });
456                    }
457                }
458                if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
459                    let detail = validation_err.to_string();
460                    emit_event(
461                        Level::ERROR,
462                        ProcessKind::Engine,
463                        ObservabilityEvent {
464                            event: "provider.call.error",
465                            component: "engine.loop",
466                            correlation_id: correlation_ref,
467                            session_id: Some(&session_id),
468                            run_id: None,
469                            message_id: Some(&user_message_id),
470                            provider_id: Some(provider_id.as_str()),
471                            model_id,
472                            status: Some("failed"),
473                            error_code: Some("TOOL_SCHEMA_INVALID"),
474                            detail: Some(&detail),
475                        },
476                    );
477                    anyhow::bail!("{detail}");
478                }
479                let routing_decision = ToolRoutingDecision {
480                    pass: if auto_tools_escalated { 2 } else { 1 },
481                    mode: match requested_tool_mode {
482                        ToolMode::Auto => default_mode_name(),
483                        ToolMode::None => "none",
484                        ToolMode::Required => "required",
485                    },
486                    intent,
487                    selected_count: tool_schemas.len(),
488                    total_available_count: all_tools.len(),
489                    mcp_included: tool_schemas
490                        .iter()
491                        .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
492                };
493                self.event_bus.publish(EngineEvent::new(
494                    "tool.routing.decision",
495                    json!({
496                        "sessionID": session_id,
497                        "messageID": user_message_id,
498                        "iteration": iteration,
499                        "pass": routing_decision.pass,
500                        "mode": routing_decision.mode,
501                        "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
502                        "selectedToolCount": routing_decision.selected_count,
503                        "totalAvailableTools": routing_decision.total_available_count,
504                        "mcpIncluded": routing_decision.mcp_included,
505                        "retrievalEnabled": retrieval_enabled,
506                        "retrievalK": retrieval_k,
507                        "fallbackToFullTools": retrieval_fallback_reason.is_some(),
508                        "fallbackReason": retrieval_fallback_reason
509                    }),
510                ));
511                let allowed_tool_names = tool_schemas
512                    .iter()
513                    .map(|schema| normalize_tool_name(&schema.name))
514                    .collect::<HashSet<_>>();
515                if !email_tools_ever_offered && has_email_action_tools(&tool_schemas) {
516                    email_tools_ever_offered = true;
517                }
518                let offered_tool_preview = tool_schemas
519                    .iter()
520                    .take(8)
521                    .map(|schema| normalize_tool_name(&schema.name))
522                    .collect::<Vec<_>>()
523                    .join(", ");
524                self.event_bus.publish(EngineEvent::new(
525                    "provider.call.iteration.start",
526                    json!({
527                        "sessionID": session_id,
528                        "messageID": user_message_id,
529                        "iteration": iteration,
530                        "selectedToolCount": allowed_tool_names.len(),
531                    }),
532                ));
533                let estimated_prompt_chars: usize = messages.iter().map(|m| m.content.len()).sum();
534                let provider_connect_timeout =
535                    Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
536                let stream_result = tokio::time::timeout(
537                    provider_connect_timeout,
538                    self.providers.stream_for_provider(
539                        Some(provider_id.as_str()),
540                        Some(model_id_value.as_str()),
541                        messages,
542                        requested_tool_mode.clone(),
543                        Some(tool_schemas),
544                        cancel.clone(),
545                    ),
546                )
547                .await
548                .map_err(|_| {
549                    anyhow::anyhow!(
550                        "provider stream connect timeout after {} ms",
551                        provider_connect_timeout.as_millis()
552                    )
553                })
554                .and_then(|result| result);
555                let stream = match stream_result {
556                    Ok(stream) => stream,
557                    Err(err) => {
558                        let error_text = err.to_string();
559                        let error_code = provider_error_code(&error_text);
560                        let detail = truncate_text(&error_text, 500);
561                        emit_event(
562                            Level::ERROR,
563                            ProcessKind::Engine,
564                            ObservabilityEvent {
565                                event: "provider.call.error",
566                                component: "engine.loop",
567                                correlation_id: correlation_ref,
568                                session_id: Some(&session_id),
569                                run_id: None,
570                                message_id: Some(&user_message_id),
571                                provider_id: Some(provider_id.as_str()),
572                                model_id,
573                                status: Some("failed"),
574                                error_code: Some(error_code),
575                                detail: Some(&detail),
576                            },
577                        );
578                        self.event_bus.publish(EngineEvent::new(
579                            "provider.call.iteration.error",
580                            json!({
581                                "sessionID": session_id,
582                                "messageID": user_message_id,
583                                "iteration": iteration,
584                                "error": detail,
585                            }),
586                        ));
587                        return Err(err);
588                    }
589                };
590                tokio::pin!(stream);
591                completion.clear();
592                let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
593                let mut provider_usage: Option<TokenUsage> = None;
594                let mut accepted_tool_calls_in_cycle = 0usize;
595                let provider_idle_timeout =
596                    Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
597                loop {
598                    let next_chunk_result =
599                        tokio::time::timeout(provider_idle_timeout, stream.next())
600                            .await
601                            .map_err(|_| {
602                                anyhow::anyhow!(
603                                    "provider stream idle timeout after {} ms",
604                                    provider_idle_timeout.as_millis()
605                                )
606                            });
607                    let next_chunk = match next_chunk_result {
608                        Ok(next_chunk) => next_chunk,
609                        Err(err) => {
610                            self.event_bus.publish(EngineEvent::new(
611                                "provider.call.iteration.error",
612                                json!({
613                                    "sessionID": session_id,
614                                    "messageID": user_message_id,
615                                    "iteration": iteration,
616                                    "error": truncate_text(&err.to_string(), 500),
617                                }),
618                            ));
619                            return Err(err);
620                        }
621                    };
622                    let Some(chunk) = next_chunk else {
623                        break;
624                    };
625                    let chunk = match chunk {
626                        Ok(chunk) => chunk,
627                        Err(err) => {
628                            let error_text = err.to_string();
629                            let error_code = provider_error_code(&error_text);
630                            let detail = truncate_text(&error_text, 500);
631                            emit_event(
632                                Level::ERROR,
633                                ProcessKind::Engine,
634                                ObservabilityEvent {
635                                    event: "provider.call.error",
636                                    component: "engine.loop",
637                                    correlation_id: correlation_ref,
638                                    session_id: Some(&session_id),
639                                    run_id: None,
640                                    message_id: Some(&user_message_id),
641                                    provider_id: Some(provider_id.as_str()),
642                                    model_id,
643                                    status: Some("failed"),
644                                    error_code: Some(error_code),
645                                    detail: Some(&detail),
646                                },
647                            );
648                            self.event_bus.publish(EngineEvent::new(
649                                "provider.call.iteration.error",
650                                json!({
651                                    "sessionID": session_id,
652                                    "messageID": user_message_id,
653                                    "iteration": iteration,
654                                    "error": detail,
655                                }),
656                            ));
657                            return Err(anyhow::anyhow!(
658                                "provider stream chunk error: {error_text}"
659                            ));
660                        }
661                    };
662                    match chunk {
663                        StreamChunk::TextDelta(delta) => {
664                            let delta = strip_model_control_markers(&delta);
665                            if delta.trim().is_empty() {
666                                continue;
667                            }
668                            if completion.is_empty() {
669                                emit_event(
670                                    Level::INFO,
671                                    ProcessKind::Engine,
672                                    ObservabilityEvent {
673                                        event: "provider.call.first_byte",
674                                        component: "engine.loop",
675                                        correlation_id: correlation_ref,
676                                        session_id: Some(&session_id),
677                                        run_id: None,
678                                        message_id: Some(&user_message_id),
679                                        provider_id: Some(provider_id.as_str()),
680                                        model_id,
681                                        status: Some("streaming"),
682                                        error_code: None,
683                                        detail: Some("first text delta"),
684                                    },
685                                );
686                            }
687                            completion.push_str(&delta);
688                            let delta = truncate_text(&delta, 4_000);
689                            let delta_part =
690                                WireMessagePart::text(&session_id, &user_message_id, delta.clone());
691                            self.event_bus.publish(EngineEvent::new(
692                                "message.part.updated",
693                                json!({"part": delta_part, "delta": delta}),
694                            ));
695                        }
696                        StreamChunk::ReasoningDelta(_reasoning) => {}
697                        StreamChunk::Done {
698                            finish_reason: _,
699                            usage,
700                        } => {
701                            if usage.is_some() {
702                                provider_usage = usage;
703                            }
704                            break;
705                        }
706                        StreamChunk::ToolCallStart { id, name } => {
707                            let entry = streamed_tool_calls.entry(id).or_default();
708                            if entry.name.is_empty() {
709                                entry.name = name;
710                            }
711                        }
712                        StreamChunk::ToolCallDelta { id, args_delta } => {
713                            let entry = streamed_tool_calls.entry(id.clone()).or_default();
714                            entry.args.push_str(&args_delta);
715                            let tool_name = if entry.name.trim().is_empty() {
716                                "tool".to_string()
717                            } else {
718                                normalize_tool_name(&entry.name)
719                            };
720                            let parsed_preview = if entry.name.trim().is_empty() {
721                                Value::String(truncate_text(&entry.args, 1_000))
722                            } else {
723                                parse_streamed_tool_args(&tool_name, &entry.args)
724                            };
725                            let mut tool_part = WireMessagePart::tool_invocation(
726                                &session_id,
727                                &user_message_id,
728                                tool_name.clone(),
729                                parsed_preview.clone(),
730                            );
731                            tool_part.id = Some(id.clone());
732                            if tool_name == "write" {
733                                tracing::info!(
734                                    session_id = %session_id,
735                                    message_id = %user_message_id,
736                                    tool_call_id = %id,
737                                    args_delta_len = args_delta.len(),
738                                    accumulated_args_len = entry.args.len(),
739                                    parsed_preview_empty = parsed_preview.is_null()
740                                        || parsed_preview.as_object().is_some_and(|value| value.is_empty())
741                                        || parsed_preview
742                                            .as_str()
743                                            .map(|value| value.trim().is_empty())
744                                            .unwrap_or(false),
745                                    "streamed write tool args delta received"
746                                );
747                            }
748                            self.event_bus.publish(EngineEvent::new(
749                                "message.part.updated",
750                                json!({
751                                    "part": tool_part,
752                                    "toolCallDelta": {
753                                        "id": id,
754                                        "tool": tool_name,
755                                        "argsDelta": truncate_text(&args_delta, 1_000),
756                                        "rawArgsPreview": truncate_text(&entry.args, 2_000),
757                                        "parsedArgsPreview": parsed_preview
758                                    }
759                                }),
760                            ));
761                        }
762                        StreamChunk::ToolCallEnd { id: _ } => {}
763                    }
764                    if cancel.is_cancelled() {
765                        break;
766                    }
767                }
768
769                let streamed_tool_call_count = streamed_tool_calls.len();
770                let streamed_tool_call_parse_failed = streamed_tool_calls
771                    .values()
772                    .any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
773                let mut tool_calls = streamed_tool_calls
774                    .into_iter()
775                    .filter_map(|(call_id, call)| {
776                        if call.name.trim().is_empty() {
777                            return None;
778                        }
779                        let tool_name = normalize_tool_name(&call.name);
780                        let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
781                        Some(ParsedToolCall {
782                            tool: tool_name,
783                            args: parsed_args,
784                            call_id: Some(call_id),
785                        })
786                    })
787                    .collect::<Vec<_>>();
788                if tool_calls.is_empty() {
789                    tool_calls = parse_tool_invocations_from_response(&completion)
790                        .into_iter()
791                        .map(|(tool, args)| ParsedToolCall {
792                            tool,
793                            args,
794                            call_id: None,
795                        })
796                        .collect::<Vec<_>>();
797                }
798                let provider_tool_parse_failed = tool_calls.is_empty()
799                    && (streamed_tool_call_parse_failed
800                        || (streamed_tool_call_count > 0
801                            && looks_like_unparsed_tool_payload(&completion))
802                        || looks_like_unparsed_tool_payload(&completion));
803                if provider_tool_parse_failed {
804                    latest_required_tool_failure_kind =
805                        RequiredToolFailureKind::ToolCallParseFailed;
806                } else if tool_calls.is_empty() {
807                    latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
808                }
809                if router_enabled
810                    && matches!(requested_tool_mode, ToolMode::Auto)
811                    && !auto_tools_escalated
812                    && iteration == 1
813                    && should_escalate_auto_tools(intent, &text, &completion)
814                {
815                    auto_tools_escalated = true;
816                    followup_context = Some(
817                        "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
818                            .to_string(),
819                    );
820                    self.event_bus.publish(EngineEvent::new(
821                        "provider.call.iteration.finish",
822                        json!({
823                            "sessionID": session_id,
824                            "messageID": user_message_id,
825                            "iteration": iteration,
826                            "finishReason": "auto_escalate",
827                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
828                            "rejectedToolCalls": 0,
829                        }),
830                    ));
831                    continue;
832                }
833                if tool_calls.is_empty()
834                    && !auto_workspace_probe_attempted
835                    && should_force_workspace_probe(&text, &completion)
836                    && allowed_tool_names.contains("glob")
837                {
838                    auto_workspace_probe_attempted = true;
839                    tool_calls = vec![ParsedToolCall {
840                        tool: "glob".to_string(),
841                        args: json!({ "pattern": "*" }),
842                        call_id: None,
843                    }];
844                }
845                if !tool_calls.is_empty() {
846                    let saw_tool_call_candidate = true;
847                    let mut outputs = Vec::new();
848                    let mut executed_productive_tool = false;
849                    let mut write_tool_attempted_in_cycle = false;
850                    let mut auth_required_hit_in_cycle = false;
851                    let mut guard_budget_hit_in_cycle = false;
852                    let mut duplicate_signature_hit_in_cycle = false;
853                    let mut rejected_tool_call_in_cycle = false;
854                    for ParsedToolCall {
855                        tool,
856                        args,
857                        call_id,
858                    } in tool_calls
859                    {
860                        if !agent_can_use_tool(&active_agent, &tool) {
861                            rejected_tool_call_in_cycle = true;
862                            continue;
863                        }
864                        let tool_key = normalize_tool_name(&tool);
865                        if is_workspace_write_tool(&tool_key) {
866                            write_tool_attempted_in_cycle = true;
867                        }
868                        if !allowed_tool_names.contains(&tool_key) {
869                            rejected_tool_call_in_cycle = true;
870                            let note = if offered_tool_preview.is_empty() {
871                                format!(
872                                    "Tool `{}` call skipped: it is not available in this turn.",
873                                    tool_key
874                                )
875                            } else {
876                                format!(
877                                    "Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
878                                    tool_key, offered_tool_preview
879                                )
880                            };
881                            self.event_bus.publish(EngineEvent::new(
882                                "tool.call.rejected_unoffered",
883                                json!({
884                                    "sessionID": session_id,
885                                    "messageID": user_message_id,
886                                    "iteration": iteration,
887                                    "tool": tool_key,
888                                    "offeredToolCount": allowed_tool_names.len()
889                                }),
890                            ));
891                            if tool_name_looks_like_email_action(&tool_key) {
892                                latest_email_action_note = Some(note.clone());
893                            }
894                            outputs.push(note);
895                            continue;
896                        }
897                        if let Some(server) = mcp_server_from_tool_name(&tool_key) {
898                            if blocked_mcp_servers.contains(server) {
899                                rejected_tool_call_in_cycle = true;
900                                outputs.push(format!(
901                                    "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
902                                    tool_key, server
903                                ));
904                                continue;
905                            }
906                        }
907                        if tool_key == "question" {
908                            question_tool_used = true;
909                        }
910                        if tool_key == "pack_builder" && pack_builder_executed {
911                            rejected_tool_call_in_cycle = true;
912                            outputs.push(
913                                "Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
914                                    .to_string(),
915                            );
916                            continue;
917                        }
918                        if websearch_query_blocked && tool_key == "websearch" {
919                            rejected_tool_call_in_cycle = true;
920                            outputs.push(
921                                "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
922                                    .to_string(),
923                            );
924                            continue;
925                        }
926                        let mut effective_args = args.clone();
927                        if tool_key == "todo_write" {
928                            effective_args = normalize_todo_write_args(effective_args, &completion);
929                            if is_empty_todo_write_args(&effective_args) {
930                                rejected_tool_call_in_cycle = true;
931                                outputs.push(
932                                    "Tool `todo_write` call skipped: empty todo payload."
933                                        .to_string(),
934                                );
935                                continue;
936                            }
937                        }
938                        let signature = if tool_key == "batch" {
939                            batch_tool_signature(&args)
940                                .unwrap_or_else(|| tool_signature(&tool_key, &args))
941                        } else {
942                            tool_signature(&tool_key, &args)
943                        };
944                        if is_shell_tool_name(&tool_key)
945                            && shell_mismatch_signatures.contains(&signature)
946                        {
947                            rejected_tool_call_in_cycle = true;
948                            outputs.push(
949                                "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
950                                    .to_string(),
951                            );
952                            continue;
953                        }
954                        let mut signature_count = 1usize;
955                        if is_read_only_tool(&tool_key)
956                            || (tool_key == "batch" && is_read_only_batch_call(&args))
957                        {
958                            let count = readonly_signature_counts
959                                .entry(signature.clone())
960                                .and_modify(|v| *v = v.saturating_add(1))
961                                .or_insert(1);
962                            signature_count = *count;
963                            if tool_key == "websearch" {
964                                if let Some(limit) = websearch_duplicate_signature_limit {
965                                    if *count > limit {
966                                        rejected_tool_call_in_cycle = true;
967                                        self.event_bus.publish(EngineEvent::new(
968                                            "tool.loop_guard.triggered",
969                                            json!({
970                                                "sessionID": session_id,
971                                                "messageID": user_message_id,
972                                                "tool": tool_key,
973                                                "reason": "duplicate_signature_retry_exhausted",
974                                                "duplicateLimit": limit,
975                                                "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
976                                                "loop_guard_triggered": true
977                                            }),
978                                        ));
979                                        outputs.push(
980                                            "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
981                                                .to_string(),
982                                        );
983                                        continue;
984                                    }
985                                }
986                            }
987                            if tool_key != "websearch" && *count > 1 {
988                                rejected_tool_call_in_cycle = true;
989                                if let Some(cached) = readonly_tool_cache.get(&signature) {
990                                    outputs.push(cached.clone());
991                                } else {
992                                    outputs.push(format!(
993                                        "Tool `{}` call skipped: duplicate call signature detected.",
994                                        tool_key
995                                    ));
996                                }
997                                continue;
998                            }
999                        }
1000                        let is_read_only_signature = is_read_only_tool(&tool_key)
1001                            || (tool_key == "batch" && is_read_only_batch_call(&args));
1002                        if !is_read_only_signature {
1003                            let duplicate_limit = duplicate_signature_limit_for(&tool_key);
1004                            let seen = mutable_signature_counts
1005                                .entry(signature.clone())
1006                                .and_modify(|v| *v = v.saturating_add(1))
1007                                .or_insert(1);
1008                            if *seen > duplicate_limit {
1009                                rejected_tool_call_in_cycle = true;
1010                                self.event_bus.publish(EngineEvent::new(
1011                                    "tool.loop_guard.triggered",
1012                                    json!({
1013                                        "sessionID": session_id,
1014                                        "messageID": user_message_id,
1015                                        "tool": tool_key,
1016                                        "reason": "duplicate_signature_retry_exhausted",
1017                                        "signatureHash": stable_hash(&signature),
1018                                        "duplicateLimit": duplicate_limit,
1019                                        "loop_guard_triggered": true
1020                                    }),
1021                                ));
1022                                outputs.push(format!(
1023                                    "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
1024                                    tool_key, duplicate_limit
1025                                ));
1026                                duplicate_signature_hit_in_cycle = true;
1027                                continue;
1028                            }
1029                        }
1030                        let budget = tool_budget_for(&tool_key);
1031                        let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
1032                        if *entry >= budget {
1033                            rejected_tool_call_in_cycle = true;
1034                            outputs.push(format!(
1035                                "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
1036                                tool_key, budget
1037                            ));
1038                            guard_budget_hit_in_cycle = true;
1039                            continue;
1040                        }
1041                        let mut finalized_part = WireMessagePart::tool_invocation(
1042                            &session_id,
1043                            &user_message_id,
1044                            tool.clone(),
1045                            effective_args.clone(),
1046                        );
1047                        if let Some(call_id) = call_id.clone() {
1048                            finalized_part.id = Some(call_id);
1049                        }
1050                        finalized_part.state = Some("pending".to_string());
1051                        self.event_bus.publish(EngineEvent::new(
1052                            "message.part.updated",
1053                            json!({"part": finalized_part}),
1054                        ));
1055                        *entry += 1;
1056                        accepted_tool_calls_in_cycle =
1057                            accepted_tool_calls_in_cycle.saturating_add(1);
1058                        if let Some(output) = self
1059                            .execute_tool_with_permission(
1060                                &session_id,
1061                                &user_message_id,
1062                                tool,
1063                                effective_args,
1064                                call_id,
1065                                active_agent.skills.as_deref(),
1066                                &text,
1067                                requested_write_required,
1068                                Some(&completion),
1069                                cancel.clone(),
1070                            )
1071                            .await?
1072                        {
1073                            let productive = is_productive_tool_output(&tool_key, &output);
1074                            if output.contains("WEBSEARCH_QUERY_MISSING") {
1075                                websearch_query_blocked = true;
1076                            }
1077                            if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
1078                            {
1079                                shell_mismatch_signatures.insert(signature.clone());
1080                            }
1081                            if is_read_only_tool(&tool_key)
1082                                && tool_key != "websearch"
1083                                && signature_count == 1
1084                            {
1085                                readonly_tool_cache.insert(signature, output.clone());
1086                            }
1087                            if productive {
1088                                productive_tool_calls_total =
1089                                    productive_tool_calls_total.saturating_add(1);
1090                                if is_workspace_write_tool(&tool_key) {
1091                                    productive_write_tool_calls_total =
1092                                        productive_write_tool_calls_total.saturating_add(1);
1093                                }
1094                                if is_workspace_inspection_tool(&tool_key) {
1095                                    productive_workspace_inspection_total =
1096                                        productive_workspace_inspection_total.saturating_add(1);
1097                                }
1098                                if tool_key == "read" {
1099                                    productive_concrete_read_total =
1100                                        productive_concrete_read_total.saturating_add(1);
1101                                }
1102                                if is_web_research_tool(&tool_key) {
1103                                    productive_web_research_total =
1104                                        productive_web_research_total.saturating_add(1);
1105                                    if is_successful_web_research_output(&tool_key, &output) {
1106                                        successful_web_research_total =
1107                                            successful_web_research_total.saturating_add(1);
1108                                    }
1109                                }
1110                                executed_productive_tool = true;
1111                                if tool_key == "pack_builder" {
1112                                    pack_builder_executed = true;
1113                                }
1114                            }
1115                            if tool_name_looks_like_email_action(&tool_key) {
1116                                if productive {
1117                                    email_action_executed = true;
1118                                } else {
1119                                    latest_email_action_note =
1120                                        Some(truncate_text(&output, 280).replace('\n', " "));
1121                                }
1122                            }
1123                            if is_auth_required_tool_output(&output) {
1124                                if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1125                                    blocked_mcp_servers.insert(server.to_string());
1126                                }
1127                                auth_required_hit_in_cycle = true;
1128                            }
1129                            outputs.push(output);
1130                            if auth_required_hit_in_cycle {
1131                                break;
1132                            }
1133                            if guard_budget_hit_in_cycle {
1134                                break;
1135                            }
1136                        }
1137                    }
1138                    if !outputs.is_empty() {
1139                        last_tool_outputs = outputs.clone();
1140                        if matches!(requested_tool_mode, ToolMode::Required)
1141                            && productive_tool_calls_total == 0
1142                        {
1143                            latest_required_tool_failure_kind = classify_required_tool_failure(
1144                                &outputs,
1145                                saw_tool_call_candidate,
1146                                accepted_tool_calls_in_cycle,
1147                                provider_tool_parse_failed,
1148                                rejected_tool_call_in_cycle,
1149                            );
1150                            if requested_write_required
1151                                && write_tool_attempted_in_cycle
1152                                && productive_write_tool_calls_total == 0
1153                                && is_write_invalid_args_failure_kind(
1154                                    latest_required_tool_failure_kind,
1155                                )
1156                            {
1157                                if required_write_retry_count + 1 < strict_write_retry_max_attempts
1158                                {
1159                                    required_write_retry_count += 1;
1160                                    required_tool_retry_count += 1;
1161                                    followup_context = Some(build_write_required_retry_context(
1162                                        &offered_tool_preview,
1163                                        latest_required_tool_failure_kind,
1164                                        &text,
1165                                        &requested_prewrite_requirements,
1166                                        productive_workspace_inspection_total > 0,
1167                                        productive_concrete_read_total > 0,
1168                                        productive_web_research_total > 0,
1169                                        successful_web_research_total > 0,
1170                                    ));
1171                                    self.event_bus.publish(EngineEvent::new(
1172                                        "provider.call.iteration.finish",
1173                                        json!({
1174                                            "sessionID": session_id,
1175                                            "messageID": user_message_id,
1176                                            "iteration": iteration,
1177                                            "finishReason": "required_write_invalid_retry",
1178                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1179                                            "rejectedToolCalls": 0,
1180                                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1181                                        }),
1182                                    ));
1183                                    continue;
1184                                }
1185                            }
1186                            let progress_made_in_cycle = productive_workspace_inspection_total > 0
1187                                || productive_concrete_read_total > 0
1188                                || productive_web_research_total > 0
1189                                || successful_web_research_total > 0;
1190                            if should_retry_nonproductive_required_tool_cycle(
1191                                requested_write_required,
1192                                write_tool_attempted_in_cycle,
1193                                progress_made_in_cycle,
1194                                required_tool_retry_count,
1195                            ) {
1196                                required_tool_retry_count += 1;
1197                                followup_context =
1198                                    Some(build_required_tool_retry_context_for_task(
1199                                        &offered_tool_preview,
1200                                        latest_required_tool_failure_kind,
1201                                        &text,
1202                                    ));
1203                                self.event_bus.publish(EngineEvent::new(
1204                                    "provider.call.iteration.finish",
1205                                    json!({
1206                                        "sessionID": session_id,
1207                                        "messageID": user_message_id,
1208                                        "iteration": iteration,
1209                                        "finishReason": "required_tool_retry",
1210                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1211                                        "rejectedToolCalls": 0,
1212                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1213                                    }),
1214                                ));
1215                                continue;
1216                            }
1217                            completion = required_tool_mode_unsatisfied_completion(
1218                                latest_required_tool_failure_kind,
1219                            );
1220                            if !required_tool_unsatisfied_emitted {
1221                                required_tool_unsatisfied_emitted = true;
1222                                self.event_bus.publish(EngineEvent::new(
1223                                    "tool.mode.required.unsatisfied",
1224                                    json!({
1225                                        "sessionID": session_id,
1226                                        "messageID": user_message_id,
1227                                        "iteration": iteration,
1228                                        "selectedToolCount": allowed_tool_names.len(),
1229                                        "offeredToolsPreview": offered_tool_preview,
1230                                        "reason": latest_required_tool_failure_kind.code(),
1231                                    }),
1232                                ));
1233                            }
1234                            self.event_bus.publish(EngineEvent::new(
1235                                "provider.call.iteration.finish",
1236                                json!({
1237                                    "sessionID": session_id,
1238                                    "messageID": user_message_id,
1239                                    "iteration": iteration,
1240                                    "finishReason": "required_tool_unsatisfied",
1241                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1242                                    "rejectedToolCalls": 0,
1243                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1244                                }),
1245                            ));
1246                            break;
1247                        }
1248                        let prewrite_gate = evaluate_prewrite_gate(
1249                            requested_write_required,
1250                            &requested_prewrite_requirements,
1251                            PrewriteProgress {
1252                                productive_write_tool_calls_total,
1253                                productive_workspace_inspection_total,
1254                                productive_concrete_read_total,
1255                                productive_web_research_total,
1256                                successful_web_research_total,
1257                                required_write_retry_count,
1258                                unmet_prewrite_repair_retry_count,
1259                                prewrite_gate_waived,
1260                            },
1261                        );
1262                        let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1263                        let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1264                        if requested_write_required
1265                            && productive_tool_calls_total > 0
1266                            && productive_write_tool_calls_total == 0
1267                        {
1268                            if should_start_prewrite_repair_before_first_write(
1269                                requested_prewrite_requirements.repair_on_unmet_requirements,
1270                                productive_write_tool_calls_total,
1271                                prewrite_satisfied,
1272                                code_workflow_requested,
1273                            ) {
1274                                if unmet_prewrite_repair_retry_count
1275                                    < prewrite_repair_retry_max_attempts()
1276                                {
1277                                    unmet_prewrite_repair_retry_count += 1;
1278                                    let repair_attempt = unmet_prewrite_repair_retry_count;
1279                                    let repair_attempts_remaining =
1280                                        prewrite_repair_retry_max_attempts()
1281                                            .saturating_sub(repair_attempt);
1282                                    followup_context = Some(build_prewrite_repair_retry_context(
1283                                        &offered_tool_preview,
1284                                        latest_required_tool_failure_kind,
1285                                        &text,
1286                                        &requested_prewrite_requirements,
1287                                        productive_workspace_inspection_total > 0,
1288                                        productive_concrete_read_total > 0,
1289                                        productive_web_research_total > 0,
1290                                        successful_web_research_total > 0,
1291                                    ));
1292                                    self.event_bus.publish(EngineEvent::new(
1293                                        "provider.call.iteration.finish",
1294                                        json!({
1295                                            "sessionID": session_id,
1296                                            "messageID": user_message_id,
1297                                            "iteration": iteration,
1298                                            "finishReason": "prewrite_repair_retry",
1299                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1300                                            "rejectedToolCalls": 0,
1301                                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1302                                            "repair": prewrite_repair_event_payload(
1303                                                repair_attempt,
1304                                                repair_attempts_remaining,
1305                                                &unmet_prewrite_codes,
1306                                                false,
1307                                            ),
1308                                        }),
1309                                    ));
1310                                    continue;
1311                                }
1312                                if !prewrite_gate_waived {
1313                                    if prewrite_gate_strict_mode() {
1314                                        // Strict mode: refuse to waive; emit blocked event and
1315                                        // continue so the gate retains control.
1316                                        self.event_bus.publish(EngineEvent::new(
1317                                            "prewrite.gate.strict_mode.blocked",
1318                                            json!({
1319                                                "sessionID": session_id,
1320                                                "messageID": user_message_id,
1321                                                "iteration": iteration,
1322                                                "unmetCodes": unmet_prewrite_codes,
1323                                            }),
1324                                        ));
1325                                        continue;
1326                                    }
1327                                    prewrite_gate_waived = true;
1328                                    let repair_attempt = unmet_prewrite_repair_retry_count;
1329                                    let repair_attempts_remaining =
1330                                        prewrite_repair_retry_max_attempts()
1331                                            .saturating_sub(repair_attempt);
1332                                    followup_context = Some(build_prewrite_waived_write_context(
1333                                        &text,
1334                                        &unmet_prewrite_codes,
1335                                    ));
1336                                    self.event_bus.publish(EngineEvent::new(
1337                                        "prewrite.gate.waived.write_executed",
1338                                        json!({
1339                                            "sessionID": session_id,
1340                                            "messageID": user_message_id,
1341                                            "unmetCodes": unmet_prewrite_codes,
1342                                        }),
1343                                    ));
1344                                    self.event_bus.publish(EngineEvent::new(
1345                                        "provider.call.iteration.finish",
1346                                        json!({
1347                                            "sessionID": session_id,
1348                                            "messageID": user_message_id,
1349                                            "iteration": iteration,
1350                                            "finishReason": "prewrite_gate_waived",
1351                                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1352                                            "rejectedToolCalls": 0,
1353                                            "prewriteGateWaived": true,
1354                                            "repair": prewrite_repair_event_payload(
1355                                                repair_attempt,
1356                                                repair_attempts_remaining,
1357                                                &unmet_prewrite_codes,
1358                                                true,
1359                                            ),
1360                                        }),
1361                                    ));
1362                                    continue;
1363                                }
1364                            }
1365                            latest_required_tool_failure_kind =
1366                                RequiredToolFailureKind::WriteRequiredNotSatisfied;
1367                            if required_write_retry_count + 1 < strict_write_retry_max_attempts {
1368                                required_write_retry_count += 1;
1369                                followup_context = Some(build_write_required_retry_context(
1370                                    &offered_tool_preview,
1371                                    latest_required_tool_failure_kind,
1372                                    &text,
1373                                    &requested_prewrite_requirements,
1374                                    productive_workspace_inspection_total > 0,
1375                                    productive_concrete_read_total > 0,
1376                                    productive_web_research_total > 0,
1377                                    successful_web_research_total > 0,
1378                                ));
1379                                self.event_bus.publish(EngineEvent::new(
1380                                    "provider.call.iteration.finish",
1381                                    json!({
1382                                        "sessionID": session_id,
1383                                        "messageID": user_message_id,
1384                                        "iteration": iteration,
1385                                        "finishReason": "required_write_retry",
1386                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1387                                        "rejectedToolCalls": 0,
1388                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1389                                    }),
1390                                ));
1391                                continue;
1392                            }
1393                            completion = required_tool_mode_unsatisfied_completion(
1394                                latest_required_tool_failure_kind,
1395                            );
1396                            if !required_tool_unsatisfied_emitted {
1397                                required_tool_unsatisfied_emitted = true;
1398                                self.event_bus.publish(EngineEvent::new(
1399                                    "tool.mode.required.unsatisfied",
1400                                    json!({
1401                                        "sessionID": session_id,
1402                                        "messageID": user_message_id,
1403                                        "iteration": iteration,
1404                                        "selectedToolCount": allowed_tool_names.len(),
1405                                        "offeredToolsPreview": offered_tool_preview,
1406                                        "reason": latest_required_tool_failure_kind.code(),
1407                                    }),
1408                                ));
1409                            }
1410                            self.event_bus.publish(EngineEvent::new(
1411                                "provider.call.iteration.finish",
1412                                json!({
1413                                    "sessionID": session_id,
1414                                    "messageID": user_message_id,
1415                                    "iteration": iteration,
1416                                    "finishReason": "required_write_unsatisfied",
1417                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1418                                    "rejectedToolCalls": 0,
1419                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1420                                }),
1421                            ));
1422                            break;
1423                        }
1424                        if invalid_tool_args_retry_count < invalid_tool_args_retry_max_attempts() {
1425                            if let Some(retry_context) =
1426                                build_invalid_tool_args_retry_context_from_outputs(
1427                                    &outputs,
1428                                    invalid_tool_args_retry_count,
1429                                )
1430                            {
1431                                invalid_tool_args_retry_count += 1;
1432                                followup_context = Some(format!(
1433                                    "Previous tool call arguments were invalid. {}",
1434                                    retry_context
1435                                ));
1436                                self.event_bus.publish(EngineEvent::new(
1437                                    "provider.call.iteration.finish",
1438                                    json!({
1439                                        "sessionID": session_id,
1440                                        "messageID": user_message_id,
1441                                        "iteration": iteration,
1442                                        "finishReason": "invalid_tool_args_retry",
1443                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1444                                        "rejectedToolCalls": 0,
1445                                    }),
1446                                ));
1447                                continue;
1448                            }
1449                        }
1450                        let guard_budget_hit =
1451                            outputs.iter().any(|o| is_guard_budget_tool_output(o));
1452                        if executed_productive_tool {
1453                            let prewrite_gate = evaluate_prewrite_gate(
1454                                requested_write_required,
1455                                &requested_prewrite_requirements,
1456                                PrewriteProgress {
1457                                    productive_write_tool_calls_total,
1458                                    productive_workspace_inspection_total,
1459                                    productive_concrete_read_total,
1460                                    productive_web_research_total,
1461                                    successful_web_research_total,
1462                                    required_write_retry_count,
1463                                    unmet_prewrite_repair_retry_count,
1464                                    prewrite_gate_waived,
1465                                },
1466                            );
1467                            let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1468                            let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1469                            if requested_write_required
1470                                && productive_write_tool_calls_total > 0
1471                                && requested_prewrite_requirements.repair_on_unmet_requirements
1472                                && unmet_prewrite_repair_retry_count
1473                                    < prewrite_repair_retry_max_attempts()
1474                                && !prewrite_satisfied
1475                            {
1476                                unmet_prewrite_repair_retry_count += 1;
1477                                let repair_attempt = unmet_prewrite_repair_retry_count;
1478                                let repair_attempts_remaining =
1479                                    prewrite_repair_retry_max_attempts()
1480                                        .saturating_sub(repair_attempt);
1481                                followup_context = Some(build_prewrite_repair_retry_context(
1482                                    &offered_tool_preview,
1483                                    latest_required_tool_failure_kind,
1484                                    &text,
1485                                    &requested_prewrite_requirements,
1486                                    productive_workspace_inspection_total > 0,
1487                                    productive_concrete_read_total > 0,
1488                                    productive_web_research_total > 0,
1489                                    successful_web_research_total > 0,
1490                                ));
1491                                self.event_bus.publish(EngineEvent::new(
1492                                    "provider.call.iteration.finish",
1493                                    json!({
1494                                        "sessionID": session_id,
1495                                        "messageID": user_message_id,
1496                                        "iteration": iteration,
1497                                        "finishReason": "prewrite_repair_retry",
1498                                        "acceptedToolCalls": accepted_tool_calls_in_cycle,
1499                                        "rejectedToolCalls": 0,
1500                                        "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1501                                        "repair": prewrite_repair_event_payload(
1502                                            repair_attempt,
1503                                            repair_attempts_remaining,
1504                                            &unmet_prewrite_codes,
1505                                            false,
1506                                        ),
1507                                    }),
1508                                ));
1509                                continue;
1510                            }
1511                            followup_context = Some(format!(
1512                                "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1513                                summarize_tool_outputs(&outputs)
1514                            ));
1515                            self.event_bus.publish(EngineEvent::new(
1516                                "provider.call.iteration.finish",
1517                                json!({
1518                                    "sessionID": session_id,
1519                                    "messageID": user_message_id,
1520                                    "iteration": iteration,
1521                                    "finishReason": "tool_followup",
1522                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1523                                    "rejectedToolCalls": 0,
1524                                }),
1525                            ));
1526                            continue;
1527                        }
1528                        if guard_budget_hit {
1529                            completion = summarize_guard_budget_outputs(&outputs)
1530                                .unwrap_or_else(|| {
1531                                    "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1532                                });
1533                        } else if duplicate_signature_hit_in_cycle {
1534                            completion = summarize_duplicate_signature_outputs(&outputs)
1535                                .unwrap_or_else(|| {
1536                                    "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1537                                });
1538                        } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1539                            completion = summary;
1540                        } else {
1541                            completion.clear();
1542                        }
1543                        self.event_bus.publish(EngineEvent::new(
1544                            "provider.call.iteration.finish",
1545                            json!({
1546                                "sessionID": session_id,
1547                                "messageID": user_message_id,
1548                                "iteration": iteration,
1549                                "finishReason": "tool_summary",
1550                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1551                                "rejectedToolCalls": 0,
1552                            }),
1553                        ));
1554                        break;
1555                    } else if matches!(requested_tool_mode, ToolMode::Required) {
1556                        latest_required_tool_failure_kind = classify_required_tool_failure(
1557                            &outputs,
1558                            saw_tool_call_candidate,
1559                            accepted_tool_calls_in_cycle,
1560                            provider_tool_parse_failed,
1561                            rejected_tool_call_in_cycle,
1562                        );
1563                    }
1564                }
1565
1566                {
1567                    let (prompt_tokens, completion_tokens, total_tokens, usage_source) =
1568                        if let Some(usage) = provider_usage {
1569                            (
1570                                usage.prompt_tokens,
1571                                usage.completion_tokens,
1572                                usage.total_tokens,
1573                                "provider",
1574                            )
1575                        } else {
1576                            // Provider did not return usage (e.g. streaming without
1577                            // include_usage, or a backend that omits it). Estimate from
1578                            // accumulated char counts using the ~4 chars-per-token heuristic.
1579                            let est_prompt = (estimated_prompt_chars / 4) as u64;
1580                            let est_completion = (completion.len() / 4) as u64;
1581                            tracing::debug!(
1582                                session_id = %session_id,
1583                                provider_id = %provider_id,
1584                                "provider.usage missing from stream; using char-count estimate \
1585                                 (prompt_chars={estimated_prompt_chars} completion_chars={})",
1586                                completion.len()
1587                            );
1588                            (
1589                                est_prompt,
1590                                est_completion,
1591                                est_prompt.saturating_add(est_completion),
1592                                "estimated",
1593                            )
1594                        };
1595                    self.event_bus.publish(EngineEvent::new(
1596                        "provider.usage",
1597                        json!({
1598                            "sessionID": session_id,
1599                            "correlationID": correlation_ref,
1600                            "messageID": user_message_id,
1601                            "promptTokens": prompt_tokens,
1602                            "completionTokens": completion_tokens,
1603                            "totalTokens": total_tokens,
1604                            "usageSource": usage_source,
1605                        }),
1606                    ));
1607                }
1608
1609                if matches!(requested_tool_mode, ToolMode::Required)
1610                    && productive_tool_calls_total == 0
1611                {
1612                    if requested_write_required
1613                        && required_write_retry_count > 0
1614                        && productive_write_tool_calls_total == 0
1615                        && !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
1616                    {
1617                        latest_required_tool_failure_kind =
1618                            RequiredToolFailureKind::WriteRequiredNotSatisfied;
1619                    }
1620                    if requested_write_required
1621                        && required_write_retry_count + 1 < strict_write_retry_max_attempts
1622                    {
1623                        required_write_retry_count += 1;
1624                        followup_context = Some(build_write_required_retry_context(
1625                            &offered_tool_preview,
1626                            latest_required_tool_failure_kind,
1627                            &text,
1628                            &requested_prewrite_requirements,
1629                            productive_workspace_inspection_total > 0,
1630                            productive_concrete_read_total > 0,
1631                            productive_web_research_total > 0,
1632                            successful_web_research_total > 0,
1633                        ));
1634                        continue;
1635                    }
1636                    let progress_made_in_cycle = productive_workspace_inspection_total > 0
1637                        || productive_concrete_read_total > 0
1638                        || productive_web_research_total > 0
1639                        || successful_web_research_total > 0;
1640                    if should_retry_nonproductive_required_tool_cycle(
1641                        requested_write_required,
1642                        false,
1643                        progress_made_in_cycle,
1644                        required_tool_retry_count,
1645                    ) {
1646                        required_tool_retry_count += 1;
1647                        followup_context = Some(build_required_tool_retry_context_for_task(
1648                            &offered_tool_preview,
1649                            latest_required_tool_failure_kind,
1650                            &text,
1651                        ));
1652                        continue;
1653                    }
1654                    completion = required_tool_mode_unsatisfied_completion(
1655                        latest_required_tool_failure_kind,
1656                    );
1657                    if !required_tool_unsatisfied_emitted {
1658                        required_tool_unsatisfied_emitted = true;
1659                        self.event_bus.publish(EngineEvent::new(
1660                            "tool.mode.required.unsatisfied",
1661                            json!({
1662                                "sessionID": session_id,
1663                                "messageID": user_message_id,
1664                                "iteration": iteration,
1665                                "selectedToolCount": allowed_tool_names.len(),
1666                                "offeredToolsPreview": offered_tool_preview,
1667                                "reason": latest_required_tool_failure_kind.code(),
1668                            }),
1669                        ));
1670                    }
1671                    self.event_bus.publish(EngineEvent::new(
1672                        "provider.call.iteration.finish",
1673                        json!({
1674                            "sessionID": session_id,
1675                            "messageID": user_message_id,
1676                            "iteration": iteration,
1677                            "finishReason": "required_tool_unsatisfied",
1678                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1679                            "rejectedToolCalls": 0,
1680                            "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1681                        }),
1682                    ));
1683                } else {
1684                    if completion.trim().is_empty()
1685                        && !last_tool_outputs.is_empty()
1686                        && requested_write_required
1687                        && empty_completion_retry_count == 0
1688                    {
1689                        empty_completion_retry_count += 1;
1690                        followup_context = Some(build_empty_completion_retry_context(
1691                            &offered_tool_preview,
1692                            &text,
1693                            &requested_prewrite_requirements,
1694                            productive_workspace_inspection_total > 0,
1695                            productive_concrete_read_total > 0,
1696                            productive_web_research_total > 0,
1697                            successful_web_research_total > 0,
1698                        ));
1699                        self.event_bus.publish(EngineEvent::new(
1700                            "provider.call.iteration.finish",
1701                            json!({
1702                                "sessionID": session_id,
1703                                "messageID": user_message_id,
1704                                "iteration": iteration,
1705                                "finishReason": "empty_completion_retry",
1706                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1707                                "rejectedToolCalls": 0,
1708                            }),
1709                        ));
1710                        continue;
1711                    }
1712                    let prewrite_gate = evaluate_prewrite_gate(
1713                        requested_write_required,
1714                        &requested_prewrite_requirements,
1715                        PrewriteProgress {
1716                            productive_write_tool_calls_total,
1717                            productive_workspace_inspection_total,
1718                            productive_concrete_read_total,
1719                            productive_web_research_total,
1720                            successful_web_research_total,
1721                            required_write_retry_count,
1722                            unmet_prewrite_repair_retry_count,
1723                            prewrite_gate_waived,
1724                        },
1725                    );
1726                    if should_start_prewrite_repair_before_first_write(
1727                        requested_prewrite_requirements.repair_on_unmet_requirements,
1728                        productive_write_tool_calls_total,
1729                        prewrite_gate.prewrite_satisfied,
1730                        code_workflow_requested,
1731                    ) && !prewrite_gate_waived
1732                    {
1733                        let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1734                        if unmet_prewrite_repair_retry_count < prewrite_repair_retry_max_attempts()
1735                        {
1736                            unmet_prewrite_repair_retry_count += 1;
1737                            let repair_attempt = unmet_prewrite_repair_retry_count;
1738                            let repair_attempts_remaining =
1739                                prewrite_repair_retry_max_attempts().saturating_sub(repair_attempt);
1740                            followup_context = Some(build_prewrite_repair_retry_context(
1741                                &offered_tool_preview,
1742                                latest_required_tool_failure_kind,
1743                                &text,
1744                                &requested_prewrite_requirements,
1745                                productive_workspace_inspection_total > 0,
1746                                productive_concrete_read_total > 0,
1747                                productive_web_research_total > 0,
1748                                successful_web_research_total > 0,
1749                            ));
1750                            self.event_bus.publish(EngineEvent::new(
1751                                "provider.call.iteration.finish",
1752                                json!({
1753                                    "sessionID": session_id,
1754                                    "messageID": user_message_id,
1755                                    "iteration": iteration,
1756                                    "finishReason": "prewrite_repair_retry",
1757                                    "acceptedToolCalls": accepted_tool_calls_in_cycle,
1758                                    "rejectedToolCalls": 0,
1759                                    "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1760                                    "repair": prewrite_repair_event_payload(
1761                                        repair_attempt,
1762                                        repair_attempts_remaining,
1763                                        &unmet_prewrite_codes,
1764                                        false,
1765                                    ),
1766                                }),
1767                            ));
1768                            continue;
1769                        }
1770                        if prewrite_gate_strict_mode() {
1771                            self.event_bus.publish(EngineEvent::new(
1772                                "prewrite.gate.strict_mode.blocked",
1773                                json!({
1774                                    "sessionID": session_id,
1775                                    "messageID": user_message_id,
1776                                    "iteration": iteration,
1777                                    "unmetCodes": unmet_prewrite_codes,
1778                                }),
1779                            ));
1780                            continue;
1781                        }
1782                        prewrite_gate_waived = true;
1783                        let repair_attempt = unmet_prewrite_repair_retry_count;
1784                        let repair_attempts_remaining =
1785                            prewrite_repair_retry_max_attempts().saturating_sub(repair_attempt);
1786                        followup_context = Some(build_prewrite_waived_write_context(
1787                            &text,
1788                            &unmet_prewrite_codes,
1789                        ));
1790                        self.event_bus.publish(EngineEvent::new(
1791                            "prewrite.gate.waived.write_executed",
1792                            json!({
1793                                "sessionID": session_id,
1794                                "messageID": user_message_id,
1795                                "unmetCodes": unmet_prewrite_codes,
1796                            }),
1797                        ));
1798                        self.event_bus.publish(EngineEvent::new(
1799                            "provider.call.iteration.finish",
1800                            json!({
1801                                "sessionID": session_id,
1802                                "messageID": user_message_id,
1803                                "iteration": iteration,
1804                                "finishReason": "prewrite_gate_waived",
1805                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1806                                "rejectedToolCalls": 0,
1807                                "prewriteGateWaived": true,
1808                                "repair": prewrite_repair_event_payload(
1809                                    repair_attempt,
1810                                    repair_attempts_remaining,
1811                                    &unmet_prewrite_codes,
1812                                    true,
1813                                ),
1814                            }),
1815                        ));
1816                        continue;
1817                    }
1818                    if prewrite_gate_waived
1819                        && requested_write_required
1820                        && productive_write_tool_calls_total == 0
1821                        && required_write_retry_count + 1 < strict_write_retry_max_attempts
1822                    {
1823                        required_write_retry_count += 1;
1824                        followup_context = Some(build_write_required_retry_context(
1825                            &offered_tool_preview,
1826                            latest_required_tool_failure_kind,
1827                            &text,
1828                            &requested_prewrite_requirements,
1829                            productive_workspace_inspection_total > 0,
1830                            productive_concrete_read_total > 0,
1831                            productive_web_research_total > 0,
1832                            successful_web_research_total > 0,
1833                        ));
1834                        self.event_bus.publish(EngineEvent::new(
1835                            "provider.call.iteration.finish",
1836                            json!({
1837                                "sessionID": session_id,
1838                                "messageID": user_message_id,
1839                                "iteration": iteration,
1840                                "finishReason": "waived_write_retry",
1841                                "acceptedToolCalls": accepted_tool_calls_in_cycle,
1842                                "rejectedToolCalls": 0,
1843                            }),
1844                        ));
1845                        continue;
1846                    }
1847                    self.event_bus.publish(EngineEvent::new(
1848                        "provider.call.iteration.finish",
1849                        json!({
1850                            "sessionID": session_id,
1851                            "messageID": user_message_id,
1852                            "iteration": iteration,
1853                            "finishReason": "provider_completion",
1854                            "acceptedToolCalls": accepted_tool_calls_in_cycle,
1855                            "rejectedToolCalls": 0,
1856                        }),
1857                    ));
1858                }
1859                break;
1860            }
1861            if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
1862            {
1863                completion =
1864                    required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
1865                if !required_tool_unsatisfied_emitted {
1866                    self.event_bus.publish(EngineEvent::new(
1867                        "tool.mode.required.unsatisfied",
1868                        json!({
1869                            "sessionID": session_id,
1870                            "messageID": user_message_id,
1871                            "selectedToolCount": tool_call_counts.len(),
1872                            "reason": latest_required_tool_failure_kind.code(),
1873                        }),
1874                    ));
1875                }
1876            }
1877            if completion.trim().is_empty()
1878                && !last_tool_outputs.is_empty()
1879                && requested_write_required
1880                && productive_write_tool_calls_total > 0
1881            {
1882                let final_prewrite_satisfied = evaluate_prewrite_gate(
1883                    requested_write_required,
1884                    &requested_prewrite_requirements,
1885                    PrewriteProgress {
1886                        productive_write_tool_calls_total,
1887                        productive_workspace_inspection_total,
1888                        productive_concrete_read_total,
1889                        productive_web_research_total,
1890                        successful_web_research_total,
1891                        required_write_retry_count,
1892                        unmet_prewrite_repair_retry_count,
1893                        prewrite_gate_waived,
1894                    },
1895                )
1896                .prewrite_satisfied;
1897                completion = synthesize_artifact_write_completion_from_tool_state(
1898                    &text,
1899                    final_prewrite_satisfied,
1900                    prewrite_gate_waived,
1901                );
1902            }
1903            if completion.trim().is_empty()
1904                && !last_tool_outputs.is_empty()
1905                && should_generate_post_tool_final_narrative(
1906                    requested_tool_mode,
1907                    productive_tool_calls_total,
1908                )
1909            {
1910                if let Some(narrative) = self
1911                    .generate_final_narrative_without_tools(
1912                        &session_id,
1913                        &active_agent,
1914                        Some(provider_id.as_str()),
1915                        Some(model_id_value.as_str()),
1916                        cancel.clone(),
1917                        &last_tool_outputs,
1918                    )
1919                    .await
1920                {
1921                    completion = narrative;
1922                }
1923            }
1924            if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
1925                if let Some(summary) = summarize_auth_pending_outputs(&last_tool_outputs) {
1926                    completion = summary;
1927                } else if let Some(hint) =
1928                    summarize_terminal_tool_failure_for_user(&last_tool_outputs)
1929                {
1930                    completion = hint;
1931                } else {
1932                    let preview = summarize_user_visible_tool_outputs(&last_tool_outputs);
1933                    if preview.trim().is_empty() {
1934                        completion = "I used tools for this request, but I couldn't turn the results into a clean final answer. Please retry with the docs page URL, docs path, or exact search query you want me to use.".to_string();
1935                    } else {
1936                        completion = format!(
1937                            "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
1938                            preview
1939                        );
1940                    }
1941                }
1942            }
1943            if completion.trim().is_empty() {
1944                completion =
1945                    "I couldn't produce a final response for that run. Please retry your request."
1946                        .to_string();
1947            }
1948            // M-3: Gate fires when email was requested AND email-action tools were
1949            // actually offered to the agent during at least one iteration but no
1950            // email action tool was executed. The completion text is NOT consulted —
1951            // this prevents the model from bypassing the gate by rephrasing, and
1952            // prevents false positives on legitimate text containing email keywords.
1953            // Skipping when no email tools were ever offered avoids clobbering
1954            // legitimate output with a delivery-failure message the agent could not
1955            // have avoided (e.g. prompts that mention gmail tool names as context).
1956            if email_delivery_requested && email_tools_ever_offered && !email_action_executed {
1957                let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
1958                    .to_string();
1959                if let Some(note) = latest_email_action_note.as_ref() {
1960                    fallback.push_str("\n\nLast email tool status: ");
1961                    fallback.push_str(note);
1962                }
1963                fallback.push_str(
1964                    "\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
1965                );
1966                completion = fallback;
1967            }
1968            completion = strip_model_control_markers(&completion);
1969            truncate_text(&completion, 16_000)
1970        };
1971        emit_event(
1972            Level::INFO,
1973            ProcessKind::Engine,
1974            ObservabilityEvent {
1975                event: "provider.call.finish",
1976                component: "engine.loop",
1977                correlation_id: correlation_ref,
1978                session_id: Some(&session_id),
1979                run_id: None,
1980                message_id: Some(&user_message_id),
1981                provider_id: Some(provider_id.as_str()),
1982                model_id,
1983                status: Some("ok"),
1984                error_code: None,
1985                detail: Some("provider stream complete"),
1986            },
1987        );
1988        if active_agent.name.eq_ignore_ascii_case("plan") {
1989            emit_plan_todo_fallback(
1990                self.storage.clone(),
1991                &self.event_bus,
1992                &session_id,
1993                &user_message_id,
1994                &completion,
1995            )
1996            .await;
1997            let todos_after_fallback = self.storage.get_todos(&session_id).await;
1998            if todos_after_fallback.is_empty() && !question_tool_used {
1999                emit_plan_question_fallback(
2000                    self.storage.clone(),
2001                    &self.event_bus,
2002                    &session_id,
2003                    &user_message_id,
2004                    &completion,
2005                )
2006                .await;
2007            }
2008        }
2009        if cancel.is_cancelled() {
2010            self.event_bus.publish(EngineEvent::new(
2011                "session.status",
2012                json!({"sessionID": session_id, "status":"cancelled"}),
2013            ));
2014            self.cancellations.remove(&session_id).await;
2015            return Ok(());
2016        }
2017        let assistant = Message::new(
2018            MessageRole::Assistant,
2019            vec![MessagePart::Text {
2020                text: completion.clone(),
2021            }],
2022        );
2023        let assistant_message_id = assistant.id.clone();
2024        self.storage.append_message(&session_id, assistant).await?;
2025        let final_part = WireMessagePart::text(
2026            &session_id,
2027            &assistant_message_id,
2028            truncate_text(&completion, 16_000),
2029        );
2030        self.event_bus.publish(EngineEvent::new(
2031            "message.part.updated",
2032            json!({"part": final_part}),
2033        ));
2034        self.event_bus.publish(EngineEvent::new(
2035            "session.updated",
2036            json!({"sessionID": session_id, "status":"idle"}),
2037        ));
2038        self.event_bus.publish(EngineEvent::new(
2039            "session.status",
2040            json!({"sessionID": session_id, "status":"idle"}),
2041        ));
2042        self.cancellations.remove(&session_id).await;
2043        Ok(())
2044    }
2045}