Skip to main content

everruns_core/atoms/
reason.rs

1//! ReasonAtom - Atom for LLM reasoning (model call)
2//!
3//! This atom handles:
4//! 1. Emitting reason.started event
5//! 2. Context preparation (loading message history, adding system message)
6//! 3. Fixing invalid context (e.g., missing tool_results for dangling tool calls)
7//! 4. LLM call with streaming support
8//! 5. Storing the assistant response
9//! 6. Emitting reason.completed event
10//! 7. Returning the result with tool calls (if any)
11//!
12//! NOTES from Python spec:
13//! - Context preparation includes loading message history, adding system message, editing context if needed
14//! - Before LLM call, invalid context (e.g. missing tool_results) should be fixed
15//! - LLM call should emit start/end events
//! - Failure of the LLM call should be a "normal" result that shows the user a message saying the LLM call failed
17//! - Reason should be cancellable, cancellation should stop LLM call and exit with message
18
19use async_trait::async_trait;
20use futures::StreamExt;
21use serde::{Deserialize, Serialize};
22use serde_json::json;
23use std::collections::{BTreeSet, HashMap};
24use std::sync::Arc;
25use std::time::Instant;
26use uuid::Uuid;
27
28use super::{Atom, AtomContext};
29use crate::capabilities::CapabilityRegistry;
30use crate::error::{AgentLoopError, Result};
31use crate::events::{
32    CapabilityUsageData, CapabilityUsageKind, CapabilityUsageRecord, EventContext, EventRequest,
33    LlmCompactionInfo, LlmGenerationData, LlmPromptCacheInfo, LlmRequestOptions, LlmRetryInfo,
34    LlmToolSearchInfo, OutputMessageCompletedData, OutputMessageDeltaData,
35    OutputMessageReplacedData, OutputMessageStartedData, ReasonCompletedData, ReasonItemData,
36    ReasonStartedData, ReasonThinkingCompletedData, ReasonThinkingDeltaData,
37    ReasonThinkingStartedData, TokenUsage, ToolDefinitionSummary,
38};
39use crate::llm_driver_registry::{
40    DriverRegistry, LlmCallConfigBuilder, LlmCompletionMetadata, LlmMessage, LlmMessageContent,
41    LlmMessageRole, LlmStreamEvent, ProviderConfig, ProviderType,
42};
43use crate::llm_retry::is_transient_error_message;
44use crate::message::{Message, MessageRole};
45use crate::message_retriever::MessageRetriever;
46use crate::openresponses_protocol::{
47    CompactInputItem, CompactRequest, compact_output_to_messages, messages_to_compact_input,
48};
49use crate::output_guardrail::{
50    ArmedGuardrail, OutputGuardrailContext, TrippedGuardrail, evaluate_guardrails,
51};
52use crate::runtime_context::{AssembledTurnContext, assemble_turn_context};
53use crate::tool_types::{ToolCall, ToolDefinition};
54use crate::traits::{
55    AgentStore, EventEmitter, HarnessStore, ImageResolver, LlmProviderStore, ModelWithProvider,
56    ResolvedImage, SessionStore,
57};
58use crate::typed_id::{AgentId, HarnessId, MessageId, SessionId};
59use crate::{UserFacingErrorContext, user_facing_error_codes};
60
61// ============================================================================
62// Helper Functions
63// ============================================================================
64
65/// Patch dangling tool calls by adding synthetic "cancelled" results.
66///
67/// This ensures every tool call has a corresponding tool result,
68/// preventing LLM API errors (e.g., OpenAI requires every tool_call to have a result).
69fn patch_dangling_tool_calls(messages: &[Message]) -> Vec<Message> {
70    let mut result = Vec::new();
71
72    for (i, msg) in messages.iter().enumerate() {
73        result.push(msg.clone());
74
75        // After an assistant message with tool calls, add cancelled results for any missing ones
76        if msg.role == MessageRole::Agent && msg.has_tool_calls() {
77            for tc in msg.tool_calls() {
78                // Look for a matching tool result in ALL subsequent messages
79                let has_result = messages[(i + 1)..]
80                    .iter()
81                    .any(|m| m.role == MessageRole::ToolResult && m.tool_call_id() == Some(&tc.id));
82
83                if !has_result {
84                    result.push(Message::tool_result(
85                        &tc.id,
86                        None,
87                        Some(
88                            "cancelled - another message came in before it could be completed"
89                                .to_string(),
90                        ),
91                    ));
92                }
93            }
94        }
95    }
96
97    result
98}
99
/// Exact texts of canned error replies, as emitted by the DLQ handler and by
/// `user_facing_message()`.
///
/// Such replies carry no conversational value. Leaving them in the history only
/// inflates later LLM requests, so a message whose text equals one of these
/// entries is filtered out.
const ERROR_PLACEHOLDER_MESSAGES: &[&str] = &[
    // Generic processing failure.
    "I encountered an error while processing your request. Please try again later.",
    // Provider outage.
    "The AI provider is experiencing issues. Please try again shortly.",
    // Provider throttling.
    "Rate limited by the AI provider. Please wait a moment.",
    // Context too large for the model.
    "The conversation has become too long for the model to process. Please start a new session or reduce the context size.",
    // Provider configuration problem.
    "There is a misconfiguration with the AI provider. Please contact support.",
];
109
110/// Returns true if the message is an error placeholder that should be stripped
111/// from the conversation history before sending to the LLM.
112fn is_error_placeholder_message(msg: &Message) -> bool {
113    if msg.role != MessageRole::Agent {
114        return false;
115    }
116    // Must have no tool calls (pure text-only error message)
117    if msg.has_tool_calls() {
118        return false;
119    }
120    if let Some(metadata) = &msg.metadata
121        && let Some(serde_json::Value::String(code)) = metadata.get("error_code")
122    {
123        return matches!(
124            code.as_str(),
125            user_facing_error_codes::BUDGET_EXHAUSTED
126                | user_facing_error_codes::BUDGET_PAUSED
127                | user_facing_error_codes::MODEL_UNAVAILABLE
128                | user_facing_error_codes::REQUEST_TOO_LARGE
129                | user_facing_error_codes::PROVIDER_RATE_LIMITED
130                | user_facing_error_codes::PROVIDER_MISCONFIGURED
131                | user_facing_error_codes::PROVIDER_UNAVAILABLE
132                | user_facing_error_codes::DEPENDENCY_UNAVAILABLE
133                | user_facing_error_codes::PROCESSING_ERROR
134        );
135    }
136    let text = msg.text().unwrap_or("");
137    ERROR_PLACEHOLDER_MESSAGES.contains(&text) || is_dynamic_error_placeholder(text)
138}
139
/// Recognise templated error replies whose middle section varies.
///
/// A text matches when it both starts with one of the known prefixes and ends
/// with the suffix paired with that prefix.
fn is_dynamic_error_placeholder(text: &str) -> bool {
    const TEMPLATES: &[(&str, &str)] = &[
        ("Budget exhausted.", "Increase the budget to continue."),
        ("Budget paused.", "Increase or resume the budget to continue."),
        ("Budget paused with ", "Increase or resume the budget to continue."),
        ("Soft limit reached.", "soft limit."),
        ("The model `", "Please select a different model."),
    ];

    TEMPLATES
        .iter()
        .any(|&(prefix, suffix)| text.starts_with(prefix) && text.ends_with(suffix))
}
149
150// ============================================================================
151// Input and Output Types
152// ============================================================================
153
154/// Input for ReasonAtom
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct ReasonInput {
157    /// Atom execution context
158    pub context: AtomContext,
159    /// Harness ID for loading base configuration
160    pub harness_id: HarnessId,
161    /// Agent ID for loading configuration (optional)
162    #[serde(skip_serializing_if = "Option::is_none")]
163    pub agent_id: Option<AgentId>,
164    /// Organization ID for multi-tenancy tracking
165    #[serde(default)]
166    pub org_id: i64,
167    /// MCP tool definitions from agent's MCP capabilities (pre-resolved)
168    /// These are passed from the control-plane since MCP capabilities
169    /// are not in the CapabilityRegistry.
170    #[serde(default)]
171    pub mcp_tool_definitions: Vec<ToolDefinition>,
172    /// Previous LLM response ID for stateful continuation.
173    /// Enables server-side context caching across reason iterations.
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub previous_response_id: Option<String>,
176    /// Current iteration number within this turn (1-based).
177    /// Used for output.message.started events so UI can show progress.
178    #[serde(default = "default_iteration")]
179    pub iteration: u32,
180}
181
/// Serde default for `ReasonInput::iteration`.
///
/// Iterations are 1-based, so a payload without the field counts as the first
/// iteration.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
185
186/// Result of the ReasonAtom
187#[derive(Debug, Clone, Default, Serialize, Deserialize)]
188pub struct ReasonResult {
189    /// Whether the LLM call succeeded
190    pub success: bool,
191    /// Text response from the model
192    pub text: String,
193    /// Tool calls requested by the model
194    #[serde(default)]
195    pub tool_calls: Vec<ToolCall>,
196    /// Whether tool execution is needed
197    pub has_tool_calls: bool,
198    /// Tool definitions from applied capabilities (for tool execution)
199    #[serde(default)]
200    pub tool_definitions: Vec<ToolDefinition>,
201    /// Maximum iterations configured for the agent
202    #[serde(default = "default_max_iterations")]
203    pub max_iterations: usize,
204    /// Error message if the call failed
205    #[serde(skip_serializing_if = "Option::is_none")]
206    pub error: Option<String>,
207    /// Token usage from the LLM call
208    #[serde(skip_serializing_if = "Option::is_none")]
209    pub usage: Option<TokenUsage>,
210    /// Assistant message emitted by `output.message.completed` for this generation.
211    #[serde(skip_serializing_if = "Option::is_none")]
212    pub output_message_id: Option<MessageId>,
213    /// Streaming latency for this LLM call, when available.
214    #[serde(skip_serializing_if = "Option::is_none")]
215    pub time_to_first_token_ms: Option<u64>,
216    /// LLM provider's response ID for chaining with `previous_response_id`
217    #[serde(skip_serializing_if = "Option::is_none")]
218    pub response_id: Option<String>,
219    /// Resolved locale used for this turn's prompt and backend-authored strings.
220    #[serde(skip_serializing_if = "Option::is_none")]
221    pub locale: Option<String>,
222    /// Merged network access list for URL filtering in tools.
223    #[serde(default, skip_serializing_if = "Option::is_none")]
224    pub network_access: Option<crate::network_access::NetworkAccessList>,
225}
226
/// Fallback iteration cap.
///
/// Used as the serde default for `ReasonResult::max_iterations` and for the
/// result built when the LLM call fails.
fn default_max_iterations() -> usize {
    const FALLBACK_MAX_ITERATIONS: usize = 500;
    FALLBACK_MAX_ITERATIONS
}
230
231fn build_request_options(
232    config: &crate::llm_driver_registry::LlmCallConfig,
233    provider: &str,
234) -> Option<LlmRequestOptions> {
235    let prompt_cache = config
236        .prompt_cache
237        .as_ref()
238        .filter(|cfg| cfg.enabled)
239        .map(|cfg| LlmPromptCacheInfo {
240            enabled: true,
241            strategy: cfg.strategy,
242            provider_mode: match provider {
243                "openai" => Some("prompt_cache_key".to_string()),
244                "anthropic" => Some("cache_control".to_string()),
245                "gemini" => Some(
246                    if cfg.gemini_cached_content.is_some() {
247                        "cached_content"
248                    } else {
249                        "implicit"
250                    }
251                    .to_string(),
252                ),
253                _ => None,
254            },
255        });
256
257    let tool_search = config
258        .tool_search
259        .as_ref()
260        .filter(|cfg| cfg.enabled)
261        .map(|cfg| LlmToolSearchInfo {
262            enabled: true,
263            threshold: cfg.threshold,
264        });
265
266    let mut provider_options = HashMap::new();
267    if provider == "openai" && config.previous_response_id.is_some() {
268        provider_options.insert(
269            "openai".to_string(),
270            json!({ "previous_response_id": true }),
271        );
272    }
273    if provider == "gemini"
274        && config
275            .prompt_cache
276            .as_ref()
277            .filter(|cfg| cfg.enabled)
278            .and_then(|cfg| cfg.gemini_cached_content.as_ref())
279            .is_some()
280    {
281        provider_options.insert("gemini".to_string(), json!({ "cached_content": true }));
282    }
283
284    let request_options = LlmRequestOptions {
285        prompt_cache,
286        tool_search,
287        provider_options,
288    };
289
290    (!request_options.is_empty()).then_some(request_options)
291}
292
293fn capability_name_snapshot(registry: &CapabilityRegistry, capability_id: &str) -> Option<String> {
294    registry
295        .get(capability_id)
296        .map(|capability| capability.name().to_string())
297}
298
299fn capability_usage_snapshot_records(
300    registry: &CapabilityRegistry,
301    resolved_capability_configs: &[crate::AgentCapabilityConfig],
302    tool_definitions: &[ToolDefinition],
303) -> Vec<CapabilityUsageRecord> {
304    let mut records = Vec::new();
305    let mut seen = BTreeSet::new();
306
307    for config in resolved_capability_configs {
308        let capability_id = config.capability_id().to_string();
309        if seen.insert((
310            "resolved".to_string(),
311            capability_id.clone(),
312            None::<String>,
313        )) {
314            records.push(CapabilityUsageRecord {
315                capability_name: capability_name_snapshot(registry, &capability_id),
316                capability_id,
317                usage_kind: CapabilityUsageKind::Resolved,
318                tool_name: None,
319                usage_count: Some(1),
320                duration_ms: None,
321            });
322        }
323    }
324
325    for tool in tool_definitions {
326        let Some((capability_id, capability_name)) = tool.capability_attribution() else {
327            continue;
328        };
329        let capability_id = capability_id.to_string();
330        let tool_name = tool.name().to_string();
331        if seen.insert((
332            "exposed".to_string(),
333            capability_id.clone(),
334            Some(tool_name.clone()),
335        )) {
336            records.push(CapabilityUsageRecord {
337                capability_name: capability_name
338                    .map(str::to_string)
339                    .or_else(|| capability_name_snapshot(registry, &capability_id)),
340                capability_id,
341                usage_kind: CapabilityUsageKind::Exposed,
342                tool_name: Some(tool_name),
343                usage_count: Some(1),
344                duration_ms: None,
345            });
346        }
347    }
348
349    records
350}
351
352// ============================================================================
353// ReasonAtom
354// ============================================================================
355
356/// Atom that calls the LLM model for reasoning
357///
358/// This atom:
359/// 1. Emits reason.started event
360/// 2. Retrieves agent and session configuration from stores
361/// 3. Resolves model using priority: controls.model_id > session.model_id > agent.default_model_id
362/// 4. Builds configuration with capabilities applied
363/// 5. Loads messages from the store
364/// 6. Patches dangling tool calls
365/// 7. Resolves image_file content parts to actual image data (if ImageResolver provided)
366/// 8. Calls the LLM with the messages
367/// 9. Stores the assistant response
368/// 10. Emits reason.completed event
369/// 11. Returns the result with tool calls (if any)
370pub struct ReasonAtom {
371    harness_store: Arc<dyn HarnessStore>,
372    agent_store: Arc<dyn AgentStore>,
373    session_store: Arc<dyn SessionStore>,
374    message_retriever: Arc<dyn MessageRetriever>,
375    provider_store: Arc<dyn LlmProviderStore>,
376    capability_registry: CapabilityRegistry,
377    driver_registry: DriverRegistry,
378    event_emitter: Arc<dyn EventEmitter>,
379    /// Optional image resolver for resolving image_file content parts
380    image_resolver: Option<Arc<dyn ImageResolver>>,
381    /// Optional file store for capabilities that need filesystem access
382    /// (e.g., agent_instructions reads AGENTS.md, skills_discovery scans for skills)
383    file_store: Option<Arc<dyn crate::traits::SessionFileSystem>>,
384    /// Optional heartbeater for stream-liveness signalling (EVE-531).
385    stream_heartbeater: Option<Arc<dyn crate::traits::StreamHeartbeater>>,
386    /// Optional provider stall timeout (EVE-531). Default: 120s.
387    provider_stall_timeout: Option<std::time::Duration>,
388}
389
390impl ReasonAtom {
391    /// Create a new ReasonAtom
392    #[allow(clippy::too_many_arguments)]
393    pub fn new(
394        harness_store: impl HarnessStore + 'static,
395        agent_store: impl AgentStore + 'static,
396        session_store: impl SessionStore + 'static,
397        message_retriever: impl MessageRetriever + 'static,
398        provider_store: impl LlmProviderStore + 'static,
399        capability_registry: CapabilityRegistry,
400        driver_registry: DriverRegistry,
401        event_emitter: impl EventEmitter + 'static,
402    ) -> Self {
403        Self {
404            harness_store: Arc::new(harness_store),
405            agent_store: Arc::new(agent_store),
406            session_store: Arc::new(session_store),
407            message_retriever: Arc::new(message_retriever),
408            provider_store: Arc::new(provider_store),
409            capability_registry,
410            driver_registry,
411            event_emitter: Arc::new(event_emitter),
412            image_resolver: None,
413            file_store: None,
414            stream_heartbeater: None,
415            provider_stall_timeout: None,
416        }
417    }
418
419    /// Set the file store for capabilities that need filesystem access.
420    ///
421    /// Provides filesystem access to capabilities via `SystemPromptContext`.
422    /// Capabilities like `agent_instructions` (reads AGENTS.md) and
423    /// `skills_discovery` (scans for skills) use this to generate dynamic
424    /// system prompt content.
425    pub fn with_file_store(
426        mut self,
427        file_store: Arc<dyn crate::traits::SessionFileSystem>,
428    ) -> Self {
429        self.file_store = Some(file_store);
430        self
431    }
432
433    /// Set the image resolver for resolving image_file content parts
434    ///
435    /// When set, image_file references in messages will be resolved to actual
436    /// image data before being sent to the LLM. This is required for multimodal
437    /// conversations that include image attachments.
438    ///
439    /// # Example
440    ///
441    /// ```ignore
442    /// let resolver = Arc::new(GrpcImageResolver::new(client));
443    /// let atom = ReasonAtom::new(/* ... */).with_image_resolver(resolver);
444    /// ```
445    pub fn with_image_resolver(mut self, resolver: Arc<dyn ImageResolver>) -> Self {
446        self.image_resolver = Some(resolver);
447        self
448    }
449
450    /// Set the stream heartbeater for liveness signalling during LLM streaming.
451    pub fn with_stream_heartbeater(
452        mut self,
453        heartbeater: Arc<dyn crate::traits::StreamHeartbeater>,
454    ) -> Self {
455        self.stream_heartbeater = Some(heartbeater);
456        self
457    }
458
459    /// Set the provider stall timeout. If no token arrives within this window,
460    /// the stream is aborted and the activity fails with a retryable error.
461    pub fn with_provider_stall_timeout(mut self, timeout: std::time::Duration) -> Self {
462        self.provider_stall_timeout = Some(timeout);
463        self
464    }
465}
466
467#[async_trait]
468impl Atom for ReasonAtom {
469    type Input = ReasonInput;
470    type Output = ReasonResult;
471
472    fn name(&self) -> &'static str {
473        "reason"
474    }
475
476    async fn execute(&self, input: Self::Input) -> Result<Self::Output> {
477        self.execute_inner(input, None).await
478    }
479}
480
481impl ReasonAtom {
482    /// Execute using a pre-assembled turn context.
483    ///
484    /// Hosts that already assembled turn context for the current reason phase can
485    /// pass it through here to avoid reloading messages and rebuilding the agent.
486    pub async fn execute_with_assembled_context(
487        &self,
488        input: ReasonInput,
489        assembled: AssembledTurnContext,
490    ) -> Result<ReasonResult> {
491        self.execute_inner(input, Some(assembled)).await
492    }
493
494    async fn emit_capability_usage_snapshot(
495        &self,
496        session_id: SessionId,
497        context: &AtomContext,
498        resolved_capability_configs: &[crate::AgentCapabilityConfig],
499        tool_definitions: &[ToolDefinition],
500    ) {
501        let records = capability_usage_snapshot_records(
502            &self.capability_registry,
503            resolved_capability_configs,
504            tool_definitions,
505        );
506        if records.is_empty() {
507            return;
508        }
509
510        if let Err(error) = self
511            .event_emitter
512            .emit(EventRequest::new(
513                session_id,
514                EventContext::from_atom_context(context),
515                CapabilityUsageData { records },
516            ))
517            .await
518        {
519            tracing::warn!(
520                session_id = %session_id,
521                error = %error,
522                "ReasonAtom: failed to emit capability.usage event"
523            );
524        }
525    }
526
527    async fn execute_inner(
528        &self,
529        input: ReasonInput,
530        assembled: Option<AssembledTurnContext>,
531    ) -> Result<ReasonResult> {
532        let ReasonInput {
533            context,
534            harness_id,
535            agent_id,
536            org_id,
537            mcp_tool_definitions,
538            previous_response_id,
539            iteration,
540        } = input;
541
542        tracing::info!(
543            session_id = %context.session_id,
544            turn_id = %context.turn_id,
545            exec_id = %context.exec_id,
546            harness_id = %harness_id,
547            agent_id = ?agent_id,
548            mcp_tools_count = %mcp_tool_definitions.len(),
549            "ReasonAtom: starting LLM call"
550        );
551
552        // Generate OTel-style span IDs for hierarchical tracing
553        // trace_id: groups all events in this turn
554        // span_id: unique identifier for this reason span (shared by started/completed)
555        // parent_span_id: links to turn as parent
556        //
557        // NOTE: TurnId::to_string() returns prefixed format (e.g., "turn_abc123")
558        // matching the format used by turn.started/completed events in Braintrust.
559        let trace_id = context.turn_id.to_string();
560        let reason_span_id = Uuid::now_v7().to_string();
561        let parent_span_id = trace_id.clone(); // Parent is the turn
562
563        // Create event context from atom context with span info
564        let event_context = EventContext::from_atom_context(&context).with_span(
565            trace_id.clone(),
566            reason_span_id.clone(),
567            Some(parent_span_id.clone()),
568        );
569
570        // Track reason phase timing for Braintrust observability
571        let reason_start = Instant::now();
572
573        // Emit reason.started event
574        if let Err(e) = self
575            .event_emitter
576            .emit(EventRequest::new(
577                context.session_id,
578                event_context.clone(),
579                ReasonStartedData {
580                    harness_id,
581                    agent_id,
582                    metadata: None, // Will be populated after model resolution
583                },
584            ))
585            .await
586        {
587            tracing::warn!(
588                session_id = %context.session_id,
589                error = %e,
590                "ReasonAtom: failed to emit reason.started event"
591            );
592        }
593
594        // Execute the LLM call and handle errors gracefully
595        let result = match self
596            .execute_llm_call(
597                context.session_id,
598                harness_id,
599                agent_id,
600                org_id,
601                &context,
602                &mcp_tool_definitions,
603                &trace_id,
604                &reason_span_id,
605                previous_response_id,
606                iteration,
607                assembled,
608            )
609            .await
610        {
611            Ok(result) => {
612                // Calculate reason phase duration
613                let reason_duration_ms = reason_start.elapsed().as_millis() as u64;
614
615                // Emit reason.completed event (same span as reason.started, parent is turn)
616                let completed_context = EventContext::from_atom_context(&context).with_span(
617                    trace_id.clone(),
618                    reason_span_id.clone(), // Same span_id as started
619                    Some(parent_span_id.clone()),
620                );
621                if let Err(e) = self
622                    .event_emitter
623                    .emit(EventRequest::new(
624                        context.session_id,
625                        completed_context,
626                        ReasonCompletedData::success(
627                            &result.text,
628                            result.has_tool_calls,
629                            result.tool_calls.len() as u32,
630                            Some(reason_duration_ms),
631                            result.usage.clone(),
632                        ),
633                    ))
634                    .await
635                {
636                    tracing::warn!(
637                        session_id = %context.session_id,
638                        error = %e,
639                        "ReasonAtom: failed to emit reason.completed event"
640                    );
641                }
642                result
643            }
644            Err(e) => {
645                // Calculate reason phase duration even for failures
646                let reason_duration_ms = reason_start.elapsed().as_millis() as u64;
647
648                // LLM call failure is a "normal" result per the spec
649                // Return a result indicating failure with the error message
650                tracing::warn!(
651                    session_id = %context.session_id,
652                    turn_id = %context.turn_id,
653                    error = %e,
654                    "ReasonAtom: LLM call failed"
655                );
656
657                let error_msg = e.to_string();
658                let user_error = e.user_facing_error(UserFacingErrorContext::default());
659                let user_error_text = user_error.fallback_message();
660
661                // Only emit user-facing error events for non-transient errors.
662                // Transient errors (server errors, rate limits, timeouts) will be
663                // retried by the durable task engine. Emitting error events on each
664                // retry attempt causes duplicate error messages in the UI.
665                // The durable worker emits a single error event when all retries
666                // are exhausted (DLQ).
667                let is_transient = is_transient_error_message(&error_msg);
668                let mut output_message_id = None;
669
670                if !is_transient {
671                    // Create error message for the user to see
672                    let mut error_message = Message::assistant(&user_error_text);
673                    let mut metadata = std::collections::HashMap::new();
674                    user_error.apply_to_message_metadata(&mut metadata);
675                    error_message.metadata = Some(metadata);
676
677                    output_message_id = Some(error_message.id);
678
679                    // Emit output.message.completed event (stores message as event with proper turn context)
680                    // output.message.completed is child of reason span
681                    let error_msg_context = EventContext::from_atom_context(&context).with_span(
682                        trace_id.clone(),
683                        Uuid::now_v7().to_string(),   // Own span_id
684                        Some(reason_span_id.clone()), // Parent is reason span
685                    );
686                    if let Err(emit_err) = self
687                        .event_emitter
688                        .emit(EventRequest::new(
689                            context.session_id,
690                            error_msg_context,
691                            OutputMessageCompletedData::new(error_message)
692                                .with_user_facing_error(&user_error),
693                        ))
694                        .await
695                    {
696                        tracing::warn!(
697                            session_id = %context.session_id,
698                            error = %emit_err,
699                            "ReasonAtom: failed to emit output.message.completed event for error"
700                        );
701                    }
702                } else {
703                    tracing::info!(
704                        session_id = %context.session_id,
705                        "ReasonAtom: skipping error event for transient LLM error (will be retried)"
706                    );
707                }
708
709                // Emit reason.completed event for failure (same span as started, parent is turn)
710                let completed_context = EventContext::from_atom_context(&context).with_span(
711                    trace_id.clone(),
712                    reason_span_id.clone(), // Same span_id as started
713                    Some(parent_span_id.clone()),
714                );
715                if let Err(emit_err) = self
716                    .event_emitter
717                    .emit(EventRequest::new(
718                        context.session_id,
719                        completed_context,
720                        ReasonCompletedData::failure(error_msg.clone(), Some(reason_duration_ms)),
721                    ))
722                    .await
723                {
724                    tracing::warn!(
725                        session_id = %context.session_id,
726                        error = %emit_err,
727                        "ReasonAtom: failed to emit reason.completed event"
728                    );
729                }
730
731                ReasonResult {
732                    success: false,
733                    text: user_error_text,
734                    tool_calls: vec![],
735                    has_tool_calls: false,
736                    tool_definitions: vec![],
737                    max_iterations: default_max_iterations(),
738                    error: Some(error_msg),
739                    usage: None,
740                    output_message_id,
741                    time_to_first_token_ms: None,
742                    response_id: None,
743                    locale: None,
744                    network_access: None,
745                }
746            }
747        };
748
749        Ok(result)
750    }
751
752    /// Execute the actual LLM call
753    #[allow(clippy::too_many_arguments)]
754    async fn execute_llm_call(
755        &self,
756        session_id: SessionId,
757        harness_id: HarnessId,
758        agent_id: Option<AgentId>,
759        org_id: i64,
760        context: &AtomContext,
761        mcp_tool_definitions: &[ToolDefinition],
762        trace_id: &str,
763        reason_span_id: &str,
764        previous_response_id: Option<String>,
765        iteration: u32,
766        assembled: Option<AssembledTurnContext>,
767    ) -> Result<ReasonResult> {
768        let assembled = match assembled {
769            Some(assembled) => assembled,
770            None => {
771                assemble_turn_context(
772                    self.harness_store.as_ref(),
773                    self.agent_store.as_ref(),
774                    self.session_store.as_ref(),
775                    self.message_retriever.as_ref(),
776                    self.provider_store.as_ref(),
777                    &self.capability_registry,
778                    session_id,
779                    harness_id,
780                    agent_id,
781                    mcp_tool_definitions,
782                    self.file_store.clone(),
783                )
784                .await?
785            }
786        };
787
788        let messages = assembled.messages;
789        let prior_usage = assembled.session.usage.clone();
790        let model_with_provider = assembled.model_with_provider;
791        let resolved_model_id = assembled.resolved_model_id;
792        let resolved_locale = assembled.resolved_locale;
793        let compaction_config = assembled.compaction_config;
794        let resolved_capability_configs = assembled.resolved_capability_configs;
795        let runtime_agent = assembled.runtime_agent;
796
797        self.emit_capability_usage_snapshot(
798            session_id,
799            context,
800            &resolved_capability_configs,
801            &runtime_agent.tools,
802        )
803        .await;
804
805        // Collect streaming output guardrail providers contributed by enabled
806        // capabilities. Each tuple carries the contributing capability id, a
807        // borrow of that capability's per-agent config (so arming below doesn't
808        // need a second scan), and the provider itself. Capabilities that
809        // contribute no guardrails — the common case — are skipped at zero
810        // allocation cost.
811        let guardrail_providers: Vec<(
812            &str,
813            &serde_json::Value,
814            Arc<dyn crate::output_guardrail::OutputGuardrail>,
815        )> = resolved_capability_configs
816            .iter()
817            .filter_map(|cfg| {
818                let cap_id = cfg.capability_ref.as_str();
819                let cap = self.capability_registry.get(cap_id)?;
820                let guards = cap.output_guardrails();
821                if guards.is_empty() {
822                    return None;
823                }
824                Some(
825                    guards
826                        .into_iter()
827                        .map(move |g| (cap_id, &cfg.config, g))
828                        .collect::<Vec<_>>(),
829                )
830            })
831            .flatten()
832            .collect();
833
834        // 7. Create LLM driver using factory
835        let llm_driver = self.create_llm_driver(&model_with_provider)?;
836
837        // 8. Extract reasoning effort from the last user message's controls,
838        //    but only if the model actually supports reasoning (per its profile).
839        //    This prevents sending unsupported `reasoning` params to non-thinking
840        //    models like gpt-4o-mini, which would cause API errors.
841        let reasoning_effort = messages
842            .iter()
843            .rev()
844            .find(|m| m.role == MessageRole::User)
845            .and_then(|m| m.controls.as_ref())
846            .and_then(|c| c.reasoning.as_ref())
847            .and_then(|r| r.effort.clone())
848            .filter(|effort| {
849                // Skip "none" — it means "don't use reasoning"
850                if effort.eq_ignore_ascii_case("none") {
851                    return false;
852                }
853                // Check model profile; if profile exists and reasoning is false, strip it.
854                // Unknown models (no profile) pass through — let the API decide.
855                let profile = crate::llm_model_profiles::get_model_profile(
856                    &model_with_provider.provider_type,
857                    &model_with_provider.model,
858                );
859                match profile {
860                    Some(p) if !p.reasoning => {
861                        tracing::warn!(
862                            model = %model_with_provider.model,
863                            effort = %effort,
864                            "Stripping reasoning_effort: model does not support reasoning"
865                        );
866                        false
867                    }
868                    _ => true,
869                }
870            });
871
872        // 9. Patch dangling tool calls (add cancelled results for tool calls without responses)
873        let patched_messages = patch_dangling_tool_calls(&messages);
874
875        // 9b. Let enabled capabilities build a prompt-facing model view from
876        // lossless stored messages. Storage remains unchanged.
877        let model_view_providers = crate::capabilities::collect_model_view_providers(
878            &resolved_capability_configs,
879            &self.capability_registry,
880            Some(model_with_provider.model.as_str()),
881        );
882        let model_view_context = crate::capabilities::ModelViewContext {
883            session_id,
884            prior_usage: prior_usage.as_ref(),
885        };
886        let context_messages =
887            model_view_providers.apply_model_view(patched_messages, &model_view_context);
888
889        // 10. Resolve images from image_file references (if any)
890        //
891        // Image resolution converts image_file content parts (which only contain UUIDs)
892        // into actual base64-encoded image data that can be sent to LLMs.
893        let resolved_images = self.resolve_images(&context_messages).await;
894
895        // 11. Build LLM messages
896        let mut llm_messages = Vec::new();
897
898        // Add system prompt
899        let has_system_prompt = !runtime_agent.system_prompt.is_empty();
900        if has_system_prompt {
901            llm_messages.push(LlmMessage {
902                role: LlmMessageRole::System,
903                content: LlmMessageContent::Text(runtime_agent.system_prompt.clone()),
904                tool_calls: None,
905                tool_call_id: None,
906                phase: None,
907                thinking: None,
908                thinking_signature: None,
909            });
910        }
911
912        // Build messages for llm.generation event (includes system message)
913        let messages_for_event: Vec<Message> = if has_system_prompt {
914            std::iter::once(Message::system(&runtime_agent.system_prompt))
915                .chain(context_messages.iter().cloned())
916                .collect()
917        } else {
918            context_messages.clone()
919        };
920
921        // Add conversation messages with resolved images.
922        // For user messages with an external_actor, prefix the first text part
923        // with the actor's display label so the LLM knows who is speaking.
924        // Skip error placeholder messages from prior failed turns — they add
925        // no conversational value and inflate the request.
926        let mut stripped_error_count = 0u32;
927        for msg in &context_messages {
928            if is_error_placeholder_message(msg) {
929                stripped_error_count += 1;
930                continue;
931            }
932            let mut llm_msg = LlmMessage::from_message_with_images(msg, &resolved_images);
933            if msg.role == MessageRole::User
934                && let Some(ref actor) = msg.external_actor
935            {
936                llm_msg.prepend_text_prefix(&format!("[{}] ", actor.display_label()));
937            }
938            llm_messages.push(llm_msg);
939        }
940        if stripped_error_count > 0 {
941            tracing::info!(
942                session_id = %session_id,
943                stripped_error_count,
944                "ReasonAtom: stripped error placeholder messages from LLM input"
945            );
946        }
947
948        // 12. Build LLM call config with reasoning effort and metadata
949        let mut llm_config_builder = LlmCallConfigBuilder::from(&runtime_agent);
950        if let Some(effort) = reasoning_effort.clone() {
951            llm_config_builder = llm_config_builder.reasoning_effort(effort);
952        }
953
954        // Add metadata for API tracking and debugging
955        // These IDs help correlate API requests with Everruns entities
956        // TypedId::to_string() produces prefixed format (e.g., "session_abc123")
957        llm_config_builder = llm_config_builder
958            .with_metadata("session_id", session_id.to_string())
959            .with_metadata("harness_id", harness_id.to_string())
960            .with_metadata("turn_id", context.turn_id.to_string())
961            .with_metadata("exec_id", context.exec_id.to_string())
962            .with_metadata("org_id", format!("org_{:032x}", org_id));
963        if let Some(agent_id) = agent_id {
964            llm_config_builder = llm_config_builder.with_metadata("agent_id", agent_id.to_string());
965        }
966
967        // Add model_id if we have one (not available for system default model)
968        if let Some(model_id) = &resolved_model_id {
969            llm_config_builder = llm_config_builder.with_metadata("model_id", model_id.to_string());
970        }
971
972        let llm_config = llm_config_builder
973            .previous_response_id(previous_response_id.clone())
974            .build();
975
976        tracing::debug!(
977            session_id = %session_id,
978            turn_id = %context.turn_id,
979            model = %runtime_agent.model,
980            message_count = %llm_messages.len(),
981            "ReasonAtom: calling LLM"
982        );
983
984        // 13. Emit output.message.started event BEFORE starting LLM call
985        // This allows UI to show a thinking indicator immediately
986        let streaming_event_context = EventContext::from_atom_context(context);
987
988        // Arm output guardrails for this stream. Each guardrail sees the
989        // assembled system prompt and its own per-capability config (already
990        // borrowed in `guardrail_providers` above, so no second scan over
991        // `resolved_capability_configs`). Guardrails that decline to arm —
992        // e.g. the canary couldn't extract a long-enough sentence — are
993        // skipped, leaving the streaming hot path entirely free of work.
994        let mut armed_guardrails: Vec<ArmedGuardrail> = Vec::new();
995        for (cap_id, cfg, provider) in &guardrail_providers {
996            let ctx = OutputGuardrailContext {
997                system_prompt: &runtime_agent.system_prompt,
998                config: cfg,
999            };
1000            let guardrail_id = provider.id().to_string();
1001            if let Some(run) = provider.arm(&ctx) {
1002                armed_guardrails.push(ArmedGuardrail {
1003                    capability_id: (*cap_id).to_string(),
1004                    guardrail_id,
1005                    run,
1006                });
1007            }
1008        }
1009        let mut tripped: Option<TrippedGuardrail> = None;
1010        tracing::info!(
1011            session_id = %session_id,
1012            turn_id = %context.turn_id,
1013            "ReasonAtom: emitting output.message.started event"
1014        );
1015        if let Err(e) = self
1016            .event_emitter
1017            .emit(EventRequest::new(
1018                session_id,
1019                streaming_event_context.clone(),
1020                OutputMessageStartedData {
1021                    turn_id: context.turn_id,
1022                    model: Some(runtime_agent.model.clone()),
1023                    iteration: Some(iteration),
1024                },
1025            ))
1026            .await
1027        {
1028            tracing::warn!(
1029                session_id = %session_id,
1030                error = %e,
1031                "ReasonAtom: failed to emit output.message.started event"
1032            );
1033        } else {
1034            tracing::info!(
1035                session_id = %session_id,
1036                "ReasonAtom: output.message.started event emitted successfully"
1037            );
1038        }
1039
1040        // Also emit reason.thinking.started if extended thinking is enabled
1041        let thinking_enabled = reasoning_effort.is_some();
1042        if thinking_enabled {
1043            tracing::info!(
1044                session_id = %session_id,
1045                turn_id = %context.turn_id,
1046                "ReasonAtom: emitting reason.thinking.started event"
1047            );
1048            if let Err(e) = self
1049                .event_emitter
1050                .emit(EventRequest::new(
1051                    session_id,
1052                    streaming_event_context.clone(),
1053                    ReasonThinkingStartedData {
1054                        turn_id: context.turn_id,
1055                        model: Some(runtime_agent.model.clone()),
1056                    },
1057                ))
1058                .await
1059            {
1060                tracing::warn!(
1061                    session_id = %session_id,
1062                    error = %e,
1063                    "ReasonAtom: failed to emit reason.thinking.started event"
1064                );
1065            } else {
1066                tracing::info!(
1067                    session_id = %session_id,
1068                    "ReasonAtom: reason.thinking.started event emitted successfully"
1069                );
1070            }
1071        }
1072
1073        // Track LLM call timing
1074        let llm_start = Instant::now();
1075
1076        // Try LLM call with automatic compaction on RequestTooLarge.
1077        // Transient errors (429, 5xx) are retried at the driver level.
1078        // Stream-level errors are not retried here to avoid duplicate user-visible messages.
1079        let mut compaction_info: Option<LlmCompactionInfo> = None;
1080        let mut llm_messages_for_call = llm_messages.clone();
1081
1082        // 13b. Proactive compaction: check token budget BEFORE calling the LLM.
1083        // This avoids the latency of a RequestTooLarge round-trip.
1084        if let Some(ref config) = compaction_config {
1085            let context_window = crate::llm_model_profiles::get_model_profile(
1086                &model_with_provider.provider_type,
1087                &model_with_provider.model,
1088            )
1089            .and_then(|p| p.limits.map(|l| l.context as usize))
1090            .unwrap_or(128_000);
1091
1092            if crate::capabilities::should_compact_proactively(
1093                &llm_messages_for_call,
1094                config,
1095                context_window,
1096            ) {
1097                use crate::capabilities::{
1098                    CompactionStrategy, aggressive_trim, apply_observation_masking,
1099                    estimate_total_tokens,
1100                };
1101                use crate::events::{
1102                    CompactionReason, CompactionStepData, ContextCompactedData,
1103                    ContextCompactingData,
1104                };
1105
1106                let messages_before = llm_messages_for_call.len();
1107                let cascade_start = Instant::now();
1108                let mut strategies_used: Vec<String> = Vec::new();
1109                let mut steps: Vec<CompactionStepData> = Vec::new();
1110
1111                tracing::info!(
1112                    session_id = %session_id,
1113                    strategy = %config.strategy,
1114                    messages = messages_before,
1115                    "ReasonAtom: proactive compaction triggered (budget threshold exceeded)"
1116                );
1117
1118                // Emit context.compacting event
1119                let _ = self
1120                    .event_emitter
1121                    .emit(EventRequest::new(
1122                        session_id,
1123                        streaming_event_context.clone(),
1124                        ContextCompactingData {
1125                            reason: CompactionReason::ProactiveBudget,
1126                            strategy: config.strategy.to_string(),
1127                            messages_before,
1128                        },
1129                    ))
1130                    .await;
1131
1132                let run_masking = matches!(
1133                    config.strategy,
1134                    CompactionStrategy::Auto | CompactionStrategy::ObservationMasking
1135                );
1136
1137                // Step 1: Observation masking (free)
1138                if run_masking {
1139                    let step_start = Instant::now();
1140                    let conversation_msgs = if has_system_prompt {
1141                        &llm_messages_for_call[1..]
1142                    } else {
1143                        &llm_messages_for_call[..]
1144                    };
1145
1146                    let masking_result =
1147                        apply_observation_masking(conversation_msgs, &config.observation_masking);
1148
1149                    if masking_result.masked_count > 0 {
1150                        let mut new_messages = Vec::new();
1151                        if has_system_prompt {
1152                            new_messages.push(llm_messages_for_call[0].clone());
1153                        }
1154                        new_messages.extend(masking_result.messages);
1155                        llm_messages_for_call = new_messages;
1156
1157                        let step_duration = step_start.elapsed().as_millis() as u64;
1158                        strategies_used.push("observation_masking".to_string());
1159                        steps.push(CompactionStepData {
1160                            strategy: "observation_masking".to_string(),
1161                            messages_after: llm_messages_for_call.len(),
1162                            duration_ms: step_duration,
1163                        });
1164                    }
1165                }
1166
1167                // Step 2: If still over budget after masking, apply aggressive trim
1168                let budget_tokens = (context_window as f32 * config.budget_percent) as usize;
1169                if estimate_total_tokens(&llm_messages_for_call) > budget_tokens {
1170                    let step_start = Instant::now();
1171                    llm_messages_for_call =
1172                        aggressive_trim(&llm_messages_for_call, budget_tokens, has_system_prompt);
1173
1174                    let step_duration = step_start.elapsed().as_millis() as u64;
1175                    strategies_used.push("aggressive_trim".to_string());
1176                    steps.push(CompactionStepData {
1177                        strategy: "aggressive_trim".to_string(),
1178                        messages_after: llm_messages_for_call.len(),
1179                        duration_ms: step_duration,
1180                    });
1181                }
1182
1183                let cascade_duration = cascade_start.elapsed().as_millis() as u64;
1184                let messages_after = llm_messages_for_call.len();
1185
1186                if !strategies_used.is_empty() {
1187                    let strategy_used = strategies_used.join("+");
1188
1189                    let _ = self
1190                        .event_emitter
1191                        .emit(EventRequest::new(
1192                            session_id,
1193                            streaming_event_context.clone(),
1194                            ContextCompactedData {
1195                                strategy_used: strategy_used.clone(),
1196                                messages_before,
1197                                messages_after,
1198                                duration_ms: cascade_duration,
1199                                steps,
1200                            },
1201                        ))
1202                        .await;
1203
1204                    tracing::info!(
1205                        session_id = %session_id,
1206                        strategy = %strategy_used,
1207                        messages_before,
1208                        messages_after,
1209                        duration_ms = cascade_duration,
1210                        "ReasonAtom: proactive compaction completed"
1211                    );
1212                }
1213            }
1214        }
1215
1216        // 14. Process stream with batched output.message.delta emissions
1217        // Batch deltas every 100ms to reduce event volume while providing real-time feedback
1218        const DELTA_BATCH_INTERVAL_MS: u64 = 100;
1219        let (
1220            text,
1221            thinking,
1222            thinking_signature,
1223            tool_calls,
1224            completion_metadata,
1225            time_to_first_token_ms,
1226        ) = {
1227            let mut stream = match llm_driver
1228                .chat_completion_stream(llm_messages_for_call.clone(), &llm_config)
1229                .await
1230            {
1231                Ok(stream) => stream,
1232                Err(e) if e.is_request_too_large() => {
1233                    // Context too large — run compaction cascade
1234                    use crate::capabilities::{CompactionStrategy, apply_observation_masking};
1235                    use crate::events::{
1236                        CompactionReason, CompactionStepData, ContextCompactedData,
1237                        ContextCompactingData,
1238                    };
1239
1240                    let Some(config) = compaction_config.clone() else {
1241                        tracing::warn!(
1242                            session_id = %session_id,
1243                            turn_id = %context.turn_id,
1244                            "ReasonAtom: context too large and compaction capability is not enabled"
1245                        );
1246                        return Err(e);
1247                    };
1248                    let messages_before = llm_messages_for_call.len();
1249
1250                    tracing::info!(
1251                        session_id = %session_id,
1252                        turn_id = %context.turn_id,
1253                        strategy = %config.strategy,
1254                        messages = messages_before,
1255                        "ReasonAtom: context too large, attempting compaction"
1256                    );
1257
1258                    // Emit context.compacting event
1259                    let _ = self
1260                        .event_emitter
1261                        .emit(EventRequest::new(
1262                            session_id,
1263                            streaming_event_context.clone(),
1264                            ContextCompactingData {
1265                                reason: CompactionReason::RequestTooLarge,
1266                                strategy: config.strategy.to_string(),
1267                                messages_before,
1268                            },
1269                        ))
1270                        .await;
1271
1272                    let cascade_start = Instant::now();
1273                    let mut steps: Vec<CompactionStepData> = Vec::new();
1274                    let mut strategies_used: Vec<String> = Vec::new();
1275
1276                    // Determine which strategies to run based on config
1277                    let run_masking = matches!(
1278                        config.strategy,
1279                        CompactionStrategy::Auto | CompactionStrategy::ObservationMasking
1280                    );
1281                    let run_native = matches!(
1282                        config.strategy,
1283                        CompactionStrategy::Auto | CompactionStrategy::Native
1284                    ) && llm_driver.supports_compact();
1285                    let run_summarization = matches!(
1286                        config.strategy,
1287                        CompactionStrategy::Auto | CompactionStrategy::Summarization
1288                    );
1289
1290                    // Step 1: Observation masking (free, no LLM call)
1291                    if run_masking {
1292                        let step_start = Instant::now();
1293                        let conversation_msgs = if has_system_prompt {
1294                            &llm_messages_for_call[1..]
1295                        } else {
1296                            &llm_messages_for_call[..]
1297                        };
1298
1299                        let masking_result = apply_observation_masking(
1300                            conversation_msgs,
1301                            &config.observation_masking,
1302                        );
1303
1304                        if masking_result.masked_count > 0 {
1305                            let mut new_messages = Vec::new();
1306                            if has_system_prompt {
1307                                new_messages.push(llm_messages_for_call[0].clone());
1308                            }
1309                            new_messages.extend(masking_result.messages);
1310                            llm_messages_for_call = new_messages;
1311
1312                            let step_duration = step_start.elapsed().as_millis() as u64;
1313                            strategies_used.push("observation_masking".to_string());
1314                            steps.push(CompactionStepData {
1315                                strategy: "observation_masking".to_string(),
1316                                messages_after: llm_messages_for_call.len(),
1317                                duration_ms: step_duration,
1318                            });
1319
1320                            tracing::info!(
1321                                session_id = %session_id,
1322                                masked_count = masking_result.masked_count,
1323                                duration_ms = step_duration,
1324                                "ReasonAtom: observation masking applied"
1325                            );
1326                        }
1327                    }
1328
1329                    // Step 2: Native provider compaction
1330                    if run_native {
1331                        let step_start = Instant::now();
1332                        let messages_to_compact = if has_system_prompt {
1333                            &llm_messages_for_call[1..]
1334                        } else {
1335                            &llm_messages_for_call[..]
1336                        };
1337
1338                        let compact_input = messages_to_compact_input(messages_to_compact);
1339                        let input_count = compact_input.len();
1340
1341                        let compact_request = CompactRequest {
1342                            model: runtime_agent.model.clone(),
1343                            input: compact_input,
1344                            previous_response_id: previous_response_id.clone(),
1345                            instructions: if has_system_prompt {
1346                                Some(runtime_agent.system_prompt.clone())
1347                            } else {
1348                                None
1349                            },
1350                        };
1351
1352                        match llm_driver.compact(compact_request).await {
1353                            Ok(Some(compact_response)) => {
1354                                let (compacted_messages, compaction_items) =
1355                                    compact_output_to_messages(&compact_response.output);
1356
1357                                let input_tokens_after = compact_response
1358                                    .usage
1359                                    .as_ref()
1360                                    .and_then(|u| u.output_tokens);
1361
1362                                compaction_info = Some(LlmCompactionInfo::new(
1363                                    Some(input_count as u32),
1364                                    input_tokens_after,
1365                                    Some(step_start.elapsed().as_millis() as u64),
1366                                ));
1367
1368                                let mut compacted_llm_messages = Vec::new();
1369                                if has_system_prompt {
1370                                    compacted_llm_messages.push(llm_messages_for_call[0].clone());
1371                                }
1372                                compacted_llm_messages.extend(compacted_messages);
1373
1374                                for item in compaction_items {
1375                                    if let CompactInputItem::Compaction { encrypted_content } = item
1376                                    {
1377                                        compacted_llm_messages.push(LlmMessage {
1378                                            role: LlmMessageRole::System,
1379                                            content: LlmMessageContent::Text(format!(
1380                                                "[COMPACTED_CONTEXT:{encrypted_content}]"
1381                                            )),
1382                                            tool_calls: None,
1383                                            tool_call_id: None,
1384                                            phase: None,
1385                                            thinking: None,
1386                                            thinking_signature: None,
1387                                        });
1388                                    }
1389                                }
1390
1391                                llm_messages_for_call = compacted_llm_messages;
1392
1393                                let step_duration = step_start.elapsed().as_millis() as u64;
1394                                strategies_used.push("native".to_string());
1395                                steps.push(CompactionStepData {
1396                                    strategy: "native".to_string(),
1397                                    messages_after: llm_messages_for_call.len(),
1398                                    duration_ms: step_duration,
1399                                });
1400
1401                                tracing::info!(
1402                                    session_id = %session_id,
1403                                    duration_ms = step_duration,
1404                                    messages_after = llm_messages_for_call.len(),
1405                                    "ReasonAtom: native compaction applied"
1406                                );
1407                            }
1408                            Ok(None) | Err(_) => {
1409                                tracing::warn!(
1410                                    session_id = %session_id,
1411                                    "ReasonAtom: native compaction unavailable, continuing cascade"
1412                                );
1413                            }
1414                        }
1415                    }
1416
1417                    // Step 3: Summarization (if configured, and native didn't run or isn't available)
1418                    // Only run if we haven't done native compaction (which already compressed everything)
1419                    if run_summarization && !strategies_used.contains(&"native".to_string()) {
1420                        use crate::capabilities::{
1421                            build_summarization_prompt, build_summary_message,
1422                            format_messages_for_summarization,
1423                        };
1424
1425                        let step_start = Instant::now();
1426                        let conversation_msgs = if has_system_prompt {
1427                            &llm_messages_for_call[1..]
1428                        } else {
1429                            &llm_messages_for_call[..]
1430                        };
1431
1432                        // Keep the last few messages verbatim, summarize the rest
1433                        let keep_recent = 10.min(conversation_msgs.len());
1434                        let to_summarize =
1435                            &conversation_msgs[..conversation_msgs.len() - keep_recent];
1436                        let recent = &conversation_msgs[conversation_msgs.len() - keep_recent..];
1437
1438                        if !to_summarize.is_empty() {
1439                            let summary_prompt = build_summarization_prompt(&config.summarization);
1440                            let messages_text = format_messages_for_summarization(to_summarize);
1441
1442                            // Use the LLM to generate a summary
1443                            let summary_messages = vec![
1444                                LlmMessage {
1445                                    role: LlmMessageRole::System,
1446                                    content: LlmMessageContent::Text(summary_prompt),
1447                                    tool_calls: None,
1448                                    tool_call_id: None,
1449                                    phase: None,
1450                                    thinking: None,
1451                                    thinking_signature: None,
1452                                },
1453                                LlmMessage {
1454                                    role: LlmMessageRole::User,
1455                                    content: LlmMessageContent::Text(messages_text),
1456                                    tool_calls: None,
1457                                    tool_call_id: None,
1458                                    phase: None,
1459                                    thinking: None,
1460                                    thinking_signature: None,
1461                                },
1462                            ];
1463
1464                            let summary_config = crate::llm_driver_registry::LlmCallConfig {
1465                                model: config
1466                                    .summarization
1467                                    .model
1468                                    .clone()
1469                                    .unwrap_or_else(|| runtime_agent.model.clone()),
1470                                temperature: Some(0.0),
1471                                max_tokens: Some(2000),
1472                                tools: vec![],
1473                                reasoning_effort: None,
1474                                metadata: HashMap::new(),
1475                                previous_response_id: None,
1476                                tool_search: None,
1477                                prompt_cache: None,
1478                            };
1479
1480                            match llm_driver
1481                                .chat_completion(summary_messages, &summary_config)
1482                                .await
1483                            {
1484                                Ok(response) => {
1485                                    let summary_text = response.text;
1486                                    let summary_msg = build_summary_message(&summary_text);
1487
1488                                    let mut new_messages = Vec::new();
1489                                    if has_system_prompt {
1490                                        new_messages.push(llm_messages_for_call[0].clone());
1491                                    }
1492                                    new_messages.push(summary_msg);
1493                                    new_messages.extend_from_slice(recent);
1494                                    llm_messages_for_call = new_messages;
1495
1496                                    let step_duration = step_start.elapsed().as_millis() as u64;
1497                                    strategies_used.push("summarization".to_string());
1498                                    steps.push(CompactionStepData {
1499                                        strategy: "summarization".to_string(),
1500                                        messages_after: llm_messages_for_call.len(),
1501                                        duration_ms: step_duration,
1502                                    });
1503
1504                                    tracing::info!(
1505                                        session_id = %session_id,
1506                                        duration_ms = step_duration,
1507                                        messages_after = llm_messages_for_call.len(),
1508                                        "ReasonAtom: summarization applied"
1509                                    );
1510                                }
1511                                Err(e) => {
1512                                    tracing::warn!(
1513                                        session_id = %session_id,
1514                                        error = %e,
1515                                        "ReasonAtom: summarization failed, continuing"
1516                                    );
1517                                }
1518                            }
1519                        }
1520                    }
1521
1522                    // Step 4: Aggressive trim (last resort — drop oldest messages)
1523                    // Only run if previous strategies didn't reduce context enough.
1524                    // Use a generous target (half the estimated original size).
1525                    if strategies_used.is_empty()
1526                        || llm_messages_for_call.len() > messages_before / 2
1527                    {
1528                        use crate::capabilities::aggressive_trim;
1529                        let step_start = Instant::now();
1530                        // Target: keep roughly half the messages by token budget
1531                        let estimated_total =
1532                            crate::capabilities::estimate_total_tokens(&llm_messages_for_call);
1533                        let target = estimated_total / 2;
1534                        let trimmed =
1535                            aggressive_trim(&llm_messages_for_call, target, has_system_prompt);
1536                        if trimmed.len() < llm_messages_for_call.len() {
1537                            llm_messages_for_call = trimmed;
1538                            let step_duration = step_start.elapsed().as_millis() as u64;
1539                            strategies_used.push("aggressive_trim".to_string());
1540                            steps.push(CompactionStepData {
1541                                strategy: "aggressive_trim".to_string(),
1542                                messages_after: llm_messages_for_call.len(),
1543                                duration_ms: step_duration,
1544                            });
1545                            tracing::info!(
1546                                session_id = %session_id,
1547                                messages_after = llm_messages_for_call.len(),
1548                                "ReasonAtom: aggressive trim applied (last resort)"
1549                            );
1550                        }
1551                    }
1552
1553                    let cascade_duration = cascade_start.elapsed().as_millis() as u64;
1554                    let messages_after = llm_messages_for_call.len();
1555
1556                    // Emit context.compacted event
1557                    let strategy_used = if strategies_used.is_empty() {
1558                        "none".to_string()
1559                    } else {
1560                        strategies_used.join("+")
1561                    };
1562
1563                    let _ = self
1564                        .event_emitter
1565                        .emit(EventRequest::new(
1566                            session_id,
1567                            streaming_event_context.clone(),
1568                            ContextCompactedData {
1569                                strategy_used: strategy_used.clone(),
1570                                messages_before,
1571                                messages_after,
1572                                duration_ms: cascade_duration,
1573                                steps,
1574                            },
1575                        ))
1576                        .await;
1577
1578                    tracing::info!(
1579                        session_id = %session_id,
1580                        strategy = %strategy_used,
1581                        messages_before,
1582                        messages_after,
1583                        duration_ms = cascade_duration,
1584                        "ReasonAtom: compaction cascade completed, retrying LLM call"
1585                    );
1586
1587                    llm_driver
1588                        .chat_completion_stream(llm_messages_for_call.clone(), &llm_config)
1589                        .await?
1590                }
1591                Err(e) => return Err(e),
1592            };
1593
1594            let mut text = String::new();
1595            let mut thinking = String::new();
1596            let mut thinking_signature: Option<String> = None;
1597            let mut tool_calls = Vec::new();
1598            let mut completion_metadata: Option<LlmCompletionMetadata> = None;
1599            let mut pending_delta = String::new();
1600            let mut pending_thinking_delta = String::new();
1601            let mut last_delta_emit = Instant::now();
1602            let mut last_thinking_delta_emit = Instant::now();
1603            let mut time_to_first_token_ms: Option<u64> = None;
1604
1605            // EVE-531: stall timeout + keepalive heartbeat for stream-liveness
1606            let stall_timeout = self
1607                .provider_stall_timeout
1608                .unwrap_or(std::time::Duration::from_secs(120));
1609            let mut stall_sleep = Box::pin(tokio::time::sleep(stall_timeout));
1610            let mut keepalive_ticker = tokio::time::interval(std::time::Duration::from_secs(12));
1611            keepalive_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1612            keepalive_ticker.tick().await; // consume immediate first tick
1613            let mut last_stream_heartbeat = Instant::now();
1614            // Tracks the wall-clock time of the last actual token received.
1615            // Updated only on content events; keepalive heartbeats use this
1616            // so the control plane can distinguish "alive/slow" from "making
1617            // progress" without conflating keepalive pings with real tokens.
1618            let mut last_token_at_unix: u64 = std::time::SystemTime::now()
1619                .duration_since(std::time::UNIX_EPOCH)
1620                .unwrap_or_default()
1621                .as_secs();
1622
1623            loop {
1624                let event = tokio::select! {
1625                    biased;
1626                    next = stream.next() => match next {
1627                        Some(e) => e,
1628                        None => break,
1629                    },
1630                    _ = &mut stall_sleep => {
1631                        tracing::warn!(
1632                            session_id = %session_id,
1633                            turn_id = %context.turn_id,
1634                            stall_secs = stall_timeout.as_secs(),
1635                            "ReasonAtom: provider stream stall timeout"
1636                        );
1637                        return Err(AgentLoopError::llm(format!(
1638                            "provider stream stall: no tokens for {}s",
1639                            stall_timeout.as_secs()
1640                        )));
1641                    },
1642                    _ = keepalive_ticker.tick() => {
1643                        if let Some(ref hb) = self.stream_heartbeater {
1644                            hb.heartbeat(crate::traits::StreamProgress {
1645                                accumulated_len: text.len() + thinking.len(),
1646                                last_delta_at: last_token_at_unix,
1647                            })
1648                            .await;
1649                            last_stream_heartbeat = Instant::now();
1650                        }
1651                        continue;
1652                    },
1653                };
1654                // Reset stall deadline on every received stream event
1655                stall_sleep
1656                    .as_mut()
1657                    .reset(tokio::time::Instant::now() + stall_timeout);
1658                match event? {
1659                    LlmStreamEvent::TextDelta(delta) => {
1660                        // Track time-to-first-token on first non-empty delta
1661                        if time_to_first_token_ms.is_none() && !delta.is_empty() {
1662                            let ttft = llm_start.elapsed().as_millis() as u64;
1663                            time_to_first_token_ms = Some(ttft);
1664                            tracing::info!(
1665                                session_id = %session_id,
1666                                time_to_first_token_ms = ttft,
1667                                "ReasonAtom: received first token from LLM"
1668                            );
1669                        }
1670                        text.push_str(&delta);
1671                        pending_delta.push_str(&delta);
1672                        last_token_at_unix = std::time::SystemTime::now()
1673                            .duration_since(std::time::UNIX_EPOCH)
1674                            .unwrap_or_default()
1675                            .as_secs();
1676
1677                        // Run output guardrails on the new accumulated text.
1678                        // Cheap by contract — runs in the streaming hot path.
1679                        // On block: suppress the pending delta (the bad text
1680                        // never reaches the client as a delta), record the
1681                        // trip, and break the loop. The replacement message is
1682                        // emitted below after the streaming block.
1683                        if !armed_guardrails.is_empty()
1684                            && let Some(t) =
1685                                evaluate_guardrails(&mut armed_guardrails, &text, &delta)
1686                        {
1687                            tracing::warn!(
1688                                session_id = %session_id,
1689                                turn_id = %context.turn_id,
1690                                guardrail_capability_id = %t.capability_id,
1691                                guardrail_id = %t.guardrail_id,
1692                                reason_code = %t.block.reason_code,
1693                                "ReasonAtom: output guardrail tripped, replacing assistant message"
1694                            );
1695                            pending_delta.clear();
1696                            tripped = Some(t);
1697                            break;
1698                        }
1699
1700                        // Emit batched delta if interval elapsed
1701                        if last_delta_emit.elapsed().as_millis() as u64 >= DELTA_BATCH_INTERVAL_MS
1702                            && !pending_delta.is_empty()
1703                        {
1704                            if let Err(e) = self
1705                                .event_emitter
1706                                .emit(EventRequest::new(
1707                                    session_id,
1708                                    streaming_event_context.clone(),
1709                                    OutputMessageDeltaData {
1710                                        turn_id: context.turn_id,
1711                                        delta: pending_delta.clone(),
1712                                        accumulated: text.clone(),
1713                                    },
1714                                ))
1715                                .await
1716                            {
1717                                tracing::warn!(
1718                                    session_id = %session_id,
1719                                    error = %e,
1720                                    "ReasonAtom: failed to emit output.message.delta event"
1721                                );
1722                            }
1723                            pending_delta.clear();
1724                            last_delta_emit = Instant::now();
1725                        }
1726                    }
1727                    LlmStreamEvent::ThinkingDelta(delta) => {
1728                        // Accumulate thinking content from extended thinking models
1729                        thinking.push_str(&delta);
1730                        pending_thinking_delta.push_str(&delta);
1731                        last_token_at_unix = std::time::SystemTime::now()
1732                            .duration_since(std::time::UNIX_EPOCH)
1733                            .unwrap_or_default()
1734                            .as_secs();
1735                        tracing::debug!(
1736                            session_id = %session_id,
1737                            delta_len = delta.len(),
1738                            total_thinking_len = thinking.len(),
1739                            "ReasonAtom: received ThinkingDelta from LLM"
1740                        );
1741
1742                        // Emit batched thinking delta if interval elapsed
1743                        if last_thinking_delta_emit.elapsed().as_millis() as u64
1744                            >= DELTA_BATCH_INTERVAL_MS
1745                            && !pending_thinking_delta.is_empty()
1746                        {
1747                            if let Err(e) = self
1748                                .event_emitter
1749                                .emit(EventRequest::new(
1750                                    session_id,
1751                                    streaming_event_context.clone(),
1752                                    ReasonThinkingDeltaData {
1753                                        turn_id: context.turn_id,
1754                                        delta: pending_thinking_delta.clone(),
1755                                        accumulated: thinking.clone(),
1756                                    },
1757                                ))
1758                                .await
1759                            {
1760                                tracing::warn!(
1761                                    session_id = %session_id,
1762                                    error = %e,
1763                                    "ReasonAtom: failed to emit reason.thinking.delta event"
1764                                );
1765                            }
1766                            pending_thinking_delta.clear();
1767                            last_thinking_delta_emit = Instant::now();
1768                        }
1769                    }
1770                    LlmStreamEvent::ThinkingSignature(signature) => {
1771                        // Capture the cryptographic signature for thinking content (required to send it back)
1772                        tracing::debug!(
1773                            session_id = %session_id,
1774                            signature_len = signature.len(),
1775                            "ReasonAtom: received ThinkingSignature from LLM"
1776                        );
1777                        thinking_signature = Some(signature);
1778                    }
1779                    LlmStreamEvent::ReasonItem {
1780                        provider,
1781                        model,
1782                        item_id,
1783                        encrypted_content,
1784                        summary,
1785                        token_count,
1786                    } => {
1787                        // Preserve the opaque artifact as the assistant message's
1788                        // thinking_signature so the next request can replay
1789                        // reasoning context, and emit a durable reason.item event
1790                        // for trace/session review. Plaintext reasoning content is
1791                        // never included.
1792                        if let Some(sig) = encrypted_content.as_ref() {
1793                            tracing::debug!(
1794                                session_id = %session_id,
1795                                signature_len = sig.len(),
1796                                provider = %provider,
1797                                item_id = %item_id,
1798                                "ReasonAtom: captured encrypted reasoning content from ReasonItem"
1799                            );
1800                            thinking_signature = Some(sig.clone());
1801                        }
1802                        if let Err(e) = self
1803                            .event_emitter
1804                            .emit(EventRequest::new(
1805                                session_id,
1806                                streaming_event_context.clone(),
1807                                ReasonItemData {
1808                                    turn_id: context.turn_id,
1809                                    provider,
1810                                    model,
1811                                    item_id,
1812                                    encrypted_content,
1813                                    summary,
1814                                    token_count,
1815                                },
1816                            ))
1817                            .await
1818                        {
1819                            tracing::warn!(
1820                                session_id = %session_id,
1821                                error = %e,
1822                                "ReasonAtom: failed to emit reason.item event"
1823                            );
1824                        }
1825                    }
1826                    LlmStreamEvent::ToolCalls(calls) => {
1827                        tool_calls = calls;
1828                    }
1829                    LlmStreamEvent::Done(metadata) => {
1830                        // Emit any remaining pending delta before completing
1831                        if !pending_delta.is_empty()
1832                            && let Err(e) = self
1833                                .event_emitter
1834                                .emit(EventRequest::new(
1835                                    session_id,
1836                                    streaming_event_context.clone(),
1837                                    OutputMessageDeltaData {
1838                                        turn_id: context.turn_id,
1839                                        delta: pending_delta.clone(),
1840                                        accumulated: text.clone(),
1841                                    },
1842                                ))
1843                                .await
1844                        {
1845                            tracing::warn!(
1846                                session_id = %session_id,
1847                                error = %e,
1848                                "ReasonAtom: failed to emit final output.message.delta event"
1849                            );
1850                        }
1851
1852                        // Emit any remaining pending thinking delta before completing
1853                        if !pending_thinking_delta.is_empty()
1854                            && let Err(e) = self
1855                                .event_emitter
1856                                .emit(EventRequest::new(
1857                                    session_id,
1858                                    streaming_event_context.clone(),
1859                                    ReasonThinkingDeltaData {
1860                                        turn_id: context.turn_id,
1861                                        delta: pending_thinking_delta.clone(),
1862                                        accumulated: thinking.clone(),
1863                                    },
1864                                ))
1865                                .await
1866                        {
1867                            tracing::warn!(
1868                                session_id = %session_id,
1869                                error = %e,
1870                                "ReasonAtom: failed to emit final reason.thinking.delta event"
1871                            );
1872                        }
1873
1874                        // Emit reason.thinking.completed if we had any thinking content
1875                        if !thinking.is_empty()
1876                            && let Err(e) = self
1877                                .event_emitter
1878                                .emit(EventRequest::new(
1879                                    session_id,
1880                                    streaming_event_context.clone(),
1881                                    ReasonThinkingCompletedData {
1882                                        turn_id: context.turn_id,
1883                                        thinking: thinking.clone(),
1884                                    },
1885                                ))
1886                                .await
1887                        {
1888                            tracing::warn!(
1889                                session_id = %session_id,
1890                                error = %e,
1891                                "ReasonAtom: failed to emit reason.thinking.completed event"
1892                            );
1893                        }
1894                        completion_metadata = Some(*metadata);
1895                        break;
1896                    }
1897                    LlmStreamEvent::Error(err) => {
1898                        // If we already collected valid tool calls or text before
1899                        // the error arrived, treat it as a partial success. This
1900                        // handles OpenAI Responses API behaviour where a trailing
1901                        // server_error can follow fully-streamed function calls.
1902                        let has_partial_output = !tool_calls.is_empty() || !text.is_empty();
1903
1904                        if has_partial_output {
1905                            tracing::warn!(
1906                                session_id = %session_id,
1907                                error = %err,
1908                                tool_call_count = tool_calls.len(),
1909                                text_len = text.len(),
1910                                "ReasonAtom: trailing stream error after valid output — treating as partial success"
1911                            );
1912                            // Break out of the stream loop and use the output
1913                            // we already collected. completion_metadata will be
1914                            // None since we never got a Done event.
1915                            break;
1916                        }
1917
1918                        // No useful output collected — treat as a real failure.
1919                        let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
1920                        let event_context = EventContext::from_atom_context(context).with_span(
1921                            trace_id.to_string(),
1922                            Uuid::now_v7().to_string(),
1923                            Some(reason_span_id.to_string()),
1924                        );
1925                        let tools_summary: Vec<ToolDefinitionSummary> =
1926                            runtime_agent.tools.iter().map(|t| t.into()).collect();
1927                        let generation_data = LlmGenerationData::failure(
1928                            messages_for_event.clone(),
1929                            tools_summary,
1930                            runtime_agent.model.clone(),
1931                            Some(model_with_provider.provider_type.to_string()),
1932                            err.clone(),
1933                            Some(llm_duration_ms),
1934                            time_to_first_token_ms,
1935                        );
1936                        let _ = self
1937                            .event_emitter
1938                            .emit(EventRequest::new(
1939                                session_id,
1940                                event_context,
1941                                generation_data,
1942                            ))
1943                            .await;
1944                        return Err(AgentLoopError::llm(err));
1945                    }
1946                }
1947                // Per-event heartbeat after processing the event, so accumulated_len
1948                // reflects the just-received tokens. Throttled to every 5s.
1949                if last_stream_heartbeat.elapsed().as_millis() as u64 >= 5_000
1950                    && let Some(ref hb) = self.stream_heartbeater
1951                {
1952                    hb.heartbeat(crate::traits::StreamProgress {
1953                        accumulated_len: text.len() + thinking.len(),
1954                        last_delta_at: last_token_at_unix,
1955                    })
1956                    .await;
1957                    last_stream_heartbeat = Instant::now();
1958                }
1959            }
1960            (
1961                text,
1962                thinking,
1963                thinking_signature,
1964                tool_calls,
1965                completion_metadata,
1966                time_to_first_token_ms,
1967            )
1968        };
1969        let (mut text, mut thinking, thinking_signature, mut tool_calls) =
1970            (text, thinking, thinking_signature, tool_calls);
1971
1972        // If a streaming output guardrail tripped, emit
1973        // output.message.replaced and overwrite the assistant output now so
1974        // every downstream event (llm.generation, output.message.completed)
1975        // carries the replacement instead of the model's withheld tokens.
1976        // The original tokens are never persisted or replayed.
1977        if let Some(ref t) = tripped {
1978            let replaced_event_context = EventContext::from_atom_context(context).with_span(
1979                trace_id.to_string(),
1980                Uuid::now_v7().to_string(),
1981                Some(reason_span_id.to_string()),
1982            );
1983            if let Err(e) = self
1984                .event_emitter
1985                .emit(EventRequest::new(
1986                    session_id,
1987                    replaced_event_context,
1988                    OutputMessageReplacedData {
1989                        turn_id: context.turn_id,
1990                        guardrail_capability_id: t.capability_id.clone(),
1991                        guardrail_id: t.guardrail_id.clone(),
1992                        reason_code: t.block.reason_code.clone(),
1993                        replacement: t.block.replacement.clone(),
1994                    },
1995                ))
1996                .await
1997            {
1998                tracing::warn!(
1999                    session_id = %session_id,
2000                    error = %e,
2001                    "ReasonAtom: failed to emit output.message.replaced event"
2002                );
2003            }
2004            text = t.block.replacement.clone();
2005            tool_calls.clear();
2006            thinking.clear();
2007        }
2008
2009        let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
2010
2011        // Extract response_id from completion metadata for chaining and OTel
2012        let response_id = completion_metadata
2013            .as_ref()
2014            .and_then(|meta| meta.response_id.clone());
2015
2016        // 15. Convert completion metadata to TokenUsage.
2017        //
2018        // Cost is tracked as two independent values: the provider's authoritative
2019        // inline cost when present (e.g. OpenRouter's usage.cost), and a price-table
2020        // estimate from the model profile computed whenever profile cost data
2021        // exists. Keeping both lets downstream consumers prefer the actual charge
2022        // while still reconciling estimate-vs-actual drift.
2023        let usage = completion_metadata.as_ref().and_then(|meta| {
2024            match (meta.prompt_tokens, meta.completion_tokens) {
2025                (Some(input), Some(output)) => {
2026                    let actual_cost_usd = meta.provider_cost_usd;
2027                    let estimated_cost_usd = crate::llm_model_profiles::estimate_cost_usd(
2028                        &model_with_provider.provider_type,
2029                        &runtime_agent.model,
2030                        input,
2031                        output,
2032                    );
2033                    Some(
2034                        TokenUsage::with_cache(
2035                            input,
2036                            output,
2037                            meta.cache_read_tokens,
2038                            meta.cache_creation_tokens,
2039                        )
2040                        .with_cost(actual_cost_usd, estimated_cost_usd),
2041                    )
2042                }
2043                _ => None,
2044            }
2045        });
2046
2047        // 16. Emit llm.generation event (child of reason span)
2048        let event_context = EventContext::from_atom_context(context).with_span(
2049            trace_id.to_string(),
2050            Uuid::now_v7().to_string(),
2051            Some(reason_span_id.to_string()),
2052        );
2053        let tools_summary: Vec<ToolDefinitionSummary> =
2054            runtime_agent.tools.iter().map(|t| t.into()).collect();
2055        // Infer finish reasons from content
2056        let finish_reasons = if !tool_calls.is_empty() {
2057            Some(vec!["tool_calls".to_string()])
2058        } else {
2059            Some(vec!["stop".to_string()])
2060        };
2061        // Extract retry info from completion metadata (if retries occurred)
2062        let retry_info = completion_metadata
2063            .as_ref()
2064            .and_then(|meta| meta.retry_metadata.as_ref())
2065            .filter(|rm| rm.had_retries())
2066            .map(|rm| LlmRetryInfo {
2067                attempts: rm.attempts,
2068                total_wait_ms: rm.total_retry_wait.as_millis() as u64,
2069            });
2070        // Build LlmGenerationData with retry and compaction info
2071        let mut generation_data = LlmGenerationData::success_with_retry(
2072            messages_for_event.clone(),
2073            tools_summary,
2074            Some(text.clone()).filter(|s| !s.is_empty()),
2075            tool_calls.clone(),
2076            runtime_agent.model.clone(),
2077            Some(model_with_provider.provider_type.to_string()),
2078            usage.clone(),
2079            Some(llm_duration_ms),
2080            time_to_first_token_ms,
2081            finish_reasons,
2082            response_id.clone(),
2083            retry_info,
2084        );
2085
2086        // Add compaction info if compaction was performed
2087        if let Some(info) = compaction_info {
2088            generation_data = generation_data.with_compaction(info);
2089        }
2090
2091        if let Some(request_options) =
2092            build_request_options(&llm_config, &model_with_provider.provider_type.to_string())
2093        {
2094            generation_data = generation_data.with_request_options(request_options);
2095        }
2096
2097        if let Err(e) = self
2098            .event_emitter
2099            .emit(EventRequest::new(
2100                session_id,
2101                event_context,
2102                generation_data,
2103            ))
2104            .await
2105        {
2106            tracing::warn!(
2107                session_id = %session_id,
2108                error = %e,
2109                "ReasonAtom: failed to emit llm.generation event"
2110            );
2111        }
2112
2113        // 17. Build metadata with model and reasoning effort info
2114        let mut metadata = std::collections::HashMap::new();
2115        metadata.insert(
2116            "model".to_string(),
2117            serde_json::Value::String(runtime_agent.model.clone()),
2118        );
2119        if let Some(ref effort) = reasoning_effort {
2120            metadata.insert(
2121                "reasoning_effort".to_string(),
2122                serde_json::Value::String(effort.clone()),
2123            );
2124        }
2125
2126        // 18. Store and emit output.message.completed event with metadata and usage
2127        let has_tool_calls = !tool_calls.is_empty();
2128        let mut assistant_message = if has_tool_calls {
2129            Message::assistant_with_tools(&text, tool_calls.clone())
2130        } else {
2131            Message::assistant(&text)
2132        };
2133        // Use the API-provided phase when available (preserving the provider's value),
2134        // otherwise derive from state: Commentary for intermediate iterations (with tool
2135        // calls), FinalAnswer for the completed response.
2136        assistant_message.phase = completion_metadata
2137            .as_ref()
2138            .and_then(|meta| meta.phase.as_deref())
2139            .and_then(crate::message::ExecutionPhase::from_provider_str)
2140            .or_else(|| {
2141                Some(crate::message::ExecutionPhase::from_has_tool_calls(
2142                    has_tool_calls,
2143                ))
2144            });
2145        assistant_message.metadata = Some(metadata);
2146        // Store thinking content and signature for extended thinking models
2147        // Both are required for subsequent API calls when thinking is enabled
2148        if !thinking.is_empty() {
2149            assistant_message.thinking = Some(thinking.clone());
2150            assistant_message.thinking_signature = thinking_signature.clone();
2151        }
2152        let output_message_id = assistant_message.id;
2153
2154        // Emit output.message.completed event (this stores the message as an event with proper turn context)
2155        // Include token usage for tracking (child of reason span)
2156        let message_event_context = EventContext::from_atom_context(context).with_span(
2157            trace_id.to_string(),
2158            Uuid::now_v7().to_string(),
2159            Some(reason_span_id.to_string()),
2160        );
2161        let mut output_message_data = OutputMessageCompletedData::new(assistant_message);
2162        if let Some(ref u) = usage {
2163            output_message_data = output_message_data.with_usage(u.clone());
2164        }
2165        self.event_emitter
2166            .emit(EventRequest::new(
2167                session_id,
2168                message_event_context,
2169                output_message_data,
2170            ))
2171            .await?;
2172
2173        tracing::info!(
2174            session_id = %session_id,
2175            turn_id = %context.turn_id,
2176            has_tool_calls = %has_tool_calls,
2177            tool_count = %tool_calls.len(),
2178            "ReasonAtom: LLM call completed"
2179        );
2180
2181        Ok(ReasonResult {
2182            success: true,
2183            text,
2184            tool_calls,
2185            has_tool_calls,
2186            tool_definitions: runtime_agent.tools.clone(),
2187            max_iterations: runtime_agent.max_iterations,
2188            error: None,
2189            usage,
2190            output_message_id: Some(output_message_id),
2191            time_to_first_token_ms,
2192            response_id,
2193            locale: resolved_locale,
2194            network_access: runtime_agent.network_access.clone(),
2195        })
2196    }
2197
2198    /// Resolve model using priority chain: controls > session > agent > harness > system default
2199    /// Create LLM driver using the driver registry
2200    fn create_llm_driver(
2201        &self,
2202        model: &ModelWithProvider,
2203    ) -> Result<crate::llm_driver_registry::BoxedLlmDriver> {
2204        let provider_type = match model.provider_type {
2205            crate::llm_models::LlmProviderType::Openai => ProviderType::OpenAI,
2206            crate::llm_models::LlmProviderType::Openrouter => ProviderType::OpenRouter,
2207            crate::llm_models::LlmProviderType::AzureOpenai => ProviderType::AzureOpenAI,
2208            crate::llm_models::LlmProviderType::OpenaiCompletions => {
2209                ProviderType::OpenAICompletions
2210            }
2211            crate::llm_models::LlmProviderType::Anthropic => ProviderType::Anthropic,
2212            crate::llm_models::LlmProviderType::Gemini => ProviderType::Gemini,
2213            crate::llm_models::LlmProviderType::LlmSim => ProviderType::LlmSim,
2214            crate::llm_models::LlmProviderType::Bedrock => ProviderType::Bedrock,
2215        };
2216
2217        let mut config = ProviderConfig::new(provider_type);
2218        if let Some(ref api_key) = model.api_key {
2219            config = config.with_api_key(api_key);
2220        }
2221        if let Some(ref base_url) = model.base_url {
2222            config = config.with_base_url(base_url);
2223        }
2224
2225        self.driver_registry.create_driver(&config)
2226    }
2227
2228    /// Resolve image_file references to actual image data
2229    ///
2230    /// This method extracts all image_file IDs from the messages and resolves
2231    /// them to base64-encoded image data using the configured ImageResolver.
2232    ///
2233    /// # Returns
2234    ///
2235    /// A HashMap mapping image IDs to ResolvedImage data. If no ImageResolver
2236    /// is configured, or if resolution fails for some images, those images
2237    /// will simply be missing from the map (and converted to placeholder text).
2238    async fn resolve_images(&self, messages: &[Message]) -> HashMap<Uuid, ResolvedImage> {
2239        let mut resolved = HashMap::new();
2240
2241        // Check if we have an image resolver
2242        let resolver = match &self.image_resolver {
2243            Some(r) => r,
2244            None => return resolved,
2245        };
2246
2247        // Collect all unique image_file IDs from all messages
2248        let image_ids: Vec<Uuid> = messages
2249            .iter()
2250            .flat_map(LlmMessage::extract_image_file_ids)
2251            .collect::<std::collections::HashSet<_>>()
2252            .into_iter()
2253            .collect();
2254
2255        if image_ids.is_empty() {
2256            return resolved;
2257        }
2258
2259        tracing::debug!(
2260            image_count = image_ids.len(),
2261            "ReasonAtom: resolving image_file references"
2262        );
2263
2264        // Resolve each image
2265        for image_id in image_ids {
2266            match resolver.resolve_image(image_id).await {
2267                Ok(Some(image)) => {
2268                    resolved.insert(image_id, image);
2269                }
2270                Ok(None) => {
2271                    tracing::warn!(
2272                        image_id = %image_id,
2273                        "ReasonAtom: image not found during resolution"
2274                    );
2275                }
2276                Err(e) => {
2277                    tracing::warn!(
2278                        image_id = %image_id,
2279                        error = %e,
2280                        "ReasonAtom: failed to resolve image"
2281                    );
2282                }
2283            }
2284        }
2285
2286        tracing::debug!(
2287            resolved_count = resolved.len(),
2288            "ReasonAtom: image resolution complete"
2289        );
2290
2291        resolved
2292    }
2293}
2294
2295// ============================================================================
2296// Tests
2297// ============================================================================
2298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm_driver_registry::{LlmCallConfig, PromptCacheConfig, PromptCacheStrategy};
    use std::collections::HashMap;

    /// Build an `LlmCallConfig` for `build_request_options` tests.
    ///
    /// Prompt caching is configured from the arguments using the `Auto`
    /// strategy. Every other optional setting (temperature, max tokens, tools,
    /// reasoning effort, tool search) is left unset.
    fn prompt_cache_call_config(
        model: &str,
        previous_response_id: Option<&str>,
        cache_enabled: bool,
        gemini_cached_content: Option<&str>,
    ) -> LlmCallConfig {
        LlmCallConfig {
            model: model.to_string(),
            temperature: None,
            max_tokens: None,
            tools: vec![],
            reasoning_effort: None,
            metadata: HashMap::new(),
            previous_response_id: previous_response_id.map(str::to_string),
            tool_search: None,
            prompt_cache: Some(PromptCacheConfig {
                enabled: cache_enabled,
                strategy: PromptCacheStrategy::Auto,
                gemini_cached_content: gemini_cached_content.map(str::to_string),
            }),
        }
    }

    #[test]
    fn test_reason_result_default() {
        let result = ReasonResult::default();
        assert!(!result.success);
        assert!(result.text.is_empty());
        assert!(result.tool_calls.is_empty());
        assert!(!result.has_tool_calls);
        // Default derive gives 0, but serde deserialization gives 500 via
        // default_max_iterations() (see test_reason_result_serde_default).
        assert_eq!(result.max_iterations, 0);
    }

    #[test]
    fn test_reason_result_serde_default() {
        // Test that serde uses the default_max_iterations function
        let json = r#"{"success":true,"text":"","has_tool_calls":false}"#;
        let result: ReasonResult = serde_json::from_str(json).unwrap();
        assert_eq!(result.max_iterations, 500);
    }

    #[test]
    fn test_capability_usage_snapshot_keeps_resolved_and_exposed_separate() {
        let registry = CapabilityRegistry::with_builtins();
        let tool = ToolDefinition::Builtin(crate::tool_types::BuiltinTool {
            name: "demo_tool".to_string(),
            display_name: None,
            description: "demo".to_string(),
            parameters: json!({"type": "object"}),
            policy: crate::tool_types::ToolPolicy::Auto,
            category: None,
            deferrable: crate::tool_types::DeferrablePolicy::default(),
            hints: crate::tool_types::ToolHints::default(),
            full_parameters: None,
        })
        .with_capability_attribution("cap:demo", Some("Demo Capability"));

        let records = capability_usage_snapshot_records(
            &registry,
            &[crate::AgentCapabilityConfig::new("current_time")],
            &[tool],
        );

        assert!(records.iter().any(|record| {
            matches!(record.usage_kind, CapabilityUsageKind::Resolved)
                && record.capability_id == "current_time"
                && record.tool_name.is_none()
        }));
        assert!(records.iter().any(|record| {
            matches!(record.usage_kind, CapabilityUsageKind::Exposed)
                && record.capability_id == "cap:demo"
                && record.tool_name.as_deref() == Some("demo_tool")
        }));
    }

    #[test]
    fn test_patch_dangling_tool_calls_no_tool_calls() {
        let messages = vec![Message::user("Hello"), Message::assistant("Hi there!")];
        let patched = patch_dangling_tool_calls(&messages);
        assert_eq!(patched.len(), 2);
    }

    #[test]
    fn test_patch_dangling_tool_calls_with_result() {
        let tool_call = ToolCall {
            id: "call_123".to_string(),
            name: "get_weather".to_string(),
            arguments: serde_json::json!({"city": "NYC"}),
        };

        let messages = vec![
            Message::user("What's the weather?"),
            Message::assistant_with_tools("Let me check", vec![tool_call]),
            Message::tool_result("call_123", Some(serde_json::json!({"temp": 72})), None),
        ];

        let patched = patch_dangling_tool_calls(&messages);
        assert_eq!(patched.len(), 3);
    }

    #[test]
    fn test_patch_dangling_tool_calls_missing_result() {
        let tool_call = ToolCall {
            id: "call_456".to_string(),
            name: "search_web".to_string(),
            arguments: serde_json::json!({"query": "rust"}),
        };

        let messages = vec![
            Message::user("Search for rust"),
            Message::assistant_with_tools("Searching...", vec![tool_call]),
            Message::user("Actually, never mind"),
        ];

        let patched = patch_dangling_tool_calls(&messages);
        // Should have added a cancelled result
        assert_eq!(patched.len(), 4);
        assert_eq!(patched[2].role, MessageRole::ToolResult);
        assert_eq!(patched[2].tool_call_id(), Some("call_456"));
    }

    #[test]
    fn test_build_request_options_for_openai_prompt_cache() {
        let config = prompt_cache_call_config("gpt-5.4", Some("resp_123"), true, None);

        let request_options = build_request_options(&config, "openai").unwrap();
        assert_eq!(
            request_options
                .prompt_cache
                .and_then(|info| info.provider_mode),
            Some("prompt_cache_key".to_string())
        );
        assert_eq!(
            request_options.provider_options.get("openai"),
            Some(&json!({ "previous_response_id": true }))
        );
    }

    #[test]
    fn test_build_request_options_for_gemini_explicit_cache() {
        let config = prompt_cache_call_config(
            "gemini-2.5-pro",
            None,
            true,
            Some("cachedContents/demo-cache"),
        );

        let request_options = build_request_options(&config, "gemini").unwrap();
        assert_eq!(
            request_options
                .prompt_cache
                .and_then(|info| info.provider_mode),
            Some("cached_content".to_string())
        );
        assert_eq!(
            request_options.provider_options.get("gemini"),
            Some(&json!({ "cached_content": true }))
        );
    }

    #[test]
    fn test_build_request_options_omits_gemini_cache_flag_when_disabled() {
        let config = prompt_cache_call_config(
            "gemini-2.5-pro",
            None,
            false,
            Some("cachedContents/demo-cache"),
        );

        assert!(build_request_options(&config, "gemini").is_none());
    }
}