1use async_trait::async_trait;
20use futures::StreamExt;
21use serde::{Deserialize, Serialize};
22use serde_json::json;
23use std::collections::{BTreeSet, HashMap};
24use std::sync::Arc;
25use std::time::Instant;
26use uuid::Uuid;
27
28use super::{Atom, AtomContext};
29use crate::capabilities::CapabilityRegistry;
30use crate::error::{AgentLoopError, Result};
31use crate::events::{
32 CapabilityUsageData, CapabilityUsageKind, CapabilityUsageRecord, EventContext, EventRequest,
33 LlmCompactionInfo, LlmGenerationData, LlmPromptCacheInfo, LlmRequestOptions, LlmRetryInfo,
34 LlmToolSearchInfo, OutputMessageCompletedData, OutputMessageDeltaData,
35 OutputMessageReplacedData, OutputMessageStartedData, ReasonCompletedData, ReasonItemData,
36 ReasonStartedData, ReasonThinkingCompletedData, ReasonThinkingDeltaData,
37 ReasonThinkingStartedData, TokenUsage, ToolDefinitionSummary,
38};
39use crate::llm_driver_registry::{
40 DriverRegistry, LlmCallConfigBuilder, LlmCompletionMetadata, LlmMessage, LlmMessageContent,
41 LlmMessageRole, LlmStreamEvent, ProviderConfig, ProviderType,
42};
43use crate::llm_retry::is_transient_error_message;
44use crate::message::{Message, MessageRole};
45use crate::message_retriever::MessageRetriever;
46use crate::openresponses_protocol::{
47 CompactInputItem, CompactRequest, compact_output_to_messages, messages_to_compact_input,
48};
49use crate::output_guardrail::{
50 ArmedGuardrail, OutputGuardrailContext, TrippedGuardrail, evaluate_guardrails,
51};
52use crate::runtime_context::{AssembledTurnContext, assemble_turn_context};
53use crate::tool_types::{ToolCall, ToolDefinition};
54use crate::traits::{
55 AgentStore, EventEmitter, HarnessStore, ImageResolver, LlmProviderStore, ModelWithProvider,
56 ResolvedImage, SessionStore,
57};
58use crate::typed_id::{AgentId, HarnessId, MessageId, SessionId};
59use crate::{UserFacingErrorContext, user_facing_error_codes};
60
61fn patch_dangling_tool_calls(messages: &[Message]) -> Vec<Message> {
70 let mut result = Vec::new();
71
72 for (i, msg) in messages.iter().enumerate() {
73 result.push(msg.clone());
74
75 if msg.role == MessageRole::Agent && msg.has_tool_calls() {
77 for tc in msg.tool_calls() {
78 let has_result = messages[(i + 1)..]
80 .iter()
81 .any(|m| m.role == MessageRole::ToolResult && m.tool_call_id() == Some(&tc.id));
82
83 if !has_result {
84 result.push(Message::tool_result(
85 &tc.id,
86 None,
87 Some(
88 "cancelled - another message came in before it could be completed"
89 .to_string(),
90 ),
91 ));
92 }
93 }
94 }
95 }
96
97 result
98}
99
/// Exact texts of static agent error replies.
///
/// `is_error_placeholder_message` compares a message's text against this list
/// when the message has no string `error_code` in its metadata. Matching
/// messages are stripped from the LLM input. The strings must stay
/// byte-identical to the stored message texts, otherwise they will no longer
/// be recognised.
const ERROR_PLACEHOLDER_MESSAGES: &[&str] = &[
    "I encountered an error while processing your request. Please try again later.",
    "The AI provider is experiencing issues. Please try again shortly.",
    "Rate limited by the AI provider. Please wait a moment.",
    "The conversation has become too long for the model to process. Please start a new session or reduce the context size.",
    "There is a misconfiguration with the AI provider. Please contact support.",
];
109
110fn is_error_placeholder_message(msg: &Message) -> bool {
113 if msg.role != MessageRole::Agent {
114 return false;
115 }
116 if msg.has_tool_calls() {
118 return false;
119 }
120 if let Some(metadata) = &msg.metadata
121 && let Some(serde_json::Value::String(code)) = metadata.get("error_code")
122 {
123 return matches!(
124 code.as_str(),
125 user_facing_error_codes::BUDGET_EXHAUSTED
126 | user_facing_error_codes::BUDGET_PAUSED
127 | user_facing_error_codes::MODEL_UNAVAILABLE
128 | user_facing_error_codes::REQUEST_TOO_LARGE
129 | user_facing_error_codes::PROVIDER_RATE_LIMITED
130 | user_facing_error_codes::PROVIDER_MISCONFIGURED
131 | user_facing_error_codes::PROVIDER_UNAVAILABLE
132 | user_facing_error_codes::DEPENDENCY_UNAVAILABLE
133 | user_facing_error_codes::PROCESSING_ERROR
134 );
135 }
136 let text = msg.text().unwrap_or("");
137 ERROR_PLACEHOLDER_MESSAGES.contains(&text) || is_dynamic_error_placeholder(text)
138}
139
/// Reports whether `text` matches one of the templated error replies whose
/// middle varies (amounts, model names, ...).
///
/// A match requires both the template's fixed prefix and its fixed suffix.
/// Comparison is case-sensitive.
fn is_dynamic_error_placeholder(text: &str) -> bool {
    const TEMPLATES: &[(&str, &str)] = &[
        ("Budget exhausted.", "Increase the budget to continue."),
        ("Budget paused.", "Increase or resume the budget to continue."),
        ("Budget paused with ", "Increase or resume the budget to continue."),
        ("Soft limit reached.", "soft limit."),
        ("The model `", "Please select a different model."),
    ];

    TEMPLATES
        .iter()
        .any(|&(prefix, suffix)| text.starts_with(prefix) && text.ends_with(suffix))
}
149
/// Input for one run of the reason atom (a single LLM reasoning step).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReasonInput {
    /// Atom execution context. Its `session_id`, `turn_id` and `exec_id` are
    /// used for logging, event context and LLM request metadata.
    pub context: AtomContext,
    /// Harness whose configuration is used to assemble the turn context.
    pub harness_id: HarnessId,
    /// Optional agent override. When present, it is also attached as
    /// `agent_id` LLM request metadata.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// Organisation id. It is sent as `org_id` metadata formatted as
    /// `org_{:032x}`, and defaults to 0 when missing.
    #[serde(default)]
    pub org_id: i64,
    /// Extra MCP tool definitions passed to turn-context assembly. Defaults
    /// to an empty list.
    #[serde(default)]
    pub mcp_tool_definitions: Vec<ToolDefinition>,
    /// Previous provider response id. It is forwarded to the LLM call config
    /// and to native compaction requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub previous_response_id: Option<String>,
    /// Iteration number reported in `output.message.started`. Defaults to 1
    /// when missing.
    #[serde(default = "default_iteration")]
    pub iteration: u32,
}
181
/// Serde fallback for `ReasonInput::iteration`: a missing counter is treated
/// as the first iteration.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
185
186#[derive(Debug, Clone, Default, Serialize, Deserialize)]
188pub struct ReasonResult {
189 pub success: bool,
191 pub text: String,
193 #[serde(default)]
195 pub tool_calls: Vec<ToolCall>,
196 pub has_tool_calls: bool,
198 #[serde(default)]
200 pub tool_definitions: Vec<ToolDefinition>,
201 #[serde(default = "default_max_iterations")]
203 pub max_iterations: usize,
204 #[serde(skip_serializing_if = "Option::is_none")]
206 pub error: Option<String>,
207 #[serde(skip_serializing_if = "Option::is_none")]
209 pub usage: Option<TokenUsage>,
210 #[serde(skip_serializing_if = "Option::is_none")]
212 pub output_message_id: Option<MessageId>,
213 #[serde(skip_serializing_if = "Option::is_none")]
215 pub time_to_first_token_ms: Option<u64>,
216 #[serde(skip_serializing_if = "Option::is_none")]
218 pub response_id: Option<String>,
219 #[serde(skip_serializing_if = "Option::is_none")]
221 pub locale: Option<String>,
222 #[serde(default, skip_serializing_if = "Option::is_none")]
224 pub network_access: Option<crate::network_access::NetworkAccessList>,
225}
226
/// Default iteration cap reported in `ReasonResult::max_iterations`. Serde
/// uses it when the field is missing, and the failure path sets it
/// explicitly.
fn default_max_iterations() -> usize {
    const DEFAULT_CAP: usize = 500;
    DEFAULT_CAP
}
230
231fn build_request_options(
232 config: &crate::llm_driver_registry::LlmCallConfig,
233 provider: &str,
234) -> Option<LlmRequestOptions> {
235 let prompt_cache = config
236 .prompt_cache
237 .as_ref()
238 .filter(|cfg| cfg.enabled)
239 .map(|cfg| LlmPromptCacheInfo {
240 enabled: true,
241 strategy: cfg.strategy,
242 provider_mode: match provider {
243 "openai" => Some("prompt_cache_key".to_string()),
244 "anthropic" => Some("cache_control".to_string()),
245 "gemini" => Some(
246 if cfg.gemini_cached_content.is_some() {
247 "cached_content"
248 } else {
249 "implicit"
250 }
251 .to_string(),
252 ),
253 _ => None,
254 },
255 });
256
257 let tool_search = config
258 .tool_search
259 .as_ref()
260 .filter(|cfg| cfg.enabled)
261 .map(|cfg| LlmToolSearchInfo {
262 enabled: true,
263 threshold: cfg.threshold,
264 });
265
266 let mut provider_options = HashMap::new();
267 if provider == "openai" && config.previous_response_id.is_some() {
268 provider_options.insert(
269 "openai".to_string(),
270 json!({ "previous_response_id": true }),
271 );
272 }
273 if provider == "gemini"
274 && config
275 .prompt_cache
276 .as_ref()
277 .filter(|cfg| cfg.enabled)
278 .and_then(|cfg| cfg.gemini_cached_content.as_ref())
279 .is_some()
280 {
281 provider_options.insert("gemini".to_string(), json!({ "cached_content": true }));
282 }
283
284 let request_options = LlmRequestOptions {
285 prompt_cache,
286 tool_search,
287 provider_options,
288 };
289
290 (!request_options.is_empty()).then_some(request_options)
291}
292
293fn capability_name_snapshot(registry: &CapabilityRegistry, capability_id: &str) -> Option<String> {
294 registry
295 .get(capability_id)
296 .map(|capability| capability.name().to_string())
297}
298
299fn capability_usage_snapshot_records(
300 registry: &CapabilityRegistry,
301 resolved_capability_configs: &[crate::AgentCapabilityConfig],
302 tool_definitions: &[ToolDefinition],
303) -> Vec<CapabilityUsageRecord> {
304 let mut records = Vec::new();
305 let mut seen = BTreeSet::new();
306
307 for config in resolved_capability_configs {
308 let capability_id = config.capability_id().to_string();
309 if seen.insert((
310 "resolved".to_string(),
311 capability_id.clone(),
312 None::<String>,
313 )) {
314 records.push(CapabilityUsageRecord {
315 capability_name: capability_name_snapshot(registry, &capability_id),
316 capability_id,
317 usage_kind: CapabilityUsageKind::Resolved,
318 tool_name: None,
319 usage_count: Some(1),
320 duration_ms: None,
321 });
322 }
323 }
324
325 for tool in tool_definitions {
326 let Some((capability_id, capability_name)) = tool.capability_attribution() else {
327 continue;
328 };
329 let capability_id = capability_id.to_string();
330 let tool_name = tool.name().to_string();
331 if seen.insert((
332 "exposed".to_string(),
333 capability_id.clone(),
334 Some(tool_name.clone()),
335 )) {
336 records.push(CapabilityUsageRecord {
337 capability_name: capability_name
338 .map(str::to_string)
339 .or_else(|| capability_name_snapshot(registry, &capability_id)),
340 capability_id,
341 usage_kind: CapabilityUsageKind::Exposed,
342 tool_name: Some(tool_name),
343 usage_count: Some(1),
344 duration_ms: None,
345 });
346 }
347 }
348
349 records
350}
351
/// Atom that performs one LLM reasoning step for a session turn and reports
/// its progress through events.
pub struct ReasonAtom {
    /// Harness store, passed to `assemble_turn_context` when no pre-assembled
    /// context is supplied.
    harness_store: Arc<dyn HarnessStore>,
    /// Agent store, passed to `assemble_turn_context`.
    agent_store: Arc<dyn AgentStore>,
    /// Session store, passed to `assemble_turn_context`.
    session_store: Arc<dyn SessionStore>,
    /// Message retriever, passed to `assemble_turn_context`.
    message_retriever: Arc<dyn MessageRetriever>,
    /// LLM provider store, passed to `assemble_turn_context`.
    provider_store: Arc<dyn LlmProviderStore>,
    /// Capability registry. It is used for context assembly, capability
    /// names, output guardrails and model-view providers.
    capability_registry: CapabilityRegistry,
    /// Driver registry. NOTE(review): presumably consumed by
    /// `create_llm_driver`, which is not visible here; confirm.
    driver_registry: DriverRegistry,
    /// Sink for reason/output/capability lifecycle events.
    event_emitter: Arc<dyn EventEmitter>,
    /// Optional image resolver. Unset unless `with_image_resolver` is called.
    image_resolver: Option<Arc<dyn ImageResolver>>,
    /// Optional session file system, forwarded to `assemble_turn_context`.
    file_store: Option<Arc<dyn crate::traits::SessionFileSystem>>,
    /// Optional stream heartbeater. Unset unless `with_stream_heartbeater` is
    /// called.
    stream_heartbeater: Option<Arc<dyn crate::traits::StreamHeartbeater>>,
    /// Optional provider stall timeout. Unset unless
    /// `with_provider_stall_timeout` is called.
    provider_stall_timeout: Option<std::time::Duration>,
}
389
390impl ReasonAtom {
391 #[allow(clippy::too_many_arguments)]
393 pub fn new(
394 harness_store: impl HarnessStore + 'static,
395 agent_store: impl AgentStore + 'static,
396 session_store: impl SessionStore + 'static,
397 message_retriever: impl MessageRetriever + 'static,
398 provider_store: impl LlmProviderStore + 'static,
399 capability_registry: CapabilityRegistry,
400 driver_registry: DriverRegistry,
401 event_emitter: impl EventEmitter + 'static,
402 ) -> Self {
403 Self {
404 harness_store: Arc::new(harness_store),
405 agent_store: Arc::new(agent_store),
406 session_store: Arc::new(session_store),
407 message_retriever: Arc::new(message_retriever),
408 provider_store: Arc::new(provider_store),
409 capability_registry,
410 driver_registry,
411 event_emitter: Arc::new(event_emitter),
412 image_resolver: None,
413 file_store: None,
414 stream_heartbeater: None,
415 provider_stall_timeout: None,
416 }
417 }
418
419 pub fn with_file_store(
426 mut self,
427 file_store: Arc<dyn crate::traits::SessionFileSystem>,
428 ) -> Self {
429 self.file_store = Some(file_store);
430 self
431 }
432
433 pub fn with_image_resolver(mut self, resolver: Arc<dyn ImageResolver>) -> Self {
446 self.image_resolver = Some(resolver);
447 self
448 }
449
450 pub fn with_stream_heartbeater(
452 mut self,
453 heartbeater: Arc<dyn crate::traits::StreamHeartbeater>,
454 ) -> Self {
455 self.stream_heartbeater = Some(heartbeater);
456 self
457 }
458
459 pub fn with_provider_stall_timeout(mut self, timeout: std::time::Duration) -> Self {
462 self.provider_stall_timeout = Some(timeout);
463 self
464 }
465}
466
467#[async_trait]
468impl Atom for ReasonAtom {
469 type Input = ReasonInput;
470 type Output = ReasonResult;
471
472 fn name(&self) -> &'static str {
473 "reason"
474 }
475
476 async fn execute(&self, input: Self::Input) -> Result<Self::Output> {
477 self.execute_inner(input, None).await
478 }
479}
480
481impl ReasonAtom {
482 pub async fn execute_with_assembled_context(
487 &self,
488 input: ReasonInput,
489 assembled: AssembledTurnContext,
490 ) -> Result<ReasonResult> {
491 self.execute_inner(input, Some(assembled)).await
492 }
493
494 async fn emit_capability_usage_snapshot(
495 &self,
496 session_id: SessionId,
497 context: &AtomContext,
498 resolved_capability_configs: &[crate::AgentCapabilityConfig],
499 tool_definitions: &[ToolDefinition],
500 ) {
501 let records = capability_usage_snapshot_records(
502 &self.capability_registry,
503 resolved_capability_configs,
504 tool_definitions,
505 );
506 if records.is_empty() {
507 return;
508 }
509
510 if let Err(error) = self
511 .event_emitter
512 .emit(EventRequest::new(
513 session_id,
514 EventContext::from_atom_context(context),
515 CapabilityUsageData { records },
516 ))
517 .await
518 {
519 tracing::warn!(
520 session_id = %session_id,
521 error = %error,
522 "ReasonAtom: failed to emit capability.usage event"
523 );
524 }
525 }
526
527 async fn execute_inner(
528 &self,
529 input: ReasonInput,
530 assembled: Option<AssembledTurnContext>,
531 ) -> Result<ReasonResult> {
532 let ReasonInput {
533 context,
534 harness_id,
535 agent_id,
536 org_id,
537 mcp_tool_definitions,
538 previous_response_id,
539 iteration,
540 } = input;
541
542 tracing::info!(
543 session_id = %context.session_id,
544 turn_id = %context.turn_id,
545 exec_id = %context.exec_id,
546 harness_id = %harness_id,
547 agent_id = ?agent_id,
548 mcp_tools_count = %mcp_tool_definitions.len(),
549 "ReasonAtom: starting LLM call"
550 );
551
552 let trace_id = context.turn_id.to_string();
560 let reason_span_id = Uuid::now_v7().to_string();
561 let parent_span_id = trace_id.clone(); let event_context = EventContext::from_atom_context(&context).with_span(
565 trace_id.clone(),
566 reason_span_id.clone(),
567 Some(parent_span_id.clone()),
568 );
569
570 let reason_start = Instant::now();
572
573 if let Err(e) = self
575 .event_emitter
576 .emit(EventRequest::new(
577 context.session_id,
578 event_context.clone(),
579 ReasonStartedData {
580 harness_id,
581 agent_id,
582 metadata: None, },
584 ))
585 .await
586 {
587 tracing::warn!(
588 session_id = %context.session_id,
589 error = %e,
590 "ReasonAtom: failed to emit reason.started event"
591 );
592 }
593
594 let result = match self
596 .execute_llm_call(
597 context.session_id,
598 harness_id,
599 agent_id,
600 org_id,
601 &context,
602 &mcp_tool_definitions,
603 &trace_id,
604 &reason_span_id,
605 previous_response_id,
606 iteration,
607 assembled,
608 )
609 .await
610 {
611 Ok(result) => {
612 let reason_duration_ms = reason_start.elapsed().as_millis() as u64;
614
615 let completed_context = EventContext::from_atom_context(&context).with_span(
617 trace_id.clone(),
618 reason_span_id.clone(), Some(parent_span_id.clone()),
620 );
621 if let Err(e) = self
622 .event_emitter
623 .emit(EventRequest::new(
624 context.session_id,
625 completed_context,
626 ReasonCompletedData::success(
627 &result.text,
628 result.has_tool_calls,
629 result.tool_calls.len() as u32,
630 Some(reason_duration_ms),
631 result.usage.clone(),
632 ),
633 ))
634 .await
635 {
636 tracing::warn!(
637 session_id = %context.session_id,
638 error = %e,
639 "ReasonAtom: failed to emit reason.completed event"
640 );
641 }
642 result
643 }
644 Err(e) => {
645 let reason_duration_ms = reason_start.elapsed().as_millis() as u64;
647
648 tracing::warn!(
651 session_id = %context.session_id,
652 turn_id = %context.turn_id,
653 error = %e,
654 "ReasonAtom: LLM call failed"
655 );
656
657 let error_msg = e.to_string();
658 let user_error = e.user_facing_error(UserFacingErrorContext::default());
659 let user_error_text = user_error.fallback_message();
660
661 let is_transient = is_transient_error_message(&error_msg);
668 let mut output_message_id = None;
669
670 if !is_transient {
671 let mut error_message = Message::assistant(&user_error_text);
673 let mut metadata = std::collections::HashMap::new();
674 user_error.apply_to_message_metadata(&mut metadata);
675 error_message.metadata = Some(metadata);
676
677 output_message_id = Some(error_message.id);
678
679 let error_msg_context = EventContext::from_atom_context(&context).with_span(
682 trace_id.clone(),
683 Uuid::now_v7().to_string(), Some(reason_span_id.clone()), );
686 if let Err(emit_err) = self
687 .event_emitter
688 .emit(EventRequest::new(
689 context.session_id,
690 error_msg_context,
691 OutputMessageCompletedData::new(error_message)
692 .with_user_facing_error(&user_error),
693 ))
694 .await
695 {
696 tracing::warn!(
697 session_id = %context.session_id,
698 error = %emit_err,
699 "ReasonAtom: failed to emit output.message.completed event for error"
700 );
701 }
702 } else {
703 tracing::info!(
704 session_id = %context.session_id,
705 "ReasonAtom: skipping error event for transient LLM error (will be retried)"
706 );
707 }
708
709 let completed_context = EventContext::from_atom_context(&context).with_span(
711 trace_id.clone(),
712 reason_span_id.clone(), Some(parent_span_id.clone()),
714 );
715 if let Err(emit_err) = self
716 .event_emitter
717 .emit(EventRequest::new(
718 context.session_id,
719 completed_context,
720 ReasonCompletedData::failure(error_msg.clone(), Some(reason_duration_ms)),
721 ))
722 .await
723 {
724 tracing::warn!(
725 session_id = %context.session_id,
726 error = %emit_err,
727 "ReasonAtom: failed to emit reason.completed event"
728 );
729 }
730
731 ReasonResult {
732 success: false,
733 text: user_error_text,
734 tool_calls: vec![],
735 has_tool_calls: false,
736 tool_definitions: vec![],
737 max_iterations: default_max_iterations(),
738 error: Some(error_msg),
739 usage: None,
740 output_message_id,
741 time_to_first_token_ms: None,
742 response_id: None,
743 locale: None,
744 network_access: None,
745 }
746 }
747 };
748
749 Ok(result)
750 }
751
752 #[allow(clippy::too_many_arguments)]
754 async fn execute_llm_call(
755 &self,
756 session_id: SessionId,
757 harness_id: HarnessId,
758 agent_id: Option<AgentId>,
759 org_id: i64,
760 context: &AtomContext,
761 mcp_tool_definitions: &[ToolDefinition],
762 trace_id: &str,
763 reason_span_id: &str,
764 previous_response_id: Option<String>,
765 iteration: u32,
766 assembled: Option<AssembledTurnContext>,
767 ) -> Result<ReasonResult> {
768 let assembled = match assembled {
769 Some(assembled) => assembled,
770 None => {
771 assemble_turn_context(
772 self.harness_store.as_ref(),
773 self.agent_store.as_ref(),
774 self.session_store.as_ref(),
775 self.message_retriever.as_ref(),
776 self.provider_store.as_ref(),
777 &self.capability_registry,
778 session_id,
779 harness_id,
780 agent_id,
781 mcp_tool_definitions,
782 self.file_store.clone(),
783 )
784 .await?
785 }
786 };
787
788 let messages = assembled.messages;
789 let prior_usage = assembled.session.usage.clone();
790 let model_with_provider = assembled.model_with_provider;
791 let resolved_model_id = assembled.resolved_model_id;
792 let resolved_locale = assembled.resolved_locale;
793 let compaction_config = assembled.compaction_config;
794 let resolved_capability_configs = assembled.resolved_capability_configs;
795 let runtime_agent = assembled.runtime_agent;
796
797 self.emit_capability_usage_snapshot(
798 session_id,
799 context,
800 &resolved_capability_configs,
801 &runtime_agent.tools,
802 )
803 .await;
804
805 let guardrail_providers: Vec<(
812 &str,
813 &serde_json::Value,
814 Arc<dyn crate::output_guardrail::OutputGuardrail>,
815 )> = resolved_capability_configs
816 .iter()
817 .filter_map(|cfg| {
818 let cap_id = cfg.capability_ref.as_str();
819 let cap = self.capability_registry.get(cap_id)?;
820 let guards = cap.output_guardrails();
821 if guards.is_empty() {
822 return None;
823 }
824 Some(
825 guards
826 .into_iter()
827 .map(move |g| (cap_id, &cfg.config, g))
828 .collect::<Vec<_>>(),
829 )
830 })
831 .flatten()
832 .collect();
833
834 let llm_driver = self.create_llm_driver(&model_with_provider)?;
836
837 let reasoning_effort = messages
842 .iter()
843 .rev()
844 .find(|m| m.role == MessageRole::User)
845 .and_then(|m| m.controls.as_ref())
846 .and_then(|c| c.reasoning.as_ref())
847 .and_then(|r| r.effort.clone())
848 .filter(|effort| {
849 if effort.eq_ignore_ascii_case("none") {
851 return false;
852 }
853 let profile = crate::llm_model_profiles::get_model_profile(
856 &model_with_provider.provider_type,
857 &model_with_provider.model,
858 );
859 match profile {
860 Some(p) if !p.reasoning => {
861 tracing::warn!(
862 model = %model_with_provider.model,
863 effort = %effort,
864 "Stripping reasoning_effort: model does not support reasoning"
865 );
866 false
867 }
868 _ => true,
869 }
870 });
871
872 let patched_messages = patch_dangling_tool_calls(&messages);
874
875 let model_view_providers = crate::capabilities::collect_model_view_providers(
878 &resolved_capability_configs,
879 &self.capability_registry,
880 Some(model_with_provider.model.as_str()),
881 );
882 let model_view_context = crate::capabilities::ModelViewContext {
883 session_id,
884 prior_usage: prior_usage.as_ref(),
885 };
886 let context_messages =
887 model_view_providers.apply_model_view(patched_messages, &model_view_context);
888
889 let resolved_images = self.resolve_images(&context_messages).await;
894
895 let mut llm_messages = Vec::new();
897
898 let has_system_prompt = !runtime_agent.system_prompt.is_empty();
900 if has_system_prompt {
901 llm_messages.push(LlmMessage {
902 role: LlmMessageRole::System,
903 content: LlmMessageContent::Text(runtime_agent.system_prompt.clone()),
904 tool_calls: None,
905 tool_call_id: None,
906 phase: None,
907 thinking: None,
908 thinking_signature: None,
909 });
910 }
911
912 let messages_for_event: Vec<Message> = if has_system_prompt {
914 std::iter::once(Message::system(&runtime_agent.system_prompt))
915 .chain(context_messages.iter().cloned())
916 .collect()
917 } else {
918 context_messages.clone()
919 };
920
921 let mut stripped_error_count = 0u32;
927 for msg in &context_messages {
928 if is_error_placeholder_message(msg) {
929 stripped_error_count += 1;
930 continue;
931 }
932 let mut llm_msg = LlmMessage::from_message_with_images(msg, &resolved_images);
933 if msg.role == MessageRole::User
934 && let Some(ref actor) = msg.external_actor
935 {
936 llm_msg.prepend_text_prefix(&format!("[{}] ", actor.display_label()));
937 }
938 llm_messages.push(llm_msg);
939 }
940 if stripped_error_count > 0 {
941 tracing::info!(
942 session_id = %session_id,
943 stripped_error_count,
944 "ReasonAtom: stripped error placeholder messages from LLM input"
945 );
946 }
947
948 let mut llm_config_builder = LlmCallConfigBuilder::from(&runtime_agent);
950 if let Some(effort) = reasoning_effort.clone() {
951 llm_config_builder = llm_config_builder.reasoning_effort(effort);
952 }
953
954 llm_config_builder = llm_config_builder
958 .with_metadata("session_id", session_id.to_string())
959 .with_metadata("harness_id", harness_id.to_string())
960 .with_metadata("turn_id", context.turn_id.to_string())
961 .with_metadata("exec_id", context.exec_id.to_string())
962 .with_metadata("org_id", format!("org_{:032x}", org_id));
963 if let Some(agent_id) = agent_id {
964 llm_config_builder = llm_config_builder.with_metadata("agent_id", agent_id.to_string());
965 }
966
967 if let Some(model_id) = &resolved_model_id {
969 llm_config_builder = llm_config_builder.with_metadata("model_id", model_id.to_string());
970 }
971
972 let llm_config = llm_config_builder
973 .previous_response_id(previous_response_id.clone())
974 .build();
975
976 tracing::debug!(
977 session_id = %session_id,
978 turn_id = %context.turn_id,
979 model = %runtime_agent.model,
980 message_count = %llm_messages.len(),
981 "ReasonAtom: calling LLM"
982 );
983
984 let streaming_event_context = EventContext::from_atom_context(context);
987
988 let mut armed_guardrails: Vec<ArmedGuardrail> = Vec::new();
995 for (cap_id, cfg, provider) in &guardrail_providers {
996 let ctx = OutputGuardrailContext {
997 system_prompt: &runtime_agent.system_prompt,
998 config: cfg,
999 };
1000 let guardrail_id = provider.id().to_string();
1001 if let Some(run) = provider.arm(&ctx) {
1002 armed_guardrails.push(ArmedGuardrail {
1003 capability_id: (*cap_id).to_string(),
1004 guardrail_id,
1005 run,
1006 });
1007 }
1008 }
1009 let mut tripped: Option<TrippedGuardrail> = None;
1010 tracing::info!(
1011 session_id = %session_id,
1012 turn_id = %context.turn_id,
1013 "ReasonAtom: emitting output.message.started event"
1014 );
1015 if let Err(e) = self
1016 .event_emitter
1017 .emit(EventRequest::new(
1018 session_id,
1019 streaming_event_context.clone(),
1020 OutputMessageStartedData {
1021 turn_id: context.turn_id,
1022 model: Some(runtime_agent.model.clone()),
1023 iteration: Some(iteration),
1024 },
1025 ))
1026 .await
1027 {
1028 tracing::warn!(
1029 session_id = %session_id,
1030 error = %e,
1031 "ReasonAtom: failed to emit output.message.started event"
1032 );
1033 } else {
1034 tracing::info!(
1035 session_id = %session_id,
1036 "ReasonAtom: output.message.started event emitted successfully"
1037 );
1038 }
1039
1040 let thinking_enabled = reasoning_effort.is_some();
1042 if thinking_enabled {
1043 tracing::info!(
1044 session_id = %session_id,
1045 turn_id = %context.turn_id,
1046 "ReasonAtom: emitting reason.thinking.started event"
1047 );
1048 if let Err(e) = self
1049 .event_emitter
1050 .emit(EventRequest::new(
1051 session_id,
1052 streaming_event_context.clone(),
1053 ReasonThinkingStartedData {
1054 turn_id: context.turn_id,
1055 model: Some(runtime_agent.model.clone()),
1056 },
1057 ))
1058 .await
1059 {
1060 tracing::warn!(
1061 session_id = %session_id,
1062 error = %e,
1063 "ReasonAtom: failed to emit reason.thinking.started event"
1064 );
1065 } else {
1066 tracing::info!(
1067 session_id = %session_id,
1068 "ReasonAtom: reason.thinking.started event emitted successfully"
1069 );
1070 }
1071 }
1072
1073 let llm_start = Instant::now();
1075
1076 let mut compaction_info: Option<LlmCompactionInfo> = None;
1080 let mut llm_messages_for_call = llm_messages.clone();
1081
1082 if let Some(ref config) = compaction_config {
1085 let context_window = crate::llm_model_profiles::get_model_profile(
1086 &model_with_provider.provider_type,
1087 &model_with_provider.model,
1088 )
1089 .and_then(|p| p.limits.map(|l| l.context as usize))
1090 .unwrap_or(128_000);
1091
1092 if crate::capabilities::should_compact_proactively(
1093 &llm_messages_for_call,
1094 config,
1095 context_window,
1096 ) {
1097 use crate::capabilities::{
1098 CompactionStrategy, aggressive_trim, apply_observation_masking,
1099 estimate_total_tokens,
1100 };
1101 use crate::events::{
1102 CompactionReason, CompactionStepData, ContextCompactedData,
1103 ContextCompactingData,
1104 };
1105
1106 let messages_before = llm_messages_for_call.len();
1107 let cascade_start = Instant::now();
1108 let mut strategies_used: Vec<String> = Vec::new();
1109 let mut steps: Vec<CompactionStepData> = Vec::new();
1110
1111 tracing::info!(
1112 session_id = %session_id,
1113 strategy = %config.strategy,
1114 messages = messages_before,
1115 "ReasonAtom: proactive compaction triggered (budget threshold exceeded)"
1116 );
1117
1118 let _ = self
1120 .event_emitter
1121 .emit(EventRequest::new(
1122 session_id,
1123 streaming_event_context.clone(),
1124 ContextCompactingData {
1125 reason: CompactionReason::ProactiveBudget,
1126 strategy: config.strategy.to_string(),
1127 messages_before,
1128 },
1129 ))
1130 .await;
1131
1132 let run_masking = matches!(
1133 config.strategy,
1134 CompactionStrategy::Auto | CompactionStrategy::ObservationMasking
1135 );
1136
1137 if run_masking {
1139 let step_start = Instant::now();
1140 let conversation_msgs = if has_system_prompt {
1141 &llm_messages_for_call[1..]
1142 } else {
1143 &llm_messages_for_call[..]
1144 };
1145
1146 let masking_result =
1147 apply_observation_masking(conversation_msgs, &config.observation_masking);
1148
1149 if masking_result.masked_count > 0 {
1150 let mut new_messages = Vec::new();
1151 if has_system_prompt {
1152 new_messages.push(llm_messages_for_call[0].clone());
1153 }
1154 new_messages.extend(masking_result.messages);
1155 llm_messages_for_call = new_messages;
1156
1157 let step_duration = step_start.elapsed().as_millis() as u64;
1158 strategies_used.push("observation_masking".to_string());
1159 steps.push(CompactionStepData {
1160 strategy: "observation_masking".to_string(),
1161 messages_after: llm_messages_for_call.len(),
1162 duration_ms: step_duration,
1163 });
1164 }
1165 }
1166
1167 let budget_tokens = (context_window as f32 * config.budget_percent) as usize;
1169 if estimate_total_tokens(&llm_messages_for_call) > budget_tokens {
1170 let step_start = Instant::now();
1171 llm_messages_for_call =
1172 aggressive_trim(&llm_messages_for_call, budget_tokens, has_system_prompt);
1173
1174 let step_duration = step_start.elapsed().as_millis() as u64;
1175 strategies_used.push("aggressive_trim".to_string());
1176 steps.push(CompactionStepData {
1177 strategy: "aggressive_trim".to_string(),
1178 messages_after: llm_messages_for_call.len(),
1179 duration_ms: step_duration,
1180 });
1181 }
1182
1183 let cascade_duration = cascade_start.elapsed().as_millis() as u64;
1184 let messages_after = llm_messages_for_call.len();
1185
1186 if !strategies_used.is_empty() {
1187 let strategy_used = strategies_used.join("+");
1188
1189 let _ = self
1190 .event_emitter
1191 .emit(EventRequest::new(
1192 session_id,
1193 streaming_event_context.clone(),
1194 ContextCompactedData {
1195 strategy_used: strategy_used.clone(),
1196 messages_before,
1197 messages_after,
1198 duration_ms: cascade_duration,
1199 steps,
1200 },
1201 ))
1202 .await;
1203
1204 tracing::info!(
1205 session_id = %session_id,
1206 strategy = %strategy_used,
1207 messages_before,
1208 messages_after,
1209 duration_ms = cascade_duration,
1210 "ReasonAtom: proactive compaction completed"
1211 );
1212 }
1213 }
1214 }
1215
1216 const DELTA_BATCH_INTERVAL_MS: u64 = 100;
1219 let (
1220 text,
1221 thinking,
1222 thinking_signature,
1223 tool_calls,
1224 completion_metadata,
1225 time_to_first_token_ms,
1226 ) = {
1227 let mut stream = match llm_driver
1228 .chat_completion_stream(llm_messages_for_call.clone(), &llm_config)
1229 .await
1230 {
1231 Ok(stream) => stream,
1232 Err(e) if e.is_request_too_large() => {
1233 use crate::capabilities::{CompactionStrategy, apply_observation_masking};
1235 use crate::events::{
1236 CompactionReason, CompactionStepData, ContextCompactedData,
1237 ContextCompactingData,
1238 };
1239
1240 let Some(config) = compaction_config.clone() else {
1241 tracing::warn!(
1242 session_id = %session_id,
1243 turn_id = %context.turn_id,
1244 "ReasonAtom: context too large and compaction capability is not enabled"
1245 );
1246 return Err(e);
1247 };
1248 let messages_before = llm_messages_for_call.len();
1249
1250 tracing::info!(
1251 session_id = %session_id,
1252 turn_id = %context.turn_id,
1253 strategy = %config.strategy,
1254 messages = messages_before,
1255 "ReasonAtom: context too large, attempting compaction"
1256 );
1257
1258 let _ = self
1260 .event_emitter
1261 .emit(EventRequest::new(
1262 session_id,
1263 streaming_event_context.clone(),
1264 ContextCompactingData {
1265 reason: CompactionReason::RequestTooLarge,
1266 strategy: config.strategy.to_string(),
1267 messages_before,
1268 },
1269 ))
1270 .await;
1271
1272 let cascade_start = Instant::now();
1273 let mut steps: Vec<CompactionStepData> = Vec::new();
1274 let mut strategies_used: Vec<String> = Vec::new();
1275
1276 let run_masking = matches!(
1278 config.strategy,
1279 CompactionStrategy::Auto | CompactionStrategy::ObservationMasking
1280 );
1281 let run_native = matches!(
1282 config.strategy,
1283 CompactionStrategy::Auto | CompactionStrategy::Native
1284 ) && llm_driver.supports_compact();
1285 let run_summarization = matches!(
1286 config.strategy,
1287 CompactionStrategy::Auto | CompactionStrategy::Summarization
1288 );
1289
1290 if run_masking {
1292 let step_start = Instant::now();
1293 let conversation_msgs = if has_system_prompt {
1294 &llm_messages_for_call[1..]
1295 } else {
1296 &llm_messages_for_call[..]
1297 };
1298
1299 let masking_result = apply_observation_masking(
1300 conversation_msgs,
1301 &config.observation_masking,
1302 );
1303
1304 if masking_result.masked_count > 0 {
1305 let mut new_messages = Vec::new();
1306 if has_system_prompt {
1307 new_messages.push(llm_messages_for_call[0].clone());
1308 }
1309 new_messages.extend(masking_result.messages);
1310 llm_messages_for_call = new_messages;
1311
1312 let step_duration = step_start.elapsed().as_millis() as u64;
1313 strategies_used.push("observation_masking".to_string());
1314 steps.push(CompactionStepData {
1315 strategy: "observation_masking".to_string(),
1316 messages_after: llm_messages_for_call.len(),
1317 duration_ms: step_duration,
1318 });
1319
1320 tracing::info!(
1321 session_id = %session_id,
1322 masked_count = masking_result.masked_count,
1323 duration_ms = step_duration,
1324 "ReasonAtom: observation masking applied"
1325 );
1326 }
1327 }
1328
1329 if run_native {
1331 let step_start = Instant::now();
1332 let messages_to_compact = if has_system_prompt {
1333 &llm_messages_for_call[1..]
1334 } else {
1335 &llm_messages_for_call[..]
1336 };
1337
1338 let compact_input = messages_to_compact_input(messages_to_compact);
1339 let input_count = compact_input.len();
1340
1341 let compact_request = CompactRequest {
1342 model: runtime_agent.model.clone(),
1343 input: compact_input,
1344 previous_response_id: previous_response_id.clone(),
1345 instructions: if has_system_prompt {
1346 Some(runtime_agent.system_prompt.clone())
1347 } else {
1348 None
1349 },
1350 };
1351
1352 match llm_driver.compact(compact_request).await {
1353 Ok(Some(compact_response)) => {
1354 let (compacted_messages, compaction_items) =
1355 compact_output_to_messages(&compact_response.output);
1356
1357 let input_tokens_after = compact_response
1358 .usage
1359 .as_ref()
1360 .and_then(|u| u.output_tokens);
1361
1362 compaction_info = Some(LlmCompactionInfo::new(
1363 Some(input_count as u32),
1364 input_tokens_after,
1365 Some(step_start.elapsed().as_millis() as u64),
1366 ));
1367
1368 let mut compacted_llm_messages = Vec::new();
1369 if has_system_prompt {
1370 compacted_llm_messages.push(llm_messages_for_call[0].clone());
1371 }
1372 compacted_llm_messages.extend(compacted_messages);
1373
1374 for item in compaction_items {
1375 if let CompactInputItem::Compaction { encrypted_content } = item
1376 {
1377 compacted_llm_messages.push(LlmMessage {
1378 role: LlmMessageRole::System,
1379 content: LlmMessageContent::Text(format!(
1380 "[COMPACTED_CONTEXT:{encrypted_content}]"
1381 )),
1382 tool_calls: None,
1383 tool_call_id: None,
1384 phase: None,
1385 thinking: None,
1386 thinking_signature: None,
1387 });
1388 }
1389 }
1390
1391 llm_messages_for_call = compacted_llm_messages;
1392
1393 let step_duration = step_start.elapsed().as_millis() as u64;
1394 strategies_used.push("native".to_string());
1395 steps.push(CompactionStepData {
1396 strategy: "native".to_string(),
1397 messages_after: llm_messages_for_call.len(),
1398 duration_ms: step_duration,
1399 });
1400
1401 tracing::info!(
1402 session_id = %session_id,
1403 duration_ms = step_duration,
1404 messages_after = llm_messages_for_call.len(),
1405 "ReasonAtom: native compaction applied"
1406 );
1407 }
1408 Ok(None) | Err(_) => {
1409 tracing::warn!(
1410 session_id = %session_id,
1411 "ReasonAtom: native compaction unavailable, continuing cascade"
1412 );
1413 }
1414 }
1415 }
1416
1417 if run_summarization && !strategies_used.contains(&"native".to_string()) {
1420 use crate::capabilities::{
1421 build_summarization_prompt, build_summary_message,
1422 format_messages_for_summarization,
1423 };
1424
1425 let step_start = Instant::now();
1426 let conversation_msgs = if has_system_prompt {
1427 &llm_messages_for_call[1..]
1428 } else {
1429 &llm_messages_for_call[..]
1430 };
1431
1432 let keep_recent = 10.min(conversation_msgs.len());
1434 let to_summarize =
1435 &conversation_msgs[..conversation_msgs.len() - keep_recent];
1436 let recent = &conversation_msgs[conversation_msgs.len() - keep_recent..];
1437
1438 if !to_summarize.is_empty() {
1439 let summary_prompt = build_summarization_prompt(&config.summarization);
1440 let messages_text = format_messages_for_summarization(to_summarize);
1441
1442 let summary_messages = vec![
1444 LlmMessage {
1445 role: LlmMessageRole::System,
1446 content: LlmMessageContent::Text(summary_prompt),
1447 tool_calls: None,
1448 tool_call_id: None,
1449 phase: None,
1450 thinking: None,
1451 thinking_signature: None,
1452 },
1453 LlmMessage {
1454 role: LlmMessageRole::User,
1455 content: LlmMessageContent::Text(messages_text),
1456 tool_calls: None,
1457 tool_call_id: None,
1458 phase: None,
1459 thinking: None,
1460 thinking_signature: None,
1461 },
1462 ];
1463
1464 let summary_config = crate::llm_driver_registry::LlmCallConfig {
1465 model: config
1466 .summarization
1467 .model
1468 .clone()
1469 .unwrap_or_else(|| runtime_agent.model.clone()),
1470 temperature: Some(0.0),
1471 max_tokens: Some(2000),
1472 tools: vec![],
1473 reasoning_effort: None,
1474 metadata: HashMap::new(),
1475 previous_response_id: None,
1476 tool_search: None,
1477 prompt_cache: None,
1478 };
1479
1480 match llm_driver
1481 .chat_completion(summary_messages, &summary_config)
1482 .await
1483 {
1484 Ok(response) => {
1485 let summary_text = response.text;
1486 let summary_msg = build_summary_message(&summary_text);
1487
1488 let mut new_messages = Vec::new();
1489 if has_system_prompt {
1490 new_messages.push(llm_messages_for_call[0].clone());
1491 }
1492 new_messages.push(summary_msg);
1493 new_messages.extend_from_slice(recent);
1494 llm_messages_for_call = new_messages;
1495
1496 let step_duration = step_start.elapsed().as_millis() as u64;
1497 strategies_used.push("summarization".to_string());
1498 steps.push(CompactionStepData {
1499 strategy: "summarization".to_string(),
1500 messages_after: llm_messages_for_call.len(),
1501 duration_ms: step_duration,
1502 });
1503
1504 tracing::info!(
1505 session_id = %session_id,
1506 duration_ms = step_duration,
1507 messages_after = llm_messages_for_call.len(),
1508 "ReasonAtom: summarization applied"
1509 );
1510 }
1511 Err(e) => {
1512 tracing::warn!(
1513 session_id = %session_id,
1514 error = %e,
1515 "ReasonAtom: summarization failed, continuing"
1516 );
1517 }
1518 }
1519 }
1520 }
1521
1522 if strategies_used.is_empty()
1526 || llm_messages_for_call.len() > messages_before / 2
1527 {
1528 use crate::capabilities::aggressive_trim;
1529 let step_start = Instant::now();
1530 let estimated_total =
1532 crate::capabilities::estimate_total_tokens(&llm_messages_for_call);
1533 let target = estimated_total / 2;
1534 let trimmed =
1535 aggressive_trim(&llm_messages_for_call, target, has_system_prompt);
1536 if trimmed.len() < llm_messages_for_call.len() {
1537 llm_messages_for_call = trimmed;
1538 let step_duration = step_start.elapsed().as_millis() as u64;
1539 strategies_used.push("aggressive_trim".to_string());
1540 steps.push(CompactionStepData {
1541 strategy: "aggressive_trim".to_string(),
1542 messages_after: llm_messages_for_call.len(),
1543 duration_ms: step_duration,
1544 });
1545 tracing::info!(
1546 session_id = %session_id,
1547 messages_after = llm_messages_for_call.len(),
1548 "ReasonAtom: aggressive trim applied (last resort)"
1549 );
1550 }
1551 }
1552
1553 let cascade_duration = cascade_start.elapsed().as_millis() as u64;
1554 let messages_after = llm_messages_for_call.len();
1555
1556 let strategy_used = if strategies_used.is_empty() {
1558 "none".to_string()
1559 } else {
1560 strategies_used.join("+")
1561 };
1562
1563 let _ = self
1564 .event_emitter
1565 .emit(EventRequest::new(
1566 session_id,
1567 streaming_event_context.clone(),
1568 ContextCompactedData {
1569 strategy_used: strategy_used.clone(),
1570 messages_before,
1571 messages_after,
1572 duration_ms: cascade_duration,
1573 steps,
1574 },
1575 ))
1576 .await;
1577
1578 tracing::info!(
1579 session_id = %session_id,
1580 strategy = %strategy_used,
1581 messages_before,
1582 messages_after,
1583 duration_ms = cascade_duration,
1584 "ReasonAtom: compaction cascade completed, retrying LLM call"
1585 );
1586
1587 llm_driver
1588 .chat_completion_stream(llm_messages_for_call.clone(), &llm_config)
1589 .await?
1590 }
1591 Err(e) => return Err(e),
1592 };
1593
1594 let mut text = String::new();
1595 let mut thinking = String::new();
1596 let mut thinking_signature: Option<String> = None;
1597 let mut tool_calls = Vec::new();
1598 let mut completion_metadata: Option<LlmCompletionMetadata> = None;
1599 let mut pending_delta = String::new();
1600 let mut pending_thinking_delta = String::new();
1601 let mut last_delta_emit = Instant::now();
1602 let mut last_thinking_delta_emit = Instant::now();
1603 let mut time_to_first_token_ms: Option<u64> = None;
1604
1605 let stall_timeout = self
1607 .provider_stall_timeout
1608 .unwrap_or(std::time::Duration::from_secs(120));
1609 let mut stall_sleep = Box::pin(tokio::time::sleep(stall_timeout));
1610 let mut keepalive_ticker = tokio::time::interval(std::time::Duration::from_secs(12));
1611 keepalive_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1612 keepalive_ticker.tick().await; let mut last_stream_heartbeat = Instant::now();
1614 let mut last_token_at_unix: u64 = std::time::SystemTime::now()
1619 .duration_since(std::time::UNIX_EPOCH)
1620 .unwrap_or_default()
1621 .as_secs();
1622
1623 loop {
1624 let event = tokio::select! {
1625 biased;
1626 next = stream.next() => match next {
1627 Some(e) => e,
1628 None => break,
1629 },
1630 _ = &mut stall_sleep => {
1631 tracing::warn!(
1632 session_id = %session_id,
1633 turn_id = %context.turn_id,
1634 stall_secs = stall_timeout.as_secs(),
1635 "ReasonAtom: provider stream stall timeout"
1636 );
1637 return Err(AgentLoopError::llm(format!(
1638 "provider stream stall: no tokens for {}s",
1639 stall_timeout.as_secs()
1640 )));
1641 },
1642 _ = keepalive_ticker.tick() => {
1643 if let Some(ref hb) = self.stream_heartbeater {
1644 hb.heartbeat(crate::traits::StreamProgress {
1645 accumulated_len: text.len() + thinking.len(),
1646 last_delta_at: last_token_at_unix,
1647 })
1648 .await;
1649 last_stream_heartbeat = Instant::now();
1650 }
1651 continue;
1652 },
1653 };
1654 stall_sleep
1656 .as_mut()
1657 .reset(tokio::time::Instant::now() + stall_timeout);
1658 match event? {
1659 LlmStreamEvent::TextDelta(delta) => {
1660 if time_to_first_token_ms.is_none() && !delta.is_empty() {
1662 let ttft = llm_start.elapsed().as_millis() as u64;
1663 time_to_first_token_ms = Some(ttft);
1664 tracing::info!(
1665 session_id = %session_id,
1666 time_to_first_token_ms = ttft,
1667 "ReasonAtom: received first token from LLM"
1668 );
1669 }
1670 text.push_str(&delta);
1671 pending_delta.push_str(&delta);
1672 last_token_at_unix = std::time::SystemTime::now()
1673 .duration_since(std::time::UNIX_EPOCH)
1674 .unwrap_or_default()
1675 .as_secs();
1676
1677 if !armed_guardrails.is_empty()
1684 && let Some(t) =
1685 evaluate_guardrails(&mut armed_guardrails, &text, &delta)
1686 {
1687 tracing::warn!(
1688 session_id = %session_id,
1689 turn_id = %context.turn_id,
1690 guardrail_capability_id = %t.capability_id,
1691 guardrail_id = %t.guardrail_id,
1692 reason_code = %t.block.reason_code,
1693 "ReasonAtom: output guardrail tripped, replacing assistant message"
1694 );
1695 pending_delta.clear();
1696 tripped = Some(t);
1697 break;
1698 }
1699
1700 if last_delta_emit.elapsed().as_millis() as u64 >= DELTA_BATCH_INTERVAL_MS
1702 && !pending_delta.is_empty()
1703 {
1704 if let Err(e) = self
1705 .event_emitter
1706 .emit(EventRequest::new(
1707 session_id,
1708 streaming_event_context.clone(),
1709 OutputMessageDeltaData {
1710 turn_id: context.turn_id,
1711 delta: pending_delta.clone(),
1712 accumulated: text.clone(),
1713 },
1714 ))
1715 .await
1716 {
1717 tracing::warn!(
1718 session_id = %session_id,
1719 error = %e,
1720 "ReasonAtom: failed to emit output.message.delta event"
1721 );
1722 }
1723 pending_delta.clear();
1724 last_delta_emit = Instant::now();
1725 }
1726 }
1727 LlmStreamEvent::ThinkingDelta(delta) => {
1728 thinking.push_str(&delta);
1730 pending_thinking_delta.push_str(&delta);
1731 last_token_at_unix = std::time::SystemTime::now()
1732 .duration_since(std::time::UNIX_EPOCH)
1733 .unwrap_or_default()
1734 .as_secs();
1735 tracing::debug!(
1736 session_id = %session_id,
1737 delta_len = delta.len(),
1738 total_thinking_len = thinking.len(),
1739 "ReasonAtom: received ThinkingDelta from LLM"
1740 );
1741
1742 if last_thinking_delta_emit.elapsed().as_millis() as u64
1744 >= DELTA_BATCH_INTERVAL_MS
1745 && !pending_thinking_delta.is_empty()
1746 {
1747 if let Err(e) = self
1748 .event_emitter
1749 .emit(EventRequest::new(
1750 session_id,
1751 streaming_event_context.clone(),
1752 ReasonThinkingDeltaData {
1753 turn_id: context.turn_id,
1754 delta: pending_thinking_delta.clone(),
1755 accumulated: thinking.clone(),
1756 },
1757 ))
1758 .await
1759 {
1760 tracing::warn!(
1761 session_id = %session_id,
1762 error = %e,
1763 "ReasonAtom: failed to emit reason.thinking.delta event"
1764 );
1765 }
1766 pending_thinking_delta.clear();
1767 last_thinking_delta_emit = Instant::now();
1768 }
1769 }
1770 LlmStreamEvent::ThinkingSignature(signature) => {
1771 tracing::debug!(
1773 session_id = %session_id,
1774 signature_len = signature.len(),
1775 "ReasonAtom: received ThinkingSignature from LLM"
1776 );
1777 thinking_signature = Some(signature);
1778 }
1779 LlmStreamEvent::ReasonItem {
1780 provider,
1781 model,
1782 item_id,
1783 encrypted_content,
1784 summary,
1785 token_count,
1786 } => {
1787 if let Some(sig) = encrypted_content.as_ref() {
1793 tracing::debug!(
1794 session_id = %session_id,
1795 signature_len = sig.len(),
1796 provider = %provider,
1797 item_id = %item_id,
1798 "ReasonAtom: captured encrypted reasoning content from ReasonItem"
1799 );
1800 thinking_signature = Some(sig.clone());
1801 }
1802 if let Err(e) = self
1803 .event_emitter
1804 .emit(EventRequest::new(
1805 session_id,
1806 streaming_event_context.clone(),
1807 ReasonItemData {
1808 turn_id: context.turn_id,
1809 provider,
1810 model,
1811 item_id,
1812 encrypted_content,
1813 summary,
1814 token_count,
1815 },
1816 ))
1817 .await
1818 {
1819 tracing::warn!(
1820 session_id = %session_id,
1821 error = %e,
1822 "ReasonAtom: failed to emit reason.item event"
1823 );
1824 }
1825 }
1826 LlmStreamEvent::ToolCalls(calls) => {
1827 tool_calls = calls;
1828 }
1829 LlmStreamEvent::Done(metadata) => {
1830 if !pending_delta.is_empty()
1832 && let Err(e) = self
1833 .event_emitter
1834 .emit(EventRequest::new(
1835 session_id,
1836 streaming_event_context.clone(),
1837 OutputMessageDeltaData {
1838 turn_id: context.turn_id,
1839 delta: pending_delta.clone(),
1840 accumulated: text.clone(),
1841 },
1842 ))
1843 .await
1844 {
1845 tracing::warn!(
1846 session_id = %session_id,
1847 error = %e,
1848 "ReasonAtom: failed to emit final output.message.delta event"
1849 );
1850 }
1851
1852 if !pending_thinking_delta.is_empty()
1854 && let Err(e) = self
1855 .event_emitter
1856 .emit(EventRequest::new(
1857 session_id,
1858 streaming_event_context.clone(),
1859 ReasonThinkingDeltaData {
1860 turn_id: context.turn_id,
1861 delta: pending_thinking_delta.clone(),
1862 accumulated: thinking.clone(),
1863 },
1864 ))
1865 .await
1866 {
1867 tracing::warn!(
1868 session_id = %session_id,
1869 error = %e,
1870 "ReasonAtom: failed to emit final reason.thinking.delta event"
1871 );
1872 }
1873
1874 if !thinking.is_empty()
1876 && let Err(e) = self
1877 .event_emitter
1878 .emit(EventRequest::new(
1879 session_id,
1880 streaming_event_context.clone(),
1881 ReasonThinkingCompletedData {
1882 turn_id: context.turn_id,
1883 thinking: thinking.clone(),
1884 },
1885 ))
1886 .await
1887 {
1888 tracing::warn!(
1889 session_id = %session_id,
1890 error = %e,
1891 "ReasonAtom: failed to emit reason.thinking.completed event"
1892 );
1893 }
1894 completion_metadata = Some(*metadata);
1895 break;
1896 }
1897 LlmStreamEvent::Error(err) => {
1898 let has_partial_output = !tool_calls.is_empty() || !text.is_empty();
1903
1904 if has_partial_output {
1905 tracing::warn!(
1906 session_id = %session_id,
1907 error = %err,
1908 tool_call_count = tool_calls.len(),
1909 text_len = text.len(),
1910 "ReasonAtom: trailing stream error after valid output — treating as partial success"
1911 );
1912 break;
1916 }
1917
1918 let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
1920 let event_context = EventContext::from_atom_context(context).with_span(
1921 trace_id.to_string(),
1922 Uuid::now_v7().to_string(),
1923 Some(reason_span_id.to_string()),
1924 );
1925 let tools_summary: Vec<ToolDefinitionSummary> =
1926 runtime_agent.tools.iter().map(|t| t.into()).collect();
1927 let generation_data = LlmGenerationData::failure(
1928 messages_for_event.clone(),
1929 tools_summary,
1930 runtime_agent.model.clone(),
1931 Some(model_with_provider.provider_type.to_string()),
1932 err.clone(),
1933 Some(llm_duration_ms),
1934 time_to_first_token_ms,
1935 );
1936 let _ = self
1937 .event_emitter
1938 .emit(EventRequest::new(
1939 session_id,
1940 event_context,
1941 generation_data,
1942 ))
1943 .await;
1944 return Err(AgentLoopError::llm(err));
1945 }
1946 }
1947 if last_stream_heartbeat.elapsed().as_millis() as u64 >= 5_000
1950 && let Some(ref hb) = self.stream_heartbeater
1951 {
1952 hb.heartbeat(crate::traits::StreamProgress {
1953 accumulated_len: text.len() + thinking.len(),
1954 last_delta_at: last_token_at_unix,
1955 })
1956 .await;
1957 last_stream_heartbeat = Instant::now();
1958 }
1959 }
1960 (
1961 text,
1962 thinking,
1963 thinking_signature,
1964 tool_calls,
1965 completion_metadata,
1966 time_to_first_token_ms,
1967 )
1968 };
1969 let (mut text, mut thinking, thinking_signature, mut tool_calls) =
1970 (text, thinking, thinking_signature, tool_calls);
1971
1972 if let Some(ref t) = tripped {
1978 let replaced_event_context = EventContext::from_atom_context(context).with_span(
1979 trace_id.to_string(),
1980 Uuid::now_v7().to_string(),
1981 Some(reason_span_id.to_string()),
1982 );
1983 if let Err(e) = self
1984 .event_emitter
1985 .emit(EventRequest::new(
1986 session_id,
1987 replaced_event_context,
1988 OutputMessageReplacedData {
1989 turn_id: context.turn_id,
1990 guardrail_capability_id: t.capability_id.clone(),
1991 guardrail_id: t.guardrail_id.clone(),
1992 reason_code: t.block.reason_code.clone(),
1993 replacement: t.block.replacement.clone(),
1994 },
1995 ))
1996 .await
1997 {
1998 tracing::warn!(
1999 session_id = %session_id,
2000 error = %e,
2001 "ReasonAtom: failed to emit output.message.replaced event"
2002 );
2003 }
2004 text = t.block.replacement.clone();
2005 tool_calls.clear();
2006 thinking.clear();
2007 }
2008
2009 let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
2010
2011 let response_id = completion_metadata
2013 .as_ref()
2014 .and_then(|meta| meta.response_id.clone());
2015
2016 let usage = completion_metadata.as_ref().and_then(|meta| {
2024 match (meta.prompt_tokens, meta.completion_tokens) {
2025 (Some(input), Some(output)) => {
2026 let actual_cost_usd = meta.provider_cost_usd;
2027 let estimated_cost_usd = crate::llm_model_profiles::estimate_cost_usd(
2028 &model_with_provider.provider_type,
2029 &runtime_agent.model,
2030 input,
2031 output,
2032 );
2033 Some(
2034 TokenUsage::with_cache(
2035 input,
2036 output,
2037 meta.cache_read_tokens,
2038 meta.cache_creation_tokens,
2039 )
2040 .with_cost(actual_cost_usd, estimated_cost_usd),
2041 )
2042 }
2043 _ => None,
2044 }
2045 });
2046
2047 let event_context = EventContext::from_atom_context(context).with_span(
2049 trace_id.to_string(),
2050 Uuid::now_v7().to_string(),
2051 Some(reason_span_id.to_string()),
2052 );
2053 let tools_summary: Vec<ToolDefinitionSummary> =
2054 runtime_agent.tools.iter().map(|t| t.into()).collect();
2055 let finish_reasons = if !tool_calls.is_empty() {
2057 Some(vec!["tool_calls".to_string()])
2058 } else {
2059 Some(vec!["stop".to_string()])
2060 };
2061 let retry_info = completion_metadata
2063 .as_ref()
2064 .and_then(|meta| meta.retry_metadata.as_ref())
2065 .filter(|rm| rm.had_retries())
2066 .map(|rm| LlmRetryInfo {
2067 attempts: rm.attempts,
2068 total_wait_ms: rm.total_retry_wait.as_millis() as u64,
2069 });
2070 let mut generation_data = LlmGenerationData::success_with_retry(
2072 messages_for_event.clone(),
2073 tools_summary,
2074 Some(text.clone()).filter(|s| !s.is_empty()),
2075 tool_calls.clone(),
2076 runtime_agent.model.clone(),
2077 Some(model_with_provider.provider_type.to_string()),
2078 usage.clone(),
2079 Some(llm_duration_ms),
2080 time_to_first_token_ms,
2081 finish_reasons,
2082 response_id.clone(),
2083 retry_info,
2084 );
2085
2086 if let Some(info) = compaction_info {
2088 generation_data = generation_data.with_compaction(info);
2089 }
2090
2091 if let Some(request_options) =
2092 build_request_options(&llm_config, &model_with_provider.provider_type.to_string())
2093 {
2094 generation_data = generation_data.with_request_options(request_options);
2095 }
2096
2097 if let Err(e) = self
2098 .event_emitter
2099 .emit(EventRequest::new(
2100 session_id,
2101 event_context,
2102 generation_data,
2103 ))
2104 .await
2105 {
2106 tracing::warn!(
2107 session_id = %session_id,
2108 error = %e,
2109 "ReasonAtom: failed to emit llm.generation event"
2110 );
2111 }
2112
2113 let mut metadata = std::collections::HashMap::new();
2115 metadata.insert(
2116 "model".to_string(),
2117 serde_json::Value::String(runtime_agent.model.clone()),
2118 );
2119 if let Some(ref effort) = reasoning_effort {
2120 metadata.insert(
2121 "reasoning_effort".to_string(),
2122 serde_json::Value::String(effort.clone()),
2123 );
2124 }
2125
2126 let has_tool_calls = !tool_calls.is_empty();
2128 let mut assistant_message = if has_tool_calls {
2129 Message::assistant_with_tools(&text, tool_calls.clone())
2130 } else {
2131 Message::assistant(&text)
2132 };
2133 assistant_message.phase = completion_metadata
2137 .as_ref()
2138 .and_then(|meta| meta.phase.as_deref())
2139 .and_then(crate::message::ExecutionPhase::from_provider_str)
2140 .or_else(|| {
2141 Some(crate::message::ExecutionPhase::from_has_tool_calls(
2142 has_tool_calls,
2143 ))
2144 });
2145 assistant_message.metadata = Some(metadata);
2146 if !thinking.is_empty() {
2149 assistant_message.thinking = Some(thinking.clone());
2150 assistant_message.thinking_signature = thinking_signature.clone();
2151 }
2152 let output_message_id = assistant_message.id;
2153
2154 let message_event_context = EventContext::from_atom_context(context).with_span(
2157 trace_id.to_string(),
2158 Uuid::now_v7().to_string(),
2159 Some(reason_span_id.to_string()),
2160 );
2161 let mut output_message_data = OutputMessageCompletedData::new(assistant_message);
2162 if let Some(ref u) = usage {
2163 output_message_data = output_message_data.with_usage(u.clone());
2164 }
2165 self.event_emitter
2166 .emit(EventRequest::new(
2167 session_id,
2168 message_event_context,
2169 output_message_data,
2170 ))
2171 .await?;
2172
2173 tracing::info!(
2174 session_id = %session_id,
2175 turn_id = %context.turn_id,
2176 has_tool_calls = %has_tool_calls,
2177 tool_count = %tool_calls.len(),
2178 "ReasonAtom: LLM call completed"
2179 );
2180
2181 Ok(ReasonResult {
2182 success: true,
2183 text,
2184 tool_calls,
2185 has_tool_calls,
2186 tool_definitions: runtime_agent.tools.clone(),
2187 max_iterations: runtime_agent.max_iterations,
2188 error: None,
2189 usage,
2190 output_message_id: Some(output_message_id),
2191 time_to_first_token_ms,
2192 response_id,
2193 locale: resolved_locale,
2194 network_access: runtime_agent.network_access.clone(),
2195 })
2196 }
2197
2198 fn create_llm_driver(
2201 &self,
2202 model: &ModelWithProvider,
2203 ) -> Result<crate::llm_driver_registry::BoxedLlmDriver> {
2204 let provider_type = match model.provider_type {
2205 crate::llm_models::LlmProviderType::Openai => ProviderType::OpenAI,
2206 crate::llm_models::LlmProviderType::Openrouter => ProviderType::OpenRouter,
2207 crate::llm_models::LlmProviderType::AzureOpenai => ProviderType::AzureOpenAI,
2208 crate::llm_models::LlmProviderType::OpenaiCompletions => {
2209 ProviderType::OpenAICompletions
2210 }
2211 crate::llm_models::LlmProviderType::Anthropic => ProviderType::Anthropic,
2212 crate::llm_models::LlmProviderType::Gemini => ProviderType::Gemini,
2213 crate::llm_models::LlmProviderType::LlmSim => ProviderType::LlmSim,
2214 crate::llm_models::LlmProviderType::Bedrock => ProviderType::Bedrock,
2215 };
2216
2217 let mut config = ProviderConfig::new(provider_type);
2218 if let Some(ref api_key) = model.api_key {
2219 config = config.with_api_key(api_key);
2220 }
2221 if let Some(ref base_url) = model.base_url {
2222 config = config.with_base_url(base_url);
2223 }
2224
2225 self.driver_registry.create_driver(&config)
2226 }
2227
2228 async fn resolve_images(&self, messages: &[Message]) -> HashMap<Uuid, ResolvedImage> {
2239 let mut resolved = HashMap::new();
2240
2241 let resolver = match &self.image_resolver {
2243 Some(r) => r,
2244 None => return resolved,
2245 };
2246
2247 let image_ids: Vec<Uuid> = messages
2249 .iter()
2250 .flat_map(LlmMessage::extract_image_file_ids)
2251 .collect::<std::collections::HashSet<_>>()
2252 .into_iter()
2253 .collect();
2254
2255 if image_ids.is_empty() {
2256 return resolved;
2257 }
2258
2259 tracing::debug!(
2260 image_count = image_ids.len(),
2261 "ReasonAtom: resolving image_file references"
2262 );
2263
2264 for image_id in image_ids {
2266 match resolver.resolve_image(image_id).await {
2267 Ok(Some(image)) => {
2268 resolved.insert(image_id, image);
2269 }
2270 Ok(None) => {
2271 tracing::warn!(
2272 image_id = %image_id,
2273 "ReasonAtom: image not found during resolution"
2274 );
2275 }
2276 Err(e) => {
2277 tracing::warn!(
2278 image_id = %image_id,
2279 error = %e,
2280 "ReasonAtom: failed to resolve image"
2281 );
2282 }
2283 }
2284 }
2285
2286 tracing::debug!(
2287 resolved_count = resolved.len(),
2288 "ReasonAtom: image resolution complete"
2289 );
2290
2291 resolved
2292 }
2293}
2294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm_driver_registry::{LlmCallConfig, PromptCacheConfig, PromptCacheStrategy};
    use std::collections::HashMap;

    /// Builds an `LlmCallConfig` with every optional setting unset except the ones
    /// the request-options tests vary: model, `previous_response_id` and prompt cache.
    fn call_config(
        model: &str,
        previous_response_id: Option<&str>,
        prompt_cache: Option<PromptCacheConfig>,
    ) -> LlmCallConfig {
        LlmCallConfig {
            model: model.to_string(),
            temperature: None,
            max_tokens: None,
            tools: vec![],
            reasoning_effort: None,
            metadata: HashMap::new(),
            previous_response_id: previous_response_id.map(str::to_string),
            tool_search: None,
            prompt_cache,
        }
    }

    /// `ReasonResult::default()` is an unsuccessful, empty result with zero iterations.
    #[test]
    fn test_reason_result_default() {
        let result = ReasonResult::default();
        assert!(!result.success);
        assert!(result.text.is_empty());
        assert!(result.tool_calls.is_empty());
        assert!(!result.has_tool_calls);
        assert_eq!(result.max_iterations, 0);
    }

    /// When deserializing, a missing `max_iterations` falls back to the serde default
    /// (500). This differs from `Default::default()`.
    #[test]
    fn test_reason_result_serde_default() {
        let json = r#"{"success":true,"text":"","has_tool_calls":false}"#;
        let result: ReasonResult = serde_json::from_str(json).unwrap();
        assert_eq!(result.max_iterations, 500);
    }

    /// Resolved capabilities (from agent config) and exposed capabilities (from tool
    /// attribution) are reported as separate records.
    #[test]
    fn test_capability_usage_snapshot_keeps_resolved_and_exposed_separate() {
        let registry = CapabilityRegistry::with_builtins();
        let tool = ToolDefinition::Builtin(crate::tool_types::BuiltinTool {
            name: "demo_tool".to_string(),
            display_name: None,
            description: "demo".to_string(),
            parameters: json!({"type": "object"}),
            policy: crate::tool_types::ToolPolicy::Auto,
            category: None,
            deferrable: crate::tool_types::DeferrablePolicy::default(),
            hints: crate::tool_types::ToolHints::default(),
            full_parameters: None,
        })
        .with_capability_attribution("cap:demo", Some("Demo Capability"));

        let records = capability_usage_snapshot_records(
            &registry,
            &[crate::AgentCapabilityConfig::new("current_time")],
            &[tool],
        );

        assert!(records.iter().any(|record| {
            matches!(record.usage_kind, CapabilityUsageKind::Resolved)
                && record.capability_id == "current_time"
                && record.tool_name.is_none()
        }));
        assert!(records.iter().any(|record| {
            matches!(record.usage_kind, CapabilityUsageKind::Exposed)
                && record.capability_id == "cap:demo"
                && record.tool_name.as_deref() == Some("demo_tool")
        }));
    }

    /// A conversation without tool calls is passed through unchanged in length.
    #[test]
    fn test_patch_dangling_tool_calls_no_tool_calls() {
        let messages = vec![Message::user("Hello"), Message::assistant("Hi there!")];
        let patched = patch_dangling_tool_calls(&messages);
        assert_eq!(patched.len(), 2);
    }

    /// A tool call that already has a matching result needs no synthetic patch.
    #[test]
    fn test_patch_dangling_tool_calls_with_result() {
        let tool_call = ToolCall {
            id: "call_123".to_string(),
            name: "get_weather".to_string(),
            arguments: serde_json::json!({"city": "NYC"}),
        };

        let messages = vec![
            Message::user("What's the weather?"),
            Message::assistant_with_tools("Let me check", vec![tool_call]),
            Message::tool_result("call_123", Some(serde_json::json!({"temp": 72})), None),
        ];

        let patched = patch_dangling_tool_calls(&messages);
        assert_eq!(patched.len(), 3);
    }

    /// A tool call with no result gets a synthetic tool result inserted immediately
    /// after the assistant message that issued it.
    #[test]
    fn test_patch_dangling_tool_calls_missing_result() {
        let tool_call = ToolCall {
            id: "call_456".to_string(),
            name: "search_web".to_string(),
            arguments: serde_json::json!({"query": "rust"}),
        };

        let messages = vec![
            Message::user("Search for rust"),
            Message::assistant_with_tools("Searching...", vec![tool_call]),
            Message::user("Actually, never mind"),
        ];

        let patched = patch_dangling_tool_calls(&messages);
        assert_eq!(patched.len(), 4);
        // The patched result sits between the assistant call and the next user turn.
        assert_eq!(patched[2].role, MessageRole::ToolResult);
        assert_eq!(patched[2].tool_call_id(), Some("call_456"));
    }

    /// OpenAI: an enabled prompt cache is reported as `prompt_cache_key`, and the use
    /// of `previous_response_id` is flagged in the provider options.
    #[test]
    fn test_build_request_options_for_openai_prompt_cache() {
        let config = call_config(
            "gpt-5.4",
            Some("resp_123"),
            Some(PromptCacheConfig {
                enabled: true,
                strategy: PromptCacheStrategy::Auto,
                gemini_cached_content: None,
            }),
        );

        let request_options = build_request_options(&config, "openai").unwrap();
        assert_eq!(
            request_options
                .prompt_cache
                .and_then(|info| info.provider_mode),
            Some("prompt_cache_key".to_string())
        );
        assert_eq!(
            request_options.provider_options.get("openai"),
            Some(&json!({ "previous_response_id": true }))
        );
    }

    /// Gemini: an explicit cached-content handle is reported as `cached_content`.
    #[test]
    fn test_build_request_options_for_gemini_explicit_cache() {
        let config = call_config(
            "gemini-2.5-pro",
            None,
            Some(PromptCacheConfig {
                enabled: true,
                strategy: PromptCacheStrategy::Auto,
                gemini_cached_content: Some("cachedContents/demo-cache".to_string()),
            }),
        );

        let request_options = build_request_options(&config, "gemini").unwrap();
        assert_eq!(
            request_options
                .prompt_cache
                .and_then(|info| info.provider_mode),
            Some("cached_content".to_string())
        );
        assert_eq!(
            request_options.provider_options.get("gemini"),
            Some(&json!({ "cached_content": true }))
        );
    }

    /// Gemini: a disabled prompt cache yields no request options, even when a
    /// cached-content handle is configured.
    #[test]
    fn test_build_request_options_omits_gemini_cache_flag_when_disabled() {
        let config = call_config(
            "gemini-2.5-pro",
            None,
            Some(PromptCacheConfig {
                enabled: false,
                strategy: PromptCacheStrategy::Auto,
                gemini_cached_content: Some("cachedContents/demo-cache".to_string()),
            }),
        );

        assert!(build_request_options(&config, "gemini").is_none());
    }
}