1use super::*;
2
3impl EngineLoop {
4 pub async fn run_prompt_async_with_context(
5 &self,
6 session_id: String,
7 req: SendMessageRequest,
8 correlation_id: Option<String>,
9 ) -> anyhow::Result<()> {
10 let session_model = self
11 .storage
12 .get_session(&session_id)
13 .await
14 .and_then(|s| s.model);
15 let (provider_id, model_id_value) =
16 resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
17 anyhow::anyhow!(
18 "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
19 )
20 })?;
21 let correlation_ref = correlation_id.as_deref();
22 let model_id = Some(model_id_value.as_str());
23 let cancel = self.cancellations.create(&session_id).await;
24 emit_event(
25 Level::INFO,
26 ProcessKind::Engine,
27 ObservabilityEvent {
28 event: "provider.call.start",
29 component: "engine.loop",
30 correlation_id: correlation_ref,
31 session_id: Some(&session_id),
32 run_id: None,
33 message_id: None,
34 provider_id: Some(provider_id.as_str()),
35 model_id,
36 status: Some("start"),
37 error_code: None,
38 detail: Some("run_prompt_async dispatch"),
39 },
40 );
41 self.event_bus.publish(EngineEvent::new(
42 "session.status",
43 json!({"sessionID": session_id, "status":"running"}),
44 ));
45 let request_parts = req.parts.clone();
46 let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
47 let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
48 let requested_write_required = req.write_required.unwrap_or(false);
49 let requested_prewrite_requirements = req.prewrite_requirements.clone().unwrap_or_default();
50 let prewrite_repair_budget = prewrite_repair_retry_budget(&requested_prewrite_requirements);
51 let prewrite_fail_closed = prewrite_gate_strict_mode(&requested_prewrite_requirements);
52 let request_tool_allowlist = req
53 .tool_allowlist
54 .clone()
55 .unwrap_or_default()
56 .into_iter()
57 .map(|tool| normalize_tool_name(&tool))
58 .filter(|tool| !tool.trim().is_empty())
59 .collect::<HashSet<_>>();
60 let required_mcp_tools_before_write =
61 concrete_mcp_tools_required_before_write(&request_tool_allowlist);
62 if !request_tool_allowlist.is_empty() {
65 self.set_session_allowed_tools(
66 &session_id,
67 request_tool_allowlist.iter().cloned().collect(),
68 )
69 .await;
70 }
71 let text = req
72 .parts
73 .iter()
74 .map(|p| match p {
75 MessagePartInput::Text { text } => text.clone(),
76 MessagePartInput::File {
77 mime,
78 filename,
79 url,
80 } => format!(
81 "[file mime={} name={} url={}]",
82 mime,
83 filename.clone().unwrap_or_else(|| "unknown".to_string()),
84 url
85 ),
86 })
87 .collect::<Vec<_>>()
88 .join("\n");
89 let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
90 self.auto_rename_session_from_user_text(&session_id, &text)
91 .await;
92 let active_agent = self.agents.get(req.agent.as_deref()).await;
93 let mut user_message_id = self
94 .find_recent_matching_user_message_id(&session_id, &text)
95 .await;
96 if user_message_id.is_none() {
97 let user_message = Message::new(
98 MessageRole::User,
99 vec![MessagePart::Text { text: text.clone() }],
100 );
101 let created_message_id = user_message.id.clone();
102 self.storage
103 .append_message(&session_id, user_message)
104 .await?;
105
106 let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
107 self.event_bus.publish(EngineEvent::new(
108 "message.part.updated",
109 json!({
110 "part": user_part,
111 "delta": text,
112 "agent": active_agent.name
113 }),
114 ));
115 user_message_id = Some(created_message_id);
116 }
117 let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
118
119 if cancel.is_cancelled() {
120 self.event_bus.publish(EngineEvent::new(
121 "session.status",
122 json!({"sessionID": session_id, "status":"cancelled"}),
123 ));
124 self.cancellations.remove(&session_id).await;
125 return Ok(());
126 }
127
128 let mut question_tool_used = false;
129 let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
130 if normalize_tool_name(&tool) == "question" {
131 question_tool_used = true;
132 }
133 if !agent_can_use_tool(&active_agent, &tool) {
134 format!(
135 "Tool `{tool}` is not enabled for agent `{}`.",
136 active_agent.name
137 )
138 } else {
139 match self
140 .execute_tool_with_permission(
141 &session_id,
142 &user_message_id,
143 tool.clone(),
144 args,
145 None,
146 active_agent.skills.as_deref(),
147 &text,
148 requested_write_required,
149 None,
150 cancel.clone(),
151 )
152 .await
153 {
154 Ok(output) => output.unwrap_or_default(),
155 Err(err) => {
156 self.mark_session_run_failed(&session_id, &err.to_string())
157 .await;
158 return Err(err);
159 }
160 }
161 }
162 } else {
163 let mut completion = String::new();
164 let mut max_iterations = max_tool_iterations();
165 let mut followup_context: Option<String> = None;
166 let mut last_tool_outputs: Vec<String> = Vec::new();
167 let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
168 let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
169 let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
170 let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
171 let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
172 let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
173 let mut websearch_query_blocked = false;
174 let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
175 let mut pack_builder_executed = false;
176 let mut auto_workspace_probe_attempted = false;
177 let mut productive_tool_calls_total = 0usize;
178 let mut productive_write_tool_calls_total = 0usize;
179 let mut productive_workspace_inspection_total = 0usize;
180 let mut productive_web_research_total = 0usize;
181 let mut productive_concrete_read_total = 0usize;
182 let mut successful_web_research_total = 0usize;
183 let mut required_tool_retry_count = 0usize;
184 let mut required_write_retry_count = 0usize;
185 let mut unmet_prewrite_repair_retry_count = 0usize;
186 let mut empty_completion_retry_count = 0usize;
187 let mut prewrite_gate_waived = false;
188 let mut invalid_tool_args_retry_count = 0usize;
189 let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
190 let mut required_tool_unsatisfied_emitted = false;
191 let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
192 let email_delivery_requested = requires_email_delivery_prompt(&text);
193 let web_research_requested = requires_web_research_prompt(&text);
194 let code_workflow_requested = infer_code_workflow_from_text(&text);
195 let mut email_action_executed = false;
196 let mut latest_email_action_note: Option<String> = None;
197 let mut email_tools_ever_offered = false;
198 let intent = classify_intent(&text);
199 let router_enabled = tool_router_enabled();
200 let retrieval_enabled = semantic_tool_retrieval_enabled();
201 let retrieval_k = semantic_tool_retrieval_k();
202 let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
203 self.tools.mcp_server_names().await
204 } else {
205 Vec::new()
206 };
207 let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
208 let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
209 && runtime_attachments.is_empty()
210 && is_short_simple_prompt(&text)
211 && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
212
213 while max_iterations > 0 && !cancel.is_cancelled() {
214 let iteration = 26usize.saturating_sub(max_iterations);
215 max_iterations -= 1;
216 let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
217 ChatHistoryProfile::Full
218 } else if matches!(requested_context_mode, ContextMode::Compact)
219 || context_is_auto_compact
220 {
221 ChatHistoryProfile::Compact
222 } else {
223 ChatHistoryProfile::Standard
224 };
225 let mut messages =
226 load_chat_history(self.storage.clone(), &session_id, context_profile).await;
227 if iteration == 1 && !runtime_attachments.is_empty() {
228 attach_to_last_user_message(&mut messages, &runtime_attachments);
229 }
230 let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
231 self.event_bus.publish(EngineEvent::new(
232 "context.profile.selected",
233 json!({
234 "sessionID": session_id,
235 "messageID": user_message_id,
236 "iteration": iteration,
237 "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
238 "historyMessageCount": messages.len(),
239 "historyCharCount": history_char_count,
240 "memoryInjected": false
241 }),
242 ));
243 let mut system_parts = vec![tandem_runtime_system_prompt(
244 &self.host_runtime_context,
245 &mcp_server_names,
246 )];
247 if let Some(system) = active_agent.system_prompt.as_ref() {
248 system_parts.push(system.clone());
249 }
250 messages.insert(
251 0,
252 ChatMessage {
253 role: "system".to_string(),
254 content: system_parts.join("\n\n"),
255 attachments: Vec::new(),
256 },
257 );
258 if let Some(extra) = followup_context.take() {
259 messages.push(ChatMessage {
260 role: "user".to_string(),
261 content: extra,
262 attachments: Vec::new(),
263 });
264 }
265 if let Some(hook) = self.prompt_context_hook.read().await.clone() {
266 let ctx = PromptContextHookContext {
267 session_id: session_id.clone(),
268 message_id: user_message_id.clone(),
269 provider_id: provider_id.clone(),
270 model_id: model_id_value.clone(),
271 iteration,
272 };
273 let hook_timeout =
274 Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
275 match tokio::time::timeout(
276 hook_timeout,
277 hook.augment_provider_messages(ctx, messages.clone()),
278 )
279 .await
280 {
281 Ok(Ok(augmented)) => {
282 messages = augmented;
283 }
284 Ok(Err(err)) => {
285 self.event_bus.publish(EngineEvent::new(
286 "memory.context.error",
287 json!({
288 "sessionID": session_id,
289 "messageID": user_message_id,
290 "iteration": iteration,
291 "error": truncate_text(&err.to_string(), 500),
292 }),
293 ));
294 }
295 Err(_) => {
296 self.event_bus.publish(EngineEvent::new(
297 "memory.context.error",
298 json!({
299 "sessionID": session_id,
300 "messageID": user_message_id,
301 "iteration": iteration,
302 "error": format!(
303 "prompt context hook timeout after {} ms",
304 hook_timeout.as_millis()
305 ),
306 }),
307 ));
308 }
309 }
310 }
311 let all_tools = self.tools.list().await;
312 let mut retrieval_fallback_reason: Option<&'static str> = None;
313 let mut candidate_tools = if retrieval_enabled {
314 self.tools.retrieve(&text, retrieval_k).await
315 } else {
316 all_tools.clone()
317 };
318 if retrieval_enabled {
319 if candidate_tools.is_empty() && !all_tools.is_empty() {
320 candidate_tools = all_tools.clone();
321 retrieval_fallback_reason = Some("retrieval_empty_result");
322 } else if web_research_requested
323 && has_web_research_tools(&all_tools)
324 && !has_web_research_tools(&candidate_tools)
325 && required_write_retry_count == 0
326 {
327 candidate_tools = all_tools.clone();
328 retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
329 } else if email_delivery_requested
330 && has_email_action_tools(&all_tools)
331 && !has_email_action_tools(&candidate_tools)
332 {
333 candidate_tools = all_tools.clone();
334 retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
335 }
336 }
337 let mut tool_schemas = if !router_enabled {
338 candidate_tools
339 } else {
340 match requested_tool_mode {
341 ToolMode::None => Vec::new(),
342 ToolMode::Required => select_tool_subset(
343 candidate_tools,
344 intent,
345 &request_tool_allowlist,
346 iteration > 1,
347 ),
348 ToolMode::Auto => {
349 if !auto_tools_escalated {
350 Vec::new()
351 } else {
352 select_tool_subset(
353 candidate_tools,
354 intent,
355 &request_tool_allowlist,
356 iteration > 1,
357 )
358 }
359 }
360 }
361 };
362 let mut policy_patterns =
363 request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
364 if let Some(agent_tools) = active_agent.tools.as_ref() {
365 policy_patterns
366 .extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
367 }
368 let session_allowed_tools = self
369 .session_allowed_tools
370 .read()
371 .await
372 .get(&session_id)
373 .cloned()
374 .unwrap_or_default();
375 policy_patterns.extend(session_allowed_tools.iter().cloned());
376 if !policy_patterns.is_empty() {
377 let mut included = tool_schemas
378 .iter()
379 .map(|schema| normalize_tool_name(&schema.name))
380 .collect::<HashSet<_>>();
381 for schema in &all_tools {
382 let normalized = normalize_tool_name(&schema.name);
383 if policy_patterns
384 .iter()
385 .any(|pattern| tool_name_matches_policy(pattern, &normalized))
386 && included.insert(normalized)
387 {
388 tool_schemas.push(schema.clone());
389 }
390 }
391 }
392 if !request_tool_allowlist.is_empty() {
393 tool_schemas.retain(|schema| {
394 let tool = normalize_tool_name(&schema.name);
395 request_tool_allowlist
396 .iter()
397 .any(|pattern| tool_name_matches_policy(pattern, &tool))
398 });
399 }
400 let prewrite_gate = evaluate_prewrite_gate(
401 requested_write_required,
402 &requested_prewrite_requirements,
403 PrewriteProgress {
404 productive_write_tool_calls_total,
405 productive_workspace_inspection_total,
406 productive_concrete_read_total,
407 productive_web_research_total,
408 successful_web_research_total,
409 required_write_retry_count,
410 unmet_prewrite_repair_retry_count,
411 prewrite_gate_waived,
412 },
413 );
414 let _prewrite_satisfied = prewrite_gate.prewrite_satisfied;
415 let prewrite_gate_write = prewrite_gate.gate_write;
416 let force_write_only_retry = prewrite_gate.force_write_only_retry;
417 let allow_repair_tools = prewrite_gate.allow_repair_tools;
418 let required_mcp_tool_pending = has_unattempted_required_mcp_tool(
419 &required_mcp_tools_before_write,
420 &tool_call_counts,
421 );
422 if prewrite_gate_write {
423 tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
424 }
425 if requested_prewrite_requirements.repair_on_unmet_requirements
426 && productive_write_tool_calls_total >= 3
427 {
428 tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
429 }
430 if allow_repair_tools {
431 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
432 let repair_tools = tool_schemas
433 .iter()
434 .filter(|schema| {
435 tool_matches_unmet_prewrite_repair_requirement(
436 &schema.name,
437 &unmet_prewrite_codes,
438 productive_workspace_inspection_total > 0,
439 )
440 })
441 .cloned()
442 .collect::<Vec<_>>();
443 if !repair_tools.is_empty() {
444 tool_schemas = repair_tools;
445 }
446 }
447 if force_write_only_retry && !allow_repair_tools && !required_mcp_tool_pending {
448 tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
449 }
450 if active_agent.tools.is_some() {
451 tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
452 }
453 tool_schemas.retain(|schema| {
454 let normalized = normalize_tool_name(&schema.name);
455 if let Some(server) = mcp_server_from_tool_name(&normalized) {
456 !blocked_mcp_servers.contains(server)
457 } else {
458 true
459 }
460 });
461 if let Some(allowed_tools) = self
462 .session_allowed_tools
463 .read()
464 .await
465 .get(&session_id)
466 .cloned()
467 {
468 if !allowed_tools.is_empty() {
469 tool_schemas.retain(|schema| {
470 let normalized = normalize_tool_name(&schema.name);
471 any_policy_matches(&allowed_tools, &normalized)
472 });
473 }
474 }
475 if required_mcp_tool_pending {
476 tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
477 }
478 if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
479 let detail = validation_err.to_string();
480 emit_event(
481 Level::ERROR,
482 ProcessKind::Engine,
483 ObservabilityEvent {
484 event: "provider.call.error",
485 component: "engine.loop",
486 correlation_id: correlation_ref,
487 session_id: Some(&session_id),
488 run_id: None,
489 message_id: Some(&user_message_id),
490 provider_id: Some(provider_id.as_str()),
491 model_id,
492 status: Some("failed"),
493 error_code: Some("TOOL_SCHEMA_INVALID"),
494 detail: Some(&detail),
495 },
496 );
497 anyhow::bail!("{detail}");
498 }
499 let routing_decision = ToolRoutingDecision {
500 pass: if auto_tools_escalated { 2 } else { 1 },
501 mode: match requested_tool_mode {
502 ToolMode::Auto => default_mode_name(),
503 ToolMode::None => "none",
504 ToolMode::Required => "required",
505 },
506 intent,
507 selected_count: tool_schemas.len(),
508 total_available_count: all_tools.len(),
509 mcp_included: tool_schemas
510 .iter()
511 .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
512 };
513 self.event_bus.publish(EngineEvent::new(
514 "tool.routing.decision",
515 json!({
516 "sessionID": session_id,
517 "messageID": user_message_id,
518 "iteration": iteration,
519 "pass": routing_decision.pass,
520 "mode": routing_decision.mode,
521 "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
522 "selectedToolCount": routing_decision.selected_count,
523 "totalAvailableTools": routing_decision.total_available_count,
524 "mcpIncluded": routing_decision.mcp_included,
525 "retrievalEnabled": retrieval_enabled,
526 "retrievalK": retrieval_k,
527 "fallbackToFullTools": retrieval_fallback_reason.is_some(),
528 "fallbackReason": retrieval_fallback_reason
529 }),
530 ));
531 let allowed_tool_names = tool_schemas
532 .iter()
533 .map(|schema| normalize_tool_name(&schema.name))
534 .collect::<HashSet<_>>();
535 if !email_tools_ever_offered && has_email_action_tools(&tool_schemas) {
536 email_tools_ever_offered = true;
537 }
538 let offered_tool_preview = tool_schemas
539 .iter()
540 .take(8)
541 .map(|schema| normalize_tool_name(&schema.name))
542 .collect::<Vec<_>>()
543 .join(", ");
544 self.event_bus.publish(EngineEvent::new(
545 "provider.call.iteration.start",
546 json!({
547 "sessionID": session_id,
548 "messageID": user_message_id,
549 "iteration": iteration,
550 "selectedToolCount": allowed_tool_names.len(),
551 }),
552 ));
553 let estimated_prompt_chars: usize = messages.iter().map(|m| m.content.len()).sum();
554 let provider_connect_timeout =
555 Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
556 let provider_idle_timeout =
557 Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
558 let provider_stream_retry_budget = provider_stream_decode_retry_attempts();
559 let mut provider_stream_retry_count = 0usize;
560 let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
561 let mut provider_usage: Option<TokenUsage>;
562 let mut accepted_tool_calls_in_cycle: usize;
563 'provider_stream_attempt: loop {
564 completion.clear();
565 streamed_tool_calls.clear();
566 provider_usage = None;
567 accepted_tool_calls_in_cycle = 0;
568 let stream_result = tokio::time::timeout(
569 provider_connect_timeout,
570 self.providers.stream_for_provider(
571 Some(provider_id.as_str()),
572 Some(model_id_value.as_str()),
573 messages.clone(),
574 requested_tool_mode.clone(),
575 Some(tool_schemas.clone()),
576 cancel.clone(),
577 ),
578 )
579 .await
580 .map_err(|_| {
581 anyhow::anyhow!(
582 "provider stream connect timeout after {} ms",
583 provider_connect_timeout.as_millis()
584 )
585 })
586 .and_then(|result| result);
587 let stream = match stream_result {
588 Ok(stream) => stream,
589 Err(err) => {
590 let error_text = err.to_string();
591 if is_transient_provider_stream_error(&error_text)
592 && provider_stream_retry_count < provider_stream_retry_budget
593 {
594 provider_stream_retry_count =
595 provider_stream_retry_count.saturating_add(1);
596 let detail = truncate_text(&error_text, 500);
597 self.event_bus.publish(EngineEvent::new(
598 "provider.call.iteration.retry",
599 json!({
600 "sessionID": session_id,
601 "messageID": user_message_id,
602 "providerID": provider_id,
603 "modelID": model_id_value,
604 "iteration": iteration,
605 "error": detail,
606 "retry": provider_stream_retry_count,
607 "maxRetries": provider_stream_retry_budget,
608 }),
609 ));
610 continue 'provider_stream_attempt;
611 }
612 let error_code = provider_error_code(&error_text);
613 let detail = truncate_text(&error_text, 500);
614 emit_event(
615 Level::ERROR,
616 ProcessKind::Engine,
617 ObservabilityEvent {
618 event: "provider.call.error",
619 component: "engine.loop",
620 correlation_id: correlation_ref,
621 session_id: Some(&session_id),
622 run_id: None,
623 message_id: Some(&user_message_id),
624 provider_id: Some(provider_id.as_str()),
625 model_id,
626 status: Some("failed"),
627 error_code: Some(error_code),
628 detail: Some(&detail),
629 },
630 );
631 self.event_bus.publish(EngineEvent::new(
632 "provider.call.iteration.error",
633 json!({
634 "sessionID": session_id,
635 "messageID": user_message_id,
636 "iteration": iteration,
637 "error": detail,
638 }),
639 ));
640 self.mark_session_run_failed(&session_id, &err.to_string())
641 .await;
642 return Err(err);
643 }
644 };
645 tokio::pin!(stream);
646 loop {
647 let next_chunk_result =
648 tokio::time::timeout(provider_idle_timeout, stream.next())
649 .await
650 .map_err(|_| {
651 anyhow::anyhow!(
652 "provider stream idle timeout after {} ms",
653 provider_idle_timeout.as_millis()
654 )
655 });
656 let next_chunk = match next_chunk_result {
657 Ok(next_chunk) => next_chunk,
658 Err(err) => {
659 let error_text = err.to_string();
660 self.event_bus.publish(EngineEvent::new(
661 "provider.call.iteration.error",
662 json!({
663 "sessionID": session_id,
664 "messageID": user_message_id,
665 "iteration": iteration,
666 "error": truncate_text(&error_text, 500),
667 }),
668 ));
669 self.mark_session_run_failed(&session_id, &error_text).await;
670 return Err(err);
671 }
672 };
673 let Some(chunk) = next_chunk else {
674 break 'provider_stream_attempt;
675 };
676 let chunk = match chunk {
677 Ok(chunk) => chunk,
678 Err(err) => {
679 let error_text = err.to_string();
680 let stream_error_text =
681 format!("provider stream chunk error: {error_text}");
682 if is_transient_provider_stream_error(&stream_error_text)
683 && provider_stream_retry_count < provider_stream_retry_budget
684 {
685 provider_stream_retry_count =
686 provider_stream_retry_count.saturating_add(1);
687 let detail = truncate_text(&stream_error_text, 500);
688 self.event_bus.publish(EngineEvent::new(
689 "provider.call.iteration.retry",
690 json!({
691 "sessionID": session_id,
692 "messageID": user_message_id,
693 "providerID": provider_id,
694 "modelID": model_id_value,
695 "iteration": iteration,
696 "error": detail,
697 "retry": provider_stream_retry_count,
698 "maxRetries": provider_stream_retry_budget,
699 }),
700 ));
701 continue 'provider_stream_attempt;
702 }
703 let error_code = provider_error_code(&stream_error_text);
704 let detail = truncate_text(&stream_error_text, 500);
705 emit_event(
706 Level::ERROR,
707 ProcessKind::Engine,
708 ObservabilityEvent {
709 event: "provider.call.error",
710 component: "engine.loop",
711 correlation_id: correlation_ref,
712 session_id: Some(&session_id),
713 run_id: None,
714 message_id: Some(&user_message_id),
715 provider_id: Some(provider_id.as_str()),
716 model_id,
717 status: Some("failed"),
718 error_code: Some(error_code),
719 detail: Some(&detail),
720 },
721 );
722 self.event_bus.publish(EngineEvent::new(
723 "provider.call.iteration.error",
724 json!({
725 "sessionID": session_id,
726 "messageID": user_message_id,
727 "iteration": iteration,
728 "error": detail,
729 }),
730 ));
731 let err = anyhow::anyhow!("{stream_error_text}");
732 self.mark_session_run_failed(&session_id, &err.to_string())
733 .await;
734 return Err(err);
735 }
736 };
737 match chunk {
738 StreamChunk::TextDelta(delta) => {
739 let delta = strip_model_control_markers(&delta);
740 if delta.trim().is_empty() {
741 continue;
742 }
743 if completion.is_empty() {
744 emit_event(
745 Level::INFO,
746 ProcessKind::Engine,
747 ObservabilityEvent {
748 event: "provider.call.first_byte",
749 component: "engine.loop",
750 correlation_id: correlation_ref,
751 session_id: Some(&session_id),
752 run_id: None,
753 message_id: Some(&user_message_id),
754 provider_id: Some(provider_id.as_str()),
755 model_id,
756 status: Some("streaming"),
757 error_code: None,
758 detail: Some("first text delta"),
759 },
760 );
761 }
762 completion.push_str(&delta);
763 let delta = truncate_text(&delta, 4_000);
764 let delta_part = WireMessagePart::text(
765 &session_id,
766 &user_message_id,
767 delta.clone(),
768 );
769 self.event_bus.publish(EngineEvent::new(
770 "message.part.updated",
771 json!({"part": delta_part, "delta": delta}),
772 ));
773 }
774 StreamChunk::ReasoningDelta(_reasoning) => {}
775 StreamChunk::Done {
776 finish_reason: _,
777 usage,
778 } => {
779 if usage.is_some() {
780 provider_usage = usage;
781 }
782 break 'provider_stream_attempt;
783 }
784 StreamChunk::ToolCallStart { id, name } => {
785 let entry = streamed_tool_calls.entry(id).or_default();
786 if entry.name.is_empty() {
787 entry.name = name;
788 }
789 }
790 StreamChunk::ToolCallDelta { id, args_delta } => {
791 let entry = streamed_tool_calls.entry(id.clone()).or_default();
792 entry.args.push_str(&args_delta);
793 let tool_name = if entry.name.trim().is_empty() {
794 "tool".to_string()
795 } else {
796 normalize_tool_name(&entry.name)
797 };
798 let parsed_preview = if entry.name.trim().is_empty() {
799 Value::String(truncate_text(&entry.args, 1_000))
800 } else {
801 parse_streamed_tool_args(&tool_name, &entry.args)
802 };
803 let mut tool_part = WireMessagePart::tool_invocation(
804 &session_id,
805 &user_message_id,
806 tool_name.clone(),
807 parsed_preview.clone(),
808 );
809 tool_part.id = Some(id.clone());
810 if tool_name == "write" {
811 tracing::info!(
812 session_id = %session_id,
813 message_id = %user_message_id,
814 tool_call_id = %id,
815 args_delta_len = args_delta.len(),
816 accumulated_args_len = entry.args.len(),
817 parsed_preview_empty = parsed_preview.is_null()
818 || parsed_preview.as_object().is_some_and(|value| value.is_empty())
819 || parsed_preview
820 .as_str()
821 .map(|value| value.trim().is_empty())
822 .unwrap_or(false),
823 "streamed write tool args delta received"
824 );
825 }
826 self.event_bus.publish(EngineEvent::new(
827 "message.part.updated",
828 json!({
829 "part": tool_part,
830 "toolCallDelta": {
831 "id": id,
832 "tool": tool_name,
833 "argsDelta": truncate_text(&args_delta, 1_000),
834 "rawArgsPreview": truncate_text(&entry.args, 2_000),
835 "parsedArgsPreview": parsed_preview
836 }
837 }),
838 ));
839 }
840 StreamChunk::ToolCallEnd { id: _ } => {}
841 }
842 if cancel.is_cancelled() {
843 break 'provider_stream_attempt;
844 }
845 }
846 }
847
848 let streamed_tool_call_count = streamed_tool_calls.len();
849 let streamed_tool_call_parse_failed = streamed_tool_calls
850 .values()
851 .any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
852 let mut tool_calls = streamed_tool_calls
853 .into_iter()
854 .filter_map(|(call_id, call)| {
855 if call.name.trim().is_empty() {
856 return None;
857 }
858 let tool_name = normalize_tool_name(&call.name);
859 let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
860 Some(ParsedToolCall {
861 tool: tool_name,
862 args: parsed_args,
863 call_id: Some(call_id),
864 })
865 })
866 .collect::<Vec<_>>();
867 if tool_calls.is_empty() {
868 tool_calls = parse_tool_invocations_from_response(&completion)
869 .into_iter()
870 .map(|(tool, args)| ParsedToolCall {
871 tool,
872 args,
873 call_id: None,
874 })
875 .collect::<Vec<_>>();
876 }
877 let provider_tool_parse_failed = tool_calls.is_empty()
878 && (streamed_tool_call_parse_failed
879 || (streamed_tool_call_count > 0
880 && looks_like_unparsed_tool_payload(&completion))
881 || looks_like_unparsed_tool_payload(&completion));
882 if provider_tool_parse_failed {
883 latest_required_tool_failure_kind =
884 RequiredToolFailureKind::ToolCallParseFailed;
885 } else if tool_calls.is_empty() {
886 latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
887 }
888 if router_enabled
889 && matches!(requested_tool_mode, ToolMode::Auto)
890 && !auto_tools_escalated
891 && iteration == 1
892 && should_escalate_auto_tools(intent, &text, &completion)
893 {
894 auto_tools_escalated = true;
895 followup_context = Some(
896 "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
897 .to_string(),
898 );
899 self.event_bus.publish(EngineEvent::new(
900 "provider.call.iteration.finish",
901 json!({
902 "sessionID": session_id,
903 "messageID": user_message_id,
904 "iteration": iteration,
905 "finishReason": "auto_escalate",
906 "acceptedToolCalls": accepted_tool_calls_in_cycle,
907 "rejectedToolCalls": 0,
908 }),
909 ));
910 continue;
911 }
912 if tool_calls.is_empty()
913 && !auto_workspace_probe_attempted
914 && should_force_workspace_probe(&text, &completion)
915 && allowed_tool_names.contains("glob")
916 {
917 auto_workspace_probe_attempted = true;
918 tool_calls = vec![ParsedToolCall {
919 tool: "glob".to_string(),
920 args: json!({ "pattern": "*" }),
921 call_id: None,
922 }];
923 }
924 if !tool_calls.is_empty() {
925 let saw_tool_call_candidate = true;
926 let mut outputs = Vec::new();
927 let mut executed_productive_tool = false;
928 let mut write_tool_attempted_in_cycle = false;
929 let mut auth_required_hit_in_cycle = false;
930 let mut guard_budget_hit_in_cycle = false;
931 let mut duplicate_signature_hit_in_cycle = false;
932 let mut rejected_tool_call_in_cycle = false;
933 for ParsedToolCall {
934 tool,
935 args,
936 call_id,
937 } in tool_calls
938 {
939 if !agent_can_use_tool(&active_agent, &tool) {
940 rejected_tool_call_in_cycle = true;
941 continue;
942 }
943 let tool_key = normalize_tool_name(&tool);
944 if is_workspace_write_tool(&tool_key) {
945 write_tool_attempted_in_cycle = true;
946 }
947 if !allowed_tool_names.contains(&tool_key) {
948 rejected_tool_call_in_cycle = true;
949 let note = if offered_tool_preview.is_empty() {
950 format!(
951 "Tool `{}` call skipped: it is not available in this turn.",
952 tool_key
953 )
954 } else {
955 format!(
956 "Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
957 tool_key, offered_tool_preview
958 )
959 };
960 self.event_bus.publish(EngineEvent::new(
961 "tool.call.rejected_unoffered",
962 json!({
963 "sessionID": session_id,
964 "messageID": user_message_id,
965 "iteration": iteration,
966 "tool": tool_key,
967 "offeredToolCount": allowed_tool_names.len()
968 }),
969 ));
970 if tool_name_looks_like_email_action(&tool_key) {
971 latest_email_action_note = Some(note.clone());
972 }
973 outputs.push(note);
974 continue;
975 }
976 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
977 if blocked_mcp_servers.contains(server) {
978 rejected_tool_call_in_cycle = true;
979 outputs.push(format!(
980 "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
981 tool_key, server
982 ));
983 continue;
984 }
985 }
986 if tool_key == "question" {
987 question_tool_used = true;
988 }
989 if tool_key == "pack_builder" && pack_builder_executed {
990 rejected_tool_call_in_cycle = true;
991 outputs.push(
992 "Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
993 .to_string(),
994 );
995 continue;
996 }
997 if websearch_query_blocked && tool_key == "websearch" {
998 rejected_tool_call_in_cycle = true;
999 outputs.push(
1000 "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
1001 .to_string(),
1002 );
1003 continue;
1004 }
1005 let mut effective_args = args.clone();
1006 if tool_key == "todo_write" {
1007 effective_args = normalize_todo_write_args(effective_args, &completion);
1008 if is_empty_todo_write_args(&effective_args) {
1009 rejected_tool_call_in_cycle = true;
1010 outputs.push(
1011 "Tool `todo_write` call skipped: empty todo payload."
1012 .to_string(),
1013 );
1014 continue;
1015 }
1016 }
1017 let signature = if tool_key == "batch" {
1018 batch_tool_signature(&args)
1019 .unwrap_or_else(|| tool_signature(&tool_key, &args))
1020 } else {
1021 tool_signature(&tool_key, &args)
1022 };
1023 if is_shell_tool_name(&tool_key)
1024 && shell_mismatch_signatures.contains(&signature)
1025 {
1026 rejected_tool_call_in_cycle = true;
1027 outputs.push(
1028 "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
1029 .to_string(),
1030 );
1031 continue;
1032 }
1033 let mut signature_count = 1usize;
1034 if is_read_only_tool(&tool_key)
1035 || (tool_key == "batch" && is_read_only_batch_call(&args))
1036 {
1037 let count = readonly_signature_counts
1038 .entry(signature.clone())
1039 .and_modify(|v| *v = v.saturating_add(1))
1040 .or_insert(1);
1041 signature_count = *count;
1042 if tool_key == "websearch" {
1043 if let Some(limit) = websearch_duplicate_signature_limit {
1044 if *count > limit {
1045 rejected_tool_call_in_cycle = true;
1046 self.event_bus.publish(EngineEvent::new(
1047 "tool.loop_guard.triggered",
1048 json!({
1049 "sessionID": session_id,
1050 "messageID": user_message_id,
1051 "tool": tool_key,
1052 "reason": "duplicate_signature_retry_exhausted",
1053 "duplicateLimit": limit,
1054 "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
1055 "loop_guard_triggered": true
1056 }),
1057 ));
1058 outputs.push(
1059 "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
1060 .to_string(),
1061 );
1062 continue;
1063 }
1064 }
1065 }
1066 if tool_key != "websearch" && *count > 1 {
1067 rejected_tool_call_in_cycle = true;
1068 if let Some(cached) = readonly_tool_cache.get(&signature) {
1069 outputs.push(cached.clone());
1070 } else {
1071 outputs.push(format!(
1072 "Tool `{}` call skipped: duplicate call signature detected.",
1073 tool_key
1074 ));
1075 }
1076 continue;
1077 }
1078 }
1079 let is_read_only_signature = is_read_only_tool(&tool_key)
1080 || (tool_key == "batch" && is_read_only_batch_call(&args));
1081 if !is_read_only_signature {
1082 let duplicate_limit = duplicate_signature_limit_for(&tool_key);
1083 let seen = mutable_signature_counts
1084 .entry(signature.clone())
1085 .and_modify(|v| *v = v.saturating_add(1))
1086 .or_insert(1);
1087 if *seen > duplicate_limit {
1088 rejected_tool_call_in_cycle = true;
1089 self.event_bus.publish(EngineEvent::new(
1090 "tool.loop_guard.triggered",
1091 json!({
1092 "sessionID": session_id,
1093 "messageID": user_message_id,
1094 "tool": tool_key,
1095 "reason": "duplicate_signature_retry_exhausted",
1096 "signatureHash": stable_hash(&signature),
1097 "duplicateLimit": duplicate_limit,
1098 "loop_guard_triggered": true
1099 }),
1100 ));
1101 outputs.push(format!(
1102 "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
1103 tool_key, duplicate_limit
1104 ));
1105 duplicate_signature_hit_in_cycle = true;
1106 continue;
1107 }
1108 }
1109 let budget = tool_budget_for(&tool_key);
1110 let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
1111 if *entry >= budget {
1112 rejected_tool_call_in_cycle = true;
1113 outputs.push(format!(
1114 "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
1115 tool_key, budget
1116 ));
1117 guard_budget_hit_in_cycle = true;
1118 continue;
1119 }
1120 let mut finalized_part = WireMessagePart::tool_invocation(
1121 &session_id,
1122 &user_message_id,
1123 tool.clone(),
1124 effective_args.clone(),
1125 );
1126 if let Some(call_id) = call_id.clone() {
1127 finalized_part.id = Some(call_id);
1128 }
1129 finalized_part.state = Some("pending".to_string());
1130 self.event_bus.publish(EngineEvent::new(
1131 "message.part.updated",
1132 json!({"part": finalized_part}),
1133 ));
1134 *entry += 1;
1135 accepted_tool_calls_in_cycle =
1136 accepted_tool_calls_in_cycle.saturating_add(1);
1137 let tool_output_result = self
1138 .execute_tool_with_permission(
1139 &session_id,
1140 &user_message_id,
1141 tool,
1142 effective_args,
1143 call_id,
1144 active_agent.skills.as_deref(),
1145 &text,
1146 requested_write_required,
1147 Some(&completion),
1148 cancel.clone(),
1149 )
1150 .await;
1151 let Some(output) = (match tool_output_result {
1152 Ok(output) => output,
1153 Err(err) => {
1154 self.mark_session_run_failed(&session_id, &err.to_string())
1155 .await;
1156 return Err(err);
1157 }
1158 }) else {
1159 continue;
1160 };
1161 {
1162 let productive = is_productive_tool_output(&tool_key, &output);
1163 if output.contains("WEBSEARCH_QUERY_MISSING") {
1164 websearch_query_blocked = true;
1165 }
1166 if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
1167 {
1168 shell_mismatch_signatures.insert(signature.clone());
1169 }
1170 if is_read_only_tool(&tool_key)
1171 && tool_key != "websearch"
1172 && signature_count == 1
1173 {
1174 readonly_tool_cache.insert(signature, output.clone());
1175 }
1176 if productive {
1177 productive_tool_calls_total =
1178 productive_tool_calls_total.saturating_add(1);
1179 if is_workspace_write_tool(&tool_key) {
1180 productive_write_tool_calls_total =
1181 productive_write_tool_calls_total.saturating_add(1);
1182 }
1183 if is_workspace_inspection_tool(&tool_key) {
1184 productive_workspace_inspection_total =
1185 productive_workspace_inspection_total.saturating_add(1);
1186 }
1187 if tool_key == "read" {
1188 productive_concrete_read_total =
1189 productive_concrete_read_total.saturating_add(1);
1190 }
1191 if is_web_research_tool(&tool_key) {
1192 productive_web_research_total =
1193 productive_web_research_total.saturating_add(1);
1194 if is_successful_web_research_output(&tool_key, &output) {
1195 successful_web_research_total =
1196 successful_web_research_total.saturating_add(1);
1197 }
1198 }
1199 executed_productive_tool = true;
1200 if tool_key == "pack_builder" {
1201 pack_builder_executed = true;
1202 }
1203 }
1204 if tool_name_looks_like_email_action(&tool_key) {
1205 if productive {
1206 email_action_executed = true;
1207 } else {
1208 latest_email_action_note =
1209 Some(truncate_text(&output, 280).replace('\n', " "));
1210 }
1211 }
1212 if is_auth_required_tool_output(&output) {
1213 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1214 blocked_mcp_servers.insert(server.to_string());
1215 }
1216 auth_required_hit_in_cycle = true;
1217 }
1218 outputs.push(output);
1219 if auth_required_hit_in_cycle {
1220 break;
1221 }
1222 if guard_budget_hit_in_cycle {
1223 break;
1224 }
1225 }
1226 }
1227 if !outputs.is_empty() {
1228 last_tool_outputs = outputs.clone();
1229 if matches!(requested_tool_mode, ToolMode::Required)
1230 && productive_tool_calls_total == 0
1231 {
1232 latest_required_tool_failure_kind = classify_required_tool_failure(
1233 &outputs,
1234 saw_tool_call_candidate,
1235 accepted_tool_calls_in_cycle,
1236 provider_tool_parse_failed,
1237 rejected_tool_call_in_cycle,
1238 );
1239 if requested_write_required
1240 && write_tool_attempted_in_cycle
1241 && productive_write_tool_calls_total == 0
1242 && is_write_invalid_args_failure_kind(
1243 latest_required_tool_failure_kind,
1244 )
1245 {
1246 if required_write_retry_count + 1 < strict_write_retry_max_attempts
1247 {
1248 required_write_retry_count += 1;
1249 required_tool_retry_count += 1;
1250 followup_context = Some(build_write_required_retry_context(
1251 &offered_tool_preview,
1252 latest_required_tool_failure_kind,
1253 &text,
1254 &requested_prewrite_requirements,
1255 productive_workspace_inspection_total > 0,
1256 productive_concrete_read_total > 0,
1257 productive_web_research_total > 0,
1258 successful_web_research_total > 0,
1259 ));
1260 self.event_bus.publish(EngineEvent::new(
1261 "provider.call.iteration.finish",
1262 json!({
1263 "sessionID": session_id,
1264 "messageID": user_message_id,
1265 "iteration": iteration,
1266 "finishReason": "required_write_invalid_retry",
1267 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1268 "rejectedToolCalls": 0,
1269 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1270 }),
1271 ));
1272 continue;
1273 }
1274 }
1275 let progress_made_in_cycle = productive_workspace_inspection_total > 0
1276 || productive_concrete_read_total > 0
1277 || productive_web_research_total > 0
1278 || successful_web_research_total > 0;
1279 if should_retry_nonproductive_required_tool_cycle(
1280 requested_write_required,
1281 write_tool_attempted_in_cycle,
1282 progress_made_in_cycle,
1283 required_tool_retry_count,
1284 ) {
1285 required_tool_retry_count += 1;
1286 followup_context =
1287 Some(build_required_tool_retry_context_for_task(
1288 &offered_tool_preview,
1289 latest_required_tool_failure_kind,
1290 &text,
1291 ));
1292 self.event_bus.publish(EngineEvent::new(
1293 "provider.call.iteration.finish",
1294 json!({
1295 "sessionID": session_id,
1296 "messageID": user_message_id,
1297 "iteration": iteration,
1298 "finishReason": "required_tool_retry",
1299 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1300 "rejectedToolCalls": 0,
1301 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1302 }),
1303 ));
1304 continue;
1305 }
1306 completion = required_tool_mode_unsatisfied_completion(
1307 latest_required_tool_failure_kind,
1308 );
1309 if !required_tool_unsatisfied_emitted {
1310 required_tool_unsatisfied_emitted = true;
1311 self.event_bus.publish(EngineEvent::new(
1312 "tool.mode.required.unsatisfied",
1313 json!({
1314 "sessionID": session_id,
1315 "messageID": user_message_id,
1316 "iteration": iteration,
1317 "selectedToolCount": allowed_tool_names.len(),
1318 "offeredToolsPreview": offered_tool_preview,
1319 "reason": latest_required_tool_failure_kind.code(),
1320 }),
1321 ));
1322 }
1323 self.event_bus.publish(EngineEvent::new(
1324 "provider.call.iteration.finish",
1325 json!({
1326 "sessionID": session_id,
1327 "messageID": user_message_id,
1328 "iteration": iteration,
1329 "finishReason": "required_tool_unsatisfied",
1330 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1331 "rejectedToolCalls": 0,
1332 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1333 }),
1334 ));
1335 break;
1336 }
1337 let prewrite_gate = evaluate_prewrite_gate(
1338 requested_write_required,
1339 &requested_prewrite_requirements,
1340 PrewriteProgress {
1341 productive_write_tool_calls_total,
1342 productive_workspace_inspection_total,
1343 productive_concrete_read_total,
1344 productive_web_research_total,
1345 successful_web_research_total,
1346 required_write_retry_count,
1347 unmet_prewrite_repair_retry_count,
1348 prewrite_gate_waived,
1349 },
1350 );
1351 let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1352 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1353 if requested_write_required
1354 && productive_tool_calls_total > 0
1355 && productive_write_tool_calls_total == 0
1356 {
1357 if should_start_prewrite_repair_before_first_write(
1358 requested_prewrite_requirements.repair_on_unmet_requirements,
1359 productive_write_tool_calls_total,
1360 prewrite_satisfied,
1361 code_workflow_requested,
1362 ) {
1363 if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1364 unmet_prewrite_repair_retry_count += 1;
1365 let repair_attempt = unmet_prewrite_repair_retry_count;
1366 let repair_attempts_remaining =
1367 prewrite_repair_budget.saturating_sub(repair_attempt);
1368 followup_context = Some(build_prewrite_repair_retry_context(
1369 &offered_tool_preview,
1370 latest_required_tool_failure_kind,
1371 &text,
1372 &requested_prewrite_requirements,
1373 productive_workspace_inspection_total > 0,
1374 productive_concrete_read_total > 0,
1375 productive_web_research_total > 0,
1376 successful_web_research_total > 0,
1377 ));
1378 self.event_bus.publish(EngineEvent::new(
1379 "provider.call.iteration.finish",
1380 json!({
1381 "sessionID": session_id,
1382 "messageID": user_message_id,
1383 "iteration": iteration,
1384 "finishReason": "prewrite_repair_retry",
1385 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1386 "rejectedToolCalls": 0,
1387 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1388 "repair": prewrite_repair_event_payload(
1389 repair_attempt,
1390 repair_attempts_remaining,
1391 &unmet_prewrite_codes,
1392 false,
1393 ),
1394 }),
1395 ));
1396 continue;
1397 }
1398 if !prewrite_gate_waived {
1399 if prewrite_fail_closed {
1400 let repair_attempt = unmet_prewrite_repair_retry_count;
1401 let repair_attempts_remaining =
1402 prewrite_repair_budget.saturating_sub(repair_attempt);
1403 completion = prewrite_requirements_exhausted_completion(
1404 &unmet_prewrite_codes,
1405 repair_attempt,
1406 repair_attempts_remaining,
1407 );
1408 self.event_bus.publish(EngineEvent::new(
1409 "prewrite.gate.strict_mode.blocked",
1410 json!({
1411 "sessionID": session_id,
1412 "messageID": user_message_id,
1413 "iteration": iteration,
1414 "unmetCodes": unmet_prewrite_codes,
1415 }),
1416 ));
1417 self.event_bus.publish(EngineEvent::new(
1418 "provider.call.iteration.finish",
1419 json!({
1420 "sessionID": session_id,
1421 "messageID": user_message_id,
1422 "iteration": iteration,
1423 "finishReason": "prewrite_requirements_exhausted",
1424 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1425 "rejectedToolCalls": 0,
1426 "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1427 "repair": prewrite_repair_event_payload(
1428 repair_attempt,
1429 repair_attempts_remaining,
1430 &unmet_prewrite_codes,
1431 true,
1432 ),
1433 }),
1434 ));
1435 break;
1436 }
1437 prewrite_gate_waived = true;
1438 let repair_attempt = unmet_prewrite_repair_retry_count;
1439 let repair_attempts_remaining =
1440 prewrite_repair_budget.saturating_sub(repair_attempt);
1441 followup_context = Some(build_prewrite_waived_write_context(
1442 &text,
1443 &unmet_prewrite_codes,
1444 ));
1445 self.event_bus.publish(EngineEvent::new(
1446 "prewrite.gate.waived.write_executed",
1447 json!({
1448 "sessionID": session_id,
1449 "messageID": user_message_id,
1450 "unmetCodes": unmet_prewrite_codes,
1451 }),
1452 ));
1453 self.event_bus.publish(EngineEvent::new(
1454 "provider.call.iteration.finish",
1455 json!({
1456 "sessionID": session_id,
1457 "messageID": user_message_id,
1458 "iteration": iteration,
1459 "finishReason": "prewrite_gate_waived",
1460 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1461 "rejectedToolCalls": 0,
1462 "prewriteGateWaived": true,
1463 "repair": prewrite_repair_event_payload(
1464 repair_attempt,
1465 repair_attempts_remaining,
1466 &unmet_prewrite_codes,
1467 true,
1468 ),
1469 }),
1470 ));
1471 continue;
1472 }
1473 }
1474 latest_required_tool_failure_kind =
1475 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1476 if required_write_retry_count + 1 < strict_write_retry_max_attempts {
1477 required_write_retry_count += 1;
1478 followup_context = Some(build_write_required_retry_context(
1479 &offered_tool_preview,
1480 latest_required_tool_failure_kind,
1481 &text,
1482 &requested_prewrite_requirements,
1483 productive_workspace_inspection_total > 0,
1484 productive_concrete_read_total > 0,
1485 productive_web_research_total > 0,
1486 successful_web_research_total > 0,
1487 ));
1488 self.event_bus.publish(EngineEvent::new(
1489 "provider.call.iteration.finish",
1490 json!({
1491 "sessionID": session_id,
1492 "messageID": user_message_id,
1493 "iteration": iteration,
1494 "finishReason": "required_write_retry",
1495 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1496 "rejectedToolCalls": 0,
1497 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1498 }),
1499 ));
1500 continue;
1501 }
1502 completion = required_tool_mode_unsatisfied_completion(
1503 latest_required_tool_failure_kind,
1504 );
1505 if !required_tool_unsatisfied_emitted {
1506 required_tool_unsatisfied_emitted = true;
1507 self.event_bus.publish(EngineEvent::new(
1508 "tool.mode.required.unsatisfied",
1509 json!({
1510 "sessionID": session_id,
1511 "messageID": user_message_id,
1512 "iteration": iteration,
1513 "selectedToolCount": allowed_tool_names.len(),
1514 "offeredToolsPreview": offered_tool_preview,
1515 "reason": latest_required_tool_failure_kind.code(),
1516 }),
1517 ));
1518 }
1519 self.event_bus.publish(EngineEvent::new(
1520 "provider.call.iteration.finish",
1521 json!({
1522 "sessionID": session_id,
1523 "messageID": user_message_id,
1524 "iteration": iteration,
1525 "finishReason": "required_write_unsatisfied",
1526 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1527 "rejectedToolCalls": 0,
1528 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1529 }),
1530 ));
1531 break;
1532 }
1533 if invalid_tool_args_retry_count < invalid_tool_args_retry_max_attempts() {
1534 if let Some(retry_context) =
1535 build_invalid_tool_args_retry_context_from_outputs(
1536 &outputs,
1537 invalid_tool_args_retry_count,
1538 )
1539 {
1540 invalid_tool_args_retry_count += 1;
1541 followup_context = Some(format!(
1542 "Previous tool call arguments were invalid. {}",
1543 retry_context
1544 ));
1545 self.event_bus.publish(EngineEvent::new(
1546 "provider.call.iteration.finish",
1547 json!({
1548 "sessionID": session_id,
1549 "messageID": user_message_id,
1550 "iteration": iteration,
1551 "finishReason": "invalid_tool_args_retry",
1552 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1553 "rejectedToolCalls": 0,
1554 }),
1555 ));
1556 continue;
1557 }
1558 }
1559 let guard_budget_hit =
1560 outputs.iter().any(|o| is_guard_budget_tool_output(o));
1561 if executed_productive_tool {
1562 let prewrite_gate = evaluate_prewrite_gate(
1563 requested_write_required,
1564 &requested_prewrite_requirements,
1565 PrewriteProgress {
1566 productive_write_tool_calls_total,
1567 productive_workspace_inspection_total,
1568 productive_concrete_read_total,
1569 productive_web_research_total,
1570 successful_web_research_total,
1571 required_write_retry_count,
1572 unmet_prewrite_repair_retry_count,
1573 prewrite_gate_waived,
1574 },
1575 );
1576 let prewrite_satisfied = prewrite_gate.prewrite_satisfied;
1577 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1578 if requested_write_required
1579 && productive_write_tool_calls_total > 0
1580 && requested_prewrite_requirements.repair_on_unmet_requirements
1581 && unmet_prewrite_repair_retry_count < prewrite_repair_budget
1582 && !prewrite_satisfied
1583 {
1584 unmet_prewrite_repair_retry_count += 1;
1585 let repair_attempt = unmet_prewrite_repair_retry_count;
1586 let repair_attempts_remaining =
1587 prewrite_repair_budget.saturating_sub(repair_attempt);
1588 followup_context = Some(build_prewrite_repair_retry_context(
1589 &offered_tool_preview,
1590 latest_required_tool_failure_kind,
1591 &text,
1592 &requested_prewrite_requirements,
1593 productive_workspace_inspection_total > 0,
1594 productive_concrete_read_total > 0,
1595 productive_web_research_total > 0,
1596 successful_web_research_total > 0,
1597 ));
1598 self.event_bus.publish(EngineEvent::new(
1599 "provider.call.iteration.finish",
1600 json!({
1601 "sessionID": session_id,
1602 "messageID": user_message_id,
1603 "iteration": iteration,
1604 "finishReason": "prewrite_repair_retry",
1605 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1606 "rejectedToolCalls": 0,
1607 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1608 "repair": prewrite_repair_event_payload(
1609 repair_attempt,
1610 repair_attempts_remaining,
1611 &unmet_prewrite_codes,
1612 false,
1613 ),
1614 }),
1615 ));
1616 continue;
1617 }
1618 if requested_write_required
1619 && productive_write_tool_calls_total > 0
1620 && requested_prewrite_requirements.repair_on_unmet_requirements
1621 && !prewrite_satisfied
1622 && prewrite_fail_closed
1623 {
1624 let repair_attempt = unmet_prewrite_repair_retry_count;
1625 let repair_attempts_remaining =
1626 prewrite_repair_budget.saturating_sub(repair_attempt);
1627 completion = prewrite_requirements_exhausted_completion(
1628 &unmet_prewrite_codes,
1629 repair_attempt,
1630 repair_attempts_remaining,
1631 );
1632 self.event_bus.publish(EngineEvent::new(
1633 "prewrite.gate.strict_mode.blocked",
1634 json!({
1635 "sessionID": session_id,
1636 "messageID": user_message_id,
1637 "iteration": iteration,
1638 "unmetCodes": unmet_prewrite_codes,
1639 }),
1640 ));
1641 self.event_bus.publish(EngineEvent::new(
1642 "provider.call.iteration.finish",
1643 json!({
1644 "sessionID": session_id,
1645 "messageID": user_message_id,
1646 "iteration": iteration,
1647 "finishReason": "prewrite_requirements_exhausted",
1648 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1649 "rejectedToolCalls": 0,
1650 "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1651 "repair": prewrite_repair_event_payload(
1652 repair_attempt,
1653 repair_attempts_remaining,
1654 &unmet_prewrite_codes,
1655 true,
1656 ),
1657 }),
1658 ));
1659 break;
1660 }
1661 followup_context = Some(format!(
1662 "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1663 summarize_tool_outputs(&outputs)
1664 ));
1665 self.event_bus.publish(EngineEvent::new(
1666 "provider.call.iteration.finish",
1667 json!({
1668 "sessionID": session_id,
1669 "messageID": user_message_id,
1670 "iteration": iteration,
1671 "finishReason": "tool_followup",
1672 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1673 "rejectedToolCalls": 0,
1674 }),
1675 ));
1676 continue;
1677 }
1678 if guard_budget_hit {
1679 completion = summarize_guard_budget_outputs(&outputs)
1680 .unwrap_or_else(|| {
1681 "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1682 });
1683 } else if duplicate_signature_hit_in_cycle {
1684 completion = summarize_duplicate_signature_outputs(&outputs)
1685 .unwrap_or_else(|| {
1686 "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1687 });
1688 } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1689 completion = summary;
1690 } else {
1691 completion.clear();
1692 }
1693 self.event_bus.publish(EngineEvent::new(
1694 "provider.call.iteration.finish",
1695 json!({
1696 "sessionID": session_id,
1697 "messageID": user_message_id,
1698 "iteration": iteration,
1699 "finishReason": "tool_summary",
1700 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1701 "rejectedToolCalls": 0,
1702 }),
1703 ));
1704 break;
1705 } else if matches!(requested_tool_mode, ToolMode::Required) {
1706 latest_required_tool_failure_kind = classify_required_tool_failure(
1707 &outputs,
1708 saw_tool_call_candidate,
1709 accepted_tool_calls_in_cycle,
1710 provider_tool_parse_failed,
1711 rejected_tool_call_in_cycle,
1712 );
1713 }
1714 }
1715
1716 {
1717 let (prompt_tokens, completion_tokens, total_tokens, usage_source) =
1718 if let Some(usage) = provider_usage {
1719 (
1720 usage.prompt_tokens,
1721 usage.completion_tokens,
1722 usage.total_tokens,
1723 "provider",
1724 )
1725 } else {
1726 let est_prompt = (estimated_prompt_chars / 4) as u64;
1730 let est_completion = (completion.len() / 4) as u64;
1731 tracing::debug!(
1732 session_id = %session_id,
1733 provider_id = %provider_id,
1734 "provider.usage missing from stream; using char-count estimate \
1735 (prompt_chars={estimated_prompt_chars} completion_chars={})",
1736 completion.len()
1737 );
1738 (
1739 est_prompt,
1740 est_completion,
1741 est_prompt.saturating_add(est_completion),
1742 "estimated",
1743 )
1744 };
1745 self.event_bus.publish(EngineEvent::new(
1746 "provider.usage",
1747 json!({
1748 "sessionID": session_id,
1749 "correlationID": correlation_ref,
1750 "messageID": user_message_id,
1751 "promptTokens": prompt_tokens,
1752 "completionTokens": completion_tokens,
1753 "totalTokens": total_tokens,
1754 "usageSource": usage_source,
1755 }),
1756 ));
1757 }
1758
1759 if matches!(requested_tool_mode, ToolMode::Required)
1760 && productive_tool_calls_total == 0
1761 {
1762 if requested_write_required
1763 && required_write_retry_count > 0
1764 && productive_write_tool_calls_total == 0
1765 && !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
1766 {
1767 latest_required_tool_failure_kind =
1768 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1769 }
1770 if requested_write_required
1771 && required_write_retry_count + 1 < strict_write_retry_max_attempts
1772 {
1773 required_write_retry_count += 1;
1774 followup_context = Some(build_write_required_retry_context(
1775 &offered_tool_preview,
1776 latest_required_tool_failure_kind,
1777 &text,
1778 &requested_prewrite_requirements,
1779 productive_workspace_inspection_total > 0,
1780 productive_concrete_read_total > 0,
1781 productive_web_research_total > 0,
1782 successful_web_research_total > 0,
1783 ));
1784 continue;
1785 }
1786 let progress_made_in_cycle = productive_workspace_inspection_total > 0
1787 || productive_concrete_read_total > 0
1788 || productive_web_research_total > 0
1789 || successful_web_research_total > 0;
1790 if should_retry_nonproductive_required_tool_cycle(
1791 requested_write_required,
1792 false,
1793 progress_made_in_cycle,
1794 required_tool_retry_count,
1795 ) {
1796 required_tool_retry_count += 1;
1797 followup_context = Some(build_required_tool_retry_context_for_task(
1798 &offered_tool_preview,
1799 latest_required_tool_failure_kind,
1800 &text,
1801 ));
1802 continue;
1803 }
1804 completion = required_tool_mode_unsatisfied_completion(
1805 latest_required_tool_failure_kind,
1806 );
1807 if !required_tool_unsatisfied_emitted {
1808 required_tool_unsatisfied_emitted = true;
1809 self.event_bus.publish(EngineEvent::new(
1810 "tool.mode.required.unsatisfied",
1811 json!({
1812 "sessionID": session_id,
1813 "messageID": user_message_id,
1814 "iteration": iteration,
1815 "selectedToolCount": allowed_tool_names.len(),
1816 "offeredToolsPreview": offered_tool_preview,
1817 "reason": latest_required_tool_failure_kind.code(),
1818 }),
1819 ));
1820 }
1821 self.event_bus.publish(EngineEvent::new(
1822 "provider.call.iteration.finish",
1823 json!({
1824 "sessionID": session_id,
1825 "messageID": user_message_id,
1826 "iteration": iteration,
1827 "finishReason": "required_tool_unsatisfied",
1828 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1829 "rejectedToolCalls": 0,
1830 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1831 }),
1832 ));
1833 } else {
1834 if completion.trim().is_empty()
1835 && !last_tool_outputs.is_empty()
1836 && requested_write_required
1837 && empty_completion_retry_count == 0
1838 {
1839 empty_completion_retry_count += 1;
1840 followup_context = Some(build_empty_completion_retry_context(
1841 &offered_tool_preview,
1842 &text,
1843 &requested_prewrite_requirements,
1844 productive_workspace_inspection_total > 0,
1845 productive_concrete_read_total > 0,
1846 productive_web_research_total > 0,
1847 successful_web_research_total > 0,
1848 ));
1849 self.event_bus.publish(EngineEvent::new(
1850 "provider.call.iteration.finish",
1851 json!({
1852 "sessionID": session_id,
1853 "messageID": user_message_id,
1854 "iteration": iteration,
1855 "finishReason": "empty_completion_retry",
1856 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1857 "rejectedToolCalls": 0,
1858 }),
1859 ));
1860 continue;
1861 }
1862 let prewrite_gate = evaluate_prewrite_gate(
1863 requested_write_required,
1864 &requested_prewrite_requirements,
1865 PrewriteProgress {
1866 productive_write_tool_calls_total,
1867 productive_workspace_inspection_total,
1868 productive_concrete_read_total,
1869 productive_web_research_total,
1870 successful_web_research_total,
1871 required_write_retry_count,
1872 unmet_prewrite_repair_retry_count,
1873 prewrite_gate_waived,
1874 },
1875 );
1876 if should_start_prewrite_repair_before_first_write(
1877 requested_prewrite_requirements.repair_on_unmet_requirements,
1878 productive_write_tool_calls_total,
1879 prewrite_gate.prewrite_satisfied,
1880 code_workflow_requested,
1881 ) && !prewrite_gate_waived
1882 {
1883 let unmet_prewrite_codes = prewrite_gate.unmet_codes.clone();
1884 if unmet_prewrite_repair_retry_count < prewrite_repair_budget {
1885 unmet_prewrite_repair_retry_count += 1;
1886 let repair_attempt = unmet_prewrite_repair_retry_count;
1887 let repair_attempts_remaining =
1888 prewrite_repair_budget.saturating_sub(repair_attempt);
1889 followup_context = Some(build_prewrite_repair_retry_context(
1890 &offered_tool_preview,
1891 latest_required_tool_failure_kind,
1892 &text,
1893 &requested_prewrite_requirements,
1894 productive_workspace_inspection_total > 0,
1895 productive_concrete_read_total > 0,
1896 productive_web_research_total > 0,
1897 successful_web_research_total > 0,
1898 ));
1899 self.event_bus.publish(EngineEvent::new(
1900 "provider.call.iteration.finish",
1901 json!({
1902 "sessionID": session_id,
1903 "messageID": user_message_id,
1904 "iteration": iteration,
1905 "finishReason": "prewrite_repair_retry",
1906 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1907 "rejectedToolCalls": 0,
1908 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1909 "repair": prewrite_repair_event_payload(
1910 repair_attempt,
1911 repair_attempts_remaining,
1912 &unmet_prewrite_codes,
1913 false,
1914 ),
1915 }),
1916 ));
1917 continue;
1918 }
1919 if prewrite_fail_closed {
1920 let repair_attempt = unmet_prewrite_repair_retry_count;
1921 let repair_attempts_remaining =
1922 prewrite_repair_budget.saturating_sub(repair_attempt);
1923 completion = prewrite_requirements_exhausted_completion(
1924 &unmet_prewrite_codes,
1925 repair_attempt,
1926 repair_attempts_remaining,
1927 );
1928 self.event_bus.publish(EngineEvent::new(
1929 "prewrite.gate.strict_mode.blocked",
1930 json!({
1931 "sessionID": session_id,
1932 "messageID": user_message_id,
1933 "iteration": iteration,
1934 "unmetCodes": unmet_prewrite_codes,
1935 }),
1936 ));
1937 self.event_bus.publish(EngineEvent::new(
1938 "provider.call.iteration.finish",
1939 json!({
1940 "sessionID": session_id,
1941 "messageID": user_message_id,
1942 "iteration": iteration,
1943 "finishReason": "prewrite_requirements_exhausted",
1944 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1945 "rejectedToolCalls": 0,
1946 "requiredToolFailureReason": RequiredToolFailureKind::PrewriteRequirementsExhausted.code(),
1947 "repair": prewrite_repair_event_payload(
1948 repair_attempt,
1949 repair_attempts_remaining,
1950 &unmet_prewrite_codes,
1951 true,
1952 ),
1953 }),
1954 ));
1955 break;
1956 }
1957 prewrite_gate_waived = true;
1958 let repair_attempt = unmet_prewrite_repair_retry_count;
1959 let repair_attempts_remaining =
1960 prewrite_repair_budget.saturating_sub(repair_attempt);
1961 followup_context = Some(build_prewrite_waived_write_context(
1962 &text,
1963 &unmet_prewrite_codes,
1964 ));
1965 self.event_bus.publish(EngineEvent::new(
1966 "prewrite.gate.waived.write_executed",
1967 json!({
1968 "sessionID": session_id,
1969 "messageID": user_message_id,
1970 "unmetCodes": unmet_prewrite_codes,
1971 }),
1972 ));
1973 self.event_bus.publish(EngineEvent::new(
1974 "provider.call.iteration.finish",
1975 json!({
1976 "sessionID": session_id,
1977 "messageID": user_message_id,
1978 "iteration": iteration,
1979 "finishReason": "prewrite_gate_waived",
1980 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1981 "rejectedToolCalls": 0,
1982 "prewriteGateWaived": true,
1983 "repair": prewrite_repair_event_payload(
1984 repair_attempt,
1985 repair_attempts_remaining,
1986 &unmet_prewrite_codes,
1987 true,
1988 ),
1989 }),
1990 ));
1991 continue;
1992 }
1993 if prewrite_gate_waived
1994 && requested_write_required
1995 && productive_write_tool_calls_total == 0
1996 && required_write_retry_count + 1 < strict_write_retry_max_attempts
1997 {
1998 required_write_retry_count += 1;
1999 followup_context = Some(build_write_required_retry_context(
2000 &offered_tool_preview,
2001 latest_required_tool_failure_kind,
2002 &text,
2003 &requested_prewrite_requirements,
2004 productive_workspace_inspection_total > 0,
2005 productive_concrete_read_total > 0,
2006 productive_web_research_total > 0,
2007 successful_web_research_total > 0,
2008 ));
2009 self.event_bus.publish(EngineEvent::new(
2010 "provider.call.iteration.finish",
2011 json!({
2012 "sessionID": session_id,
2013 "messageID": user_message_id,
2014 "iteration": iteration,
2015 "finishReason": "waived_write_retry",
2016 "acceptedToolCalls": accepted_tool_calls_in_cycle,
2017 "rejectedToolCalls": 0,
2018 }),
2019 ));
2020 continue;
2021 }
2022 self.event_bus.publish(EngineEvent::new(
2023 "provider.call.iteration.finish",
2024 json!({
2025 "sessionID": session_id,
2026 "messageID": user_message_id,
2027 "iteration": iteration,
2028 "finishReason": "provider_completion",
2029 "acceptedToolCalls": accepted_tool_calls_in_cycle,
2030 "rejectedToolCalls": 0,
2031 }),
2032 ));
2033 }
2034 break;
2035 }
2036 if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
2037 {
2038 completion =
2039 required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
2040 if !required_tool_unsatisfied_emitted {
2041 self.event_bus.publish(EngineEvent::new(
2042 "tool.mode.required.unsatisfied",
2043 json!({
2044 "sessionID": session_id,
2045 "messageID": user_message_id,
2046 "selectedToolCount": tool_call_counts.len(),
2047 "reason": latest_required_tool_failure_kind.code(),
2048 }),
2049 ));
2050 }
2051 }
2052 if completion.trim().is_empty()
2053 && !last_tool_outputs.is_empty()
2054 && requested_write_required
2055 && productive_write_tool_calls_total > 0
2056 {
2057 let final_prewrite_satisfied = evaluate_prewrite_gate(
2058 requested_write_required,
2059 &requested_prewrite_requirements,
2060 PrewriteProgress {
2061 productive_write_tool_calls_total,
2062 productive_workspace_inspection_total,
2063 productive_concrete_read_total,
2064 productive_web_research_total,
2065 successful_web_research_total,
2066 required_write_retry_count,
2067 unmet_prewrite_repair_retry_count,
2068 prewrite_gate_waived,
2069 },
2070 )
2071 .prewrite_satisfied;
2072 if prewrite_fail_closed && !final_prewrite_satisfied {
2073 let unmet_prewrite_codes = evaluate_prewrite_gate(
2074 requested_write_required,
2075 &requested_prewrite_requirements,
2076 PrewriteProgress {
2077 productive_write_tool_calls_total,
2078 productive_workspace_inspection_total,
2079 productive_concrete_read_total,
2080 productive_web_research_total,
2081 successful_web_research_total,
2082 required_write_retry_count,
2083 unmet_prewrite_repair_retry_count,
2084 prewrite_gate_waived,
2085 },
2086 )
2087 .unmet_codes;
2088 completion = prewrite_requirements_exhausted_completion(
2089 &unmet_prewrite_codes,
2090 unmet_prewrite_repair_retry_count,
2091 prewrite_repair_budget.saturating_sub(unmet_prewrite_repair_retry_count),
2092 );
2093 } else {
2094 completion = synthesize_artifact_write_completion_from_tool_state(
2095 &text,
2096 final_prewrite_satisfied,
2097 prewrite_gate_waived,
2098 );
2099 }
2100 }
2101 if completion.trim().is_empty()
2102 && !last_tool_outputs.is_empty()
2103 && should_generate_post_tool_final_narrative(
2104 requested_tool_mode,
2105 productive_tool_calls_total,
2106 )
2107 {
2108 if let Some(narrative) = self
2109 .generate_final_narrative_without_tools(
2110 &session_id,
2111 &active_agent,
2112 Some(provider_id.as_str()),
2113 Some(model_id_value.as_str()),
2114 cancel.clone(),
2115 &last_tool_outputs,
2116 )
2117 .await
2118 {
2119 completion = narrative;
2120 }
2121 }
2122 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
2123 if let Some(summary) = summarize_auth_pending_outputs(&last_tool_outputs) {
2124 completion = summary;
2125 } else if let Some(hint) =
2126 summarize_terminal_tool_failure_for_user(&last_tool_outputs)
2127 {
2128 completion = hint;
2129 } else {
2130 let preview = summarize_user_visible_tool_outputs(&last_tool_outputs);
2131 if preview.trim().is_empty() {
2132 completion = "I used tools for this request, but I couldn't turn the results into a clean final answer. Please retry with the docs page URL, docs path, or exact search query you want me to use.".to_string();
2133 } else {
2134 completion = format!(
2135 "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
2136 preview
2137 );
2138 }
2139 }
2140 }
2141 if completion.trim().is_empty() {
2142 completion =
2143 "I couldn't produce a final response for that run. Please retry your request."
2144 .to_string();
2145 }
2146 if email_delivery_requested && email_tools_ever_offered && !email_action_executed {
2155 let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
2156 .to_string();
2157 if let Some(note) = latest_email_action_note.as_ref() {
2158 fallback.push_str("\n\nLast email tool status: ");
2159 fallback.push_str(note);
2160 }
2161 fallback.push_str(
2162 "\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
2163 );
2164 completion = fallback;
2165 }
2166 completion = strip_model_control_markers(&completion);
2167 truncate_text(&completion, 16_000)
2168 };
2169 emit_event(
2170 Level::INFO,
2171 ProcessKind::Engine,
2172 ObservabilityEvent {
2173 event: "provider.call.finish",
2174 component: "engine.loop",
2175 correlation_id: correlation_ref,
2176 session_id: Some(&session_id),
2177 run_id: None,
2178 message_id: Some(&user_message_id),
2179 provider_id: Some(provider_id.as_str()),
2180 model_id,
2181 status: Some("ok"),
2182 error_code: None,
2183 detail: Some("provider stream complete"),
2184 },
2185 );
2186 if active_agent.name.eq_ignore_ascii_case("plan") {
2187 emit_plan_todo_fallback(
2188 self.storage.clone(),
2189 &self.event_bus,
2190 &session_id,
2191 &user_message_id,
2192 &completion,
2193 )
2194 .await;
2195 let todos_after_fallback = self.storage.get_todos(&session_id).await;
2196 if todos_after_fallback.is_empty() && !question_tool_used {
2197 emit_plan_question_fallback(
2198 self.storage.clone(),
2199 &self.event_bus,
2200 &session_id,
2201 &user_message_id,
2202 &completion,
2203 )
2204 .await;
2205 }
2206 }
2207 if cancel.is_cancelled() {
2208 self.event_bus.publish(EngineEvent::new(
2209 "session.status",
2210 json!({"sessionID": session_id, "status":"cancelled"}),
2211 ));
2212 self.cancellations.remove(&session_id).await;
2213 return Ok(());
2214 }
2215 let assistant = Message::new(
2216 MessageRole::Assistant,
2217 vec![MessagePart::Text {
2218 text: completion.clone(),
2219 }],
2220 );
2221 let assistant_message_id = assistant.id.clone();
2222 self.storage.append_message(&session_id, assistant).await?;
2223 let final_part = WireMessagePart::text(
2224 &session_id,
2225 &assistant_message_id,
2226 truncate_text(&completion, 16_000),
2227 );
2228 self.event_bus.publish(EngineEvent::new(
2229 "message.part.updated",
2230 json!({"part": final_part}),
2231 ));
2232 self.event_bus.publish(EngineEvent::new(
2233 "session.updated",
2234 json!({"sessionID": session_id, "status":"idle"}),
2235 ));
2236 self.event_bus.publish(EngineEvent::new(
2237 "session.status",
2238 json!({"sessionID": session_id, "status":"idle"}),
2239 ));
2240 self.cancellations.remove(&session_id).await;
2241 Ok(())
2242 }
2243}