1mod builder;
6pub mod comms_impl;
7pub mod compact;
8mod extraction;
9mod hook_impl;
10#[cfg(test)]
11mod hooks_behavior_tests;
12mod runner;
13pub mod skills;
14mod state;
15#[cfg(test)]
16#[doc(hidden)]
17pub(crate) mod test_turn_state_handle;
18use crate::budget::Budget;
19use crate::comms::{
20 CommsCommand, CommsTrustMutation, CommsTrustMutationResult, EventStream, PeerDirectoryEntry,
21 PeerId, SendAndStreamError, SendError, SendReceipt, StreamError, StreamScope,
22 TrustedPeerDescriptor,
23};
24use crate::compact::SessionCompactionCadence;
25use crate::completion_feed::CompletionSeq;
26use crate::config::{AgentConfig, HookRunOverrides};
27use crate::error::AgentError;
28use crate::event::ExternalToolDelta;
29use crate::hooks::HookEngine;
30use crate::lifecycle::RunId;
31use crate::lifecycle::run_primitive::ProviderParamsOverride;
32use crate::ops::OperationId;
33use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
34use crate::retry::RetryPolicy;
35use crate::schema::{CompiledSchema, SchemaError};
36use crate::session::Session;
37use crate::state::LoopState;
38#[cfg(target_arch = "wasm32")]
39use crate::tokio;
40use crate::tool_catalog::{
41 ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
42 select_catalog_mode_from_snapshot,
43};
44use crate::tool_scope::ToolScope;
45use crate::turn_execution_authority::{
46 ContentShape, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind, TurnTerminalOutcome,
47};
48use crate::types::{
49 AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
50 ToolDef, ToolName, ToolNameSet, Usage,
51};
52use async_trait::async_trait;
53use serde::{Deserialize, Serialize};
54use std::collections::{BTreeMap, BTreeSet};
55use std::sync::Arc;
56
57pub use builder::{AgentBuildPolicyError, AgentBuilder, DefaultSystemPromptPolicy};
58pub use runner::{AgentRunner, SnapshotProjectionError, SystemContextStateError};
59
60#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
62#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
63pub trait AgentLlmClient: Send + Sync {
64 async fn stream_response(
66 &self,
67 messages: &[Message],
68 tools: &[Arc<ToolDef>],
69 max_tokens: u32,
70 temperature: Option<f32>,
71 provider_params: Option<&ProviderParamsOverride>,
72 ) -> Result<LlmStreamResult, AgentError>;
73
74 fn provider(&self) -> crate::provider::Provider;
81
82 fn model(&self) -> &str;
88
89 fn prepare_model_fallback(&self, _failure: &AgentError) -> Option<AgentLlmFallbackSwitch> {
96 None
97 }
98
99 fn commit_model_fallback(&self, _identity: &crate::SessionLlmIdentity) {}
102
103 fn active_capability_base_filter(&self) -> crate::ToolFilter {
109 crate::ToolFilter::All
110 }
111
112 fn active_max_output_tokens(&self) -> Option<u32> {
116 None
117 }
118
119 fn begin_stream_output_observation(&self) {}
126
127 fn stream_output_observed(&self) -> bool {
134 false
135 }
136
137 fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
143 Ok(CompiledSchema {
145 schema: output_schema.schema.as_value().clone(),
146 warnings: Vec::new(),
147 })
148 }
149}
150
151pub type AgentLlmClientDecorator =
157 Arc<dyn Fn(Arc<dyn AgentLlmClient>) -> Arc<dyn AgentLlmClient> + Send + Sync + 'static>;
158
159#[derive(Debug, Clone)]
161pub struct AgentLlmFallbackSkippedTarget {
162 pub identity: crate::SessionLlmIdentity,
163 pub reason: String,
164}
165
166#[derive(Debug, Clone)]
172pub struct AgentLlmFallbackSwitch {
173 pub previous_identity: crate::SessionLlmIdentity,
174 pub new_identity: crate::SessionLlmIdentity,
175 pub request_policy: crate::SessionLlmRequestPolicy,
176 pub capability_base_filter: crate::ToolFilter,
177 pub context_window: Option<u32>,
178 pub max_output_tokens: Option<u32>,
179 pub skipped_targets: Vec<AgentLlmFallbackSkippedTarget>,
180}
181
182pub struct LlmStreamResult {
184 blocks: Vec<AssistantBlock>,
185 stop_reason: StopReason,
186 usage: Usage,
187}
188
189impl LlmStreamResult {
190 pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
191 Self {
192 blocks,
193 stop_reason,
194 usage,
195 }
196 }
197
198 pub fn blocks(&self) -> &[AssistantBlock] {
199 &self.blocks
200 }
201 pub fn stop_reason(&self) -> StopReason {
202 self.stop_reason
203 }
204 pub fn usage(&self) -> &Usage {
205 &self.usage
206 }
207
208 pub fn into_message(self) -> BlockAssistantMessage {
209 BlockAssistantMessage::new(self.blocks, self.stop_reason)
210 }
211
212 pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
213 (self.blocks, self.stop_reason, self.usage)
214 }
215}
216
217#[derive(Debug, Clone, PartialEq, Eq)]
223pub struct AgentExecutionSnapshot {
224 pub loop_state: LoopState,
225 pub turn_phase: TurnPhase,
226 pub turn_terminal: bool,
232 pub active_run_id: Option<RunId>,
233 pub primitive_kind: TurnPrimitiveKind,
234 pub admitted_content_shape: Option<ContentShape>,
235 pub vision_enabled: bool,
236 pub image_tool_results_enabled: bool,
237 pub tool_calls_pending: u32,
238 pub pending_operation_ids: Option<Vec<OperationId>>,
239 pub barrier_operation_ids: Vec<OperationId>,
240 pub has_barrier_ops: bool,
241 pub barrier_satisfied: bool,
242 pub boundary_count: u32,
243 pub cancel_after_boundary: bool,
244 pub terminal_outcome: TurnTerminalOutcome,
245 pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
246 pub extraction_attempts: u32,
247 pub max_extraction_retries: u32,
248 pub applied_cursor: CompletionSeq,
249}
250
251#[derive(Debug, Clone, Default)]
255pub struct ExternalToolUpdate {
256 pub notices: Vec<ExternalToolDelta>,
258 pub pending: Vec<String>,
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
271pub struct CancelAfterBoundaryCommand;
272
273pub type CancelAfterBoundarySender = tokio::sync::mpsc::UnboundedSender<CancelAfterBoundaryCommand>;
280
281#[derive(Debug, Clone, Default, PartialEq, Eq)]
288pub struct ToolDispatchContext {
289 current_turn: Option<CurrentTurnContent>,
290 turn_metadata: BTreeMap<String, serde_json::Value>,
291}
292
293impl ToolDispatchContext {
294 pub fn from_current_turn_input(input: &crate::types::ContentInput) -> Self {
295 let blocks = match input {
296 crate::types::ContentInput::Text(_) => None,
297 crate::types::ContentInput::Blocks(blocks) => Some(blocks.clone()),
298 };
299 Self {
300 current_turn: blocks.map(CurrentTurnContent::new),
301 turn_metadata: BTreeMap::new(),
302 }
303 }
304
305 pub fn from_run_input(input: &crate::types::RunInput) -> Self {
309 match input {
310 crate::types::RunInput::Content { content } => Self::from_current_turn_input(content),
311 crate::types::RunInput::PendingToolResults => Self::default(),
312 }
313 }
314
315 #[must_use]
316 pub fn with_turn_metadata(mut self, metadata: BTreeMap<String, serde_json::Value>) -> Self {
317 self.turn_metadata = metadata;
318 self
319 }
320
321 pub fn turn_metadata(&self, key: &str) -> Option<&serde_json::Value> {
322 self.turn_metadata.get(key)
323 }
324
325 pub fn current_turn(&self) -> Option<&CurrentTurnContent> {
326 self.current_turn.as_ref()
327 }
328
329 pub fn current_turn_image(
330 &self,
331 image_ref: CurrentTurnImageRef,
332 ) -> Option<&crate::types::ContentBlock> {
333 self.current_turn
334 .as_ref()
335 .and_then(|current_turn| current_turn.image(image_ref))
336 }
337}
338
339#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
354#[serde(transparent)]
355pub struct CurrentTurnImageRef(usize);
356
357impl std::fmt::Display for CurrentTurnImageRef {
358 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359 std::fmt::Display::fmt(&self.0, f)
360 }
361}
362
363#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct CurrentTurnContent {
366 blocks: Vec<crate::types::ContentBlock>,
367}
368
369impl CurrentTurnContent {
370 pub fn new(blocks: Vec<crate::types::ContentBlock>) -> Self {
371 Self { blocks }
372 }
373
374 pub fn blocks(&self) -> &[crate::types::ContentBlock] {
375 &self.blocks
376 }
377
378 pub fn image_ref(&self, n: usize) -> Option<CurrentTurnImageRef> {
382 self.images().nth(n).map(|_| CurrentTurnImageRef(n))
383 }
384
385 pub fn image(&self, image_ref: CurrentTurnImageRef) -> Option<&crate::types::ContentBlock> {
386 self.images().nth(image_ref.0)
387 }
388
389 fn images(&self) -> impl Iterator<Item = &crate::types::ContentBlock> {
390 self.blocks
391 .iter()
392 .filter(|block| matches!(block, crate::types::ContentBlock::Image { .. }))
393 }
394}
395
396#[derive(Debug, Clone, Serialize, Deserialize)]
403pub struct DetachedOpCompletion {
404 pub job_id: String,
406 pub kind: OperationKind,
408 pub status: OperationStatus,
410 pub terminal_outcome: Option<OperationTerminalOutcome>,
412 pub display_name: String,
414 pub detail: String,
416 pub elapsed_ms: Option<u64>,
418}
419
420#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
425pub struct DispatcherCapabilities {
426 pub ops_lifecycle: bool,
428}
429
430pub enum BindOutcome {
441 Bound(Arc<dyn AgentToolDispatcher>),
443 Skipped(Arc<dyn AgentToolDispatcher>),
446}
447
448impl BindOutcome {
449 pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
451 match self {
452 Self::Bound(d) | Self::Skipped(d) => d,
453 }
454 }
455
456 pub fn was_bound(&self) -> bool {
458 matches!(self, Self::Bound(_))
459 }
460}
461
462#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
464#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
465pub trait AgentToolDispatcher: Send + Sync {
466 fn tools(&self) -> Arc<[Arc<ToolDef>]>;
468
469 fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
475 ToolCatalogCapabilities::default()
476 }
477
478 fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
484 self.tools()
485 .iter()
486 .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
487 .collect::<Vec<_>>()
488 .into()
489 }
490
491 fn pending_catalog_sources(&self) -> Arc<[String]> {
496 Arc::from([])
497 }
498
499 async fn dispatch(
505 &self,
506 call: ToolCallView<'_>,
507 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;
508
509 async fn dispatch_with_context(
515 &self,
516 call: ToolCallView<'_>,
517 _context: &ToolDispatchContext,
518 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
519 self.dispatch(call).await
520 }
521
522 async fn poll_external_updates(&self) -> ExternalToolUpdate {
528 ExternalToolUpdate::default()
529 }
530
531 fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
537 None
538 }
539
540 fn capabilities(&self) -> DispatcherCapabilities {
542 DispatcherCapabilities::default()
543 }
544
545 fn bind_ops_lifecycle(
553 self: Arc<Self>,
554 _registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
555 _owner_bridge_session_id: crate::types::SessionId,
556 ) -> Result<BindOutcome, OpsLifecycleBindError> {
557 Err(OpsLifecycleBindError::Unsupported)
558 }
559
560 fn completion_enrichment(
565 &self,
566 ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
567 None
568 }
569
570 fn bind_mcp_server_lifecycle_handle(
577 &self,
578 _handle: Arc<dyn crate::handles::McpServerLifecycleHandle>,
579 ) {
580 }
581
582 fn bind_external_tool_surface_handle(
589 &self,
590 _handle: Arc<dyn crate::handles::ExternalToolSurfaceHandle>,
591 ) {
592 }
593}
594
595pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
597where
598 T: AgentToolDispatcher + ?Sized,
599{
600 let capabilities = dispatcher.tool_catalog_capabilities();
601 if !capabilities.exact_catalog {
602 return ToolCatalogMode::Inline;
603 }
604 let pending_sources = dispatcher.pending_catalog_sources();
605 let catalog = dispatcher.tool_catalog();
606 select_catalog_mode_from_snapshot(
607 capabilities.exact_catalog,
608 catalog.as_ref(),
609 pending_sources.as_ref(),
610 )
611}
612
613pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
616where
617 T: AgentToolDispatcher + ?Sized,
618{
619 let capabilities = dispatcher.tool_catalog_capabilities();
620 if !capabilities.exact_catalog {
621 return false;
622 }
623 if capabilities.may_require_catalog_control_plane {
624 return true;
625 }
626
627 let pending_sources = dispatcher.pending_catalog_sources();
628 if !pending_sources.is_empty() {
629 return true;
630 }
631
632 let catalog = dispatcher.tool_catalog();
633 deferred_session_entry_count(catalog.as_ref()) > 0
634}
635
636#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
638pub enum OpsLifecycleBindError {
639 #[error("ops lifecycle binding is unsupported")]
640 Unsupported,
641 #[error("dispatcher has shared ownership and cannot be rebound")]
642 SharedOwnership,
643}
644
645pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
652 inner: Arc<T>,
653 allowed_tools: ToolNameSet,
654 filtered_tools: Arc<[Arc<ToolDef>]>,
656}
657
658impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
659 pub fn new<I, N>(inner: Arc<T>, allowed_tools: I) -> Self
660 where
661 I: IntoIterator<Item = N>,
662 N: Into<ToolName>,
663 {
664 let allowed_set: ToolNameSet = allowed_tools
665 .into_iter()
666 .map(Into::into)
667 .collect::<ToolNameSet>();
668
669 let filtered: Vec<Arc<ToolDef>> = if inner.tool_catalog_capabilities().exact_catalog {
670 inner
671 .tool_catalog()
672 .iter()
673 .filter(|entry| entry.currently_callable())
674 .map(|entry| Arc::clone(&entry.tool))
675 .filter(|t| allowed_set.contains(t.name.as_str()))
676 .collect()
677 } else {
678 inner
679 .tools()
680 .iter()
681 .filter(|t| allowed_set.contains(t.name.as_str()))
682 .map(Arc::clone)
683 .collect()
684 };
685
686 Self {
687 inner,
688 allowed_tools: allowed_set,
689 filtered_tools: filtered.into(),
690 }
691 }
692}
693
694#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
695#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
696impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
697 fn tools(&self) -> Arc<[Arc<ToolDef>]> {
698 if self.inner.tool_catalog_capabilities().exact_catalog {
699 return self
700 .inner
701 .tool_catalog()
702 .iter()
703 .filter(|entry| entry.currently_callable())
704 .map(|entry| Arc::clone(&entry.tool))
705 .filter(|tool| self.allowed_tools.contains(tool.name.as_str()))
706 .collect::<Vec<_>>()
707 .into();
708 }
709 Arc::clone(&self.filtered_tools)
710 }
711
712 async fn dispatch(
713 &self,
714 call: ToolCallView<'_>,
715 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
716 self.dispatch_with_context(call, &ToolDispatchContext::default())
717 .await
718 }
719
720 async fn dispatch_with_context(
721 &self,
722 call: ToolCallView<'_>,
723 context: &ToolDispatchContext,
724 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
725 if !self.allowed_tools.contains(call.name) {
726 let inner_knows_tool = if self.inner.tool_catalog_capabilities().exact_catalog {
727 self.inner
728 .tool_catalog()
729 .iter()
730 .any(|entry| entry.tool.name == call.name)
731 } else {
732 self.inner.tools().iter().any(|tool| tool.name == call.name)
733 };
734 if !inner_knows_tool {
735 return Err(crate::error::ToolError::not_found(call.name));
736 }
737 return Err(crate::error::ToolError::access_denied(call.name));
738 }
739 self.inner.dispatch_with_context(call, context).await
740 }
741
742 fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
743 self.inner.tool_catalog_capabilities()
744 }
745
746 fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
747 if !self.inner.tool_catalog_capabilities().exact_catalog {
748 return self
749 .tools()
750 .iter()
751 .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
752 .collect::<Vec<_>>()
753 .into();
754 }
755 self.inner
756 .tool_catalog()
757 .iter()
758 .filter(|entry| self.allowed_tools.contains(entry.tool.name.as_str()))
759 .cloned()
760 .collect::<Vec<_>>()
761 .into()
762 }
763
764 fn pending_catalog_sources(&self) -> Arc<[String]> {
765 self.inner.pending_catalog_sources()
766 }
767
768 async fn poll_external_updates(&self) -> ExternalToolUpdate {
769 self.inner.poll_external_updates().await
770 }
771
772 fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
773 self.inner.external_tool_surface_snapshot()
774 }
775
776 fn capabilities(&self) -> DispatcherCapabilities {
777 self.inner.capabilities()
778 }
779
780 fn bind_ops_lifecycle(
781 self: Arc<Self>,
782 registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
783 owner_bridge_session_id: crate::types::SessionId,
784 ) -> Result<BindOutcome, OpsLifecycleBindError> {
785 let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
786 if Arc::strong_count(&owned.inner) == 1 {
787 let outcome = owned
788 .inner
789 .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
790 let bound = outcome.was_bound();
791 let d = outcome.into_dispatcher();
792 let allowed_tools = owned.allowed_tools.into_iter().collect::<Vec<_>>();
793 Ok(if bound {
794 BindOutcome::Bound(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
795 } else {
796 BindOutcome::Skipped(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
797 })
798 } else {
799 Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
800 inner: owned.inner,
801 allowed_tools: owned.allowed_tools,
802 filtered_tools: owned.filtered_tools,
803 })))
804 }
805 }
806
807 fn completion_enrichment(
808 &self,
809 ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
810 self.inner.completion_enrichment()
811 }
812}
813
814#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
816#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
817pub trait AgentSessionStore: Send + Sync {
818 async fn save(&self, session: &Session) -> Result<(), AgentError>;
819 async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
820}
821
822#[derive(Debug, Clone, Copy, PartialEq, Eq)]
824pub enum InlinePeerNotificationPolicy {
825 Always,
827 Never,
829 AtMost(usize),
831}
832
833pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;
835
836impl InlinePeerNotificationPolicy {
837 pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
839 match raw {
840 None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
841 Some(-1) => Ok(Self::Always),
842 Some(0) => Ok(Self::Never),
843 Some(v) if v > 0 => Ok(Self::AtMost(v as usize)),
844 Some(v) => Err(v),
845 }
846 }
847}
848
849#[derive(Debug, thiserror::Error)]
851pub enum CommsCapabilityError {
852 #[error("comms capability not supported: {0}")]
854 Unsupported(String),
855}
856
857#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
859#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
860pub trait CommsRuntime: Send + Sync {
861 fn peer_id(&self) -> Option<PeerId> {
869 self.public_key()
870 .as_deref()
871 .and_then(|public_key| PeerId::parse(public_key).ok())
872 }
873
874 fn public_key(&self) -> Option<String> {
880 None
881 }
882
883 fn public_key_bytes(&self) -> Option<[u8; 32]> {
889 None
890 }
891
892 fn comms_name(&self) -> Option<String> {
898 None
899 }
900
901 fn advertised_address(&self) -> Option<String> {
907 None
908 }
909
910 fn bridge_bootstrap_token(&self) -> Option<String> {
913 None
914 }
915
916 async fn apply_trust_mutation(
921 &self,
922 _mutation: CommsTrustMutation,
923 ) -> Result<CommsTrustMutationResult, SendError> {
924 Err(SendError::Unsupported(
925 "apply_trust_mutation not supported for this CommsRuntime".to_string(),
926 ))
927 }
928
929 async fn install_generated_mob_trust_owner(
936 &self,
937 _owner: Arc<dyn std::any::Any + Send + Sync>,
938 ) -> Result<(), SendError> {
939 Err(SendError::Unsupported(
940 "generated mob trust owner binding not supported for this CommsRuntime".to_string(),
941 ))
942 }
943
944 async fn validate_recovered_generated_mob_trust_owner(
952 &self,
953 _owner: Arc<dyn std::any::Any + Send + Sync>,
954 ) -> Result<(), SendError> {
955 Err(SendError::Unsupported(
956 "recovered generated mob trust owner validation not supported for this CommsRuntime"
957 .to_string(),
958 ))
959 }
960
961 async fn install_recovered_generated_mob_trust_owner(
970 &self,
971 _owner: Arc<dyn std::any::Any + Send + Sync>,
972 ) -> Result<(), SendError> {
973 Err(SendError::Unsupported(
974 "recovered generated mob trust owner binding not supported for this CommsRuntime"
975 .to_string(),
976 ))
977 }
978
979 async fn add_private_trusted_peer(
990 &self,
991 _peer: TrustedPeerDescriptor,
992 ) -> Result<(), SendError> {
993 Err(SendError::Unsupported(
994 "generated comms private trust mutation authority required".to_string(),
995 ))
996 }
997
998 async fn remove_private_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
1003 Err(SendError::Unsupported(
1004 "generated comms private trust mutation authority required".to_string(),
1005 ))
1006 }
1007
1008 async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
1010 Err(SendError::Unsupported(
1011 "send not implemented for this CommsRuntime".to_string(),
1012 ))
1013 }
1014
1015 #[doc(hidden)]
1016 fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
1017 let scope_desc = match scope {
1018 StreamScope::Session(session_id) => format!("session {session_id}"),
1019 StreamScope::Interaction(interaction_id) => format!("interaction {}", interaction_id.0),
1020 };
1021 Err(StreamError::NotFound(scope_desc))
1022 }
1023
1024 async fn peers(&self) -> Vec<PeerDirectoryEntry> {
1026 Vec::new()
1027 }
1028
1029 async fn peer_count(&self) -> usize {
1033 self.peers().await.len()
1034 }
1035
1036 #[doc(hidden)]
1037 async fn send_and_stream(
1038 &self,
1039 cmd: CommsCommand,
1040 ) -> Result<(SendReceipt, EventStream), SendAndStreamError> {
1041 let receipt = self.send(cmd).await?;
1042 Err(SendAndStreamError::StreamAttach {
1043 receipt,
1044 error: StreamError::Internal(
1045 "send_and_stream is not implemented for this runtime".to_string(),
1046 ),
1047 })
1048 }
1049
1050 async fn drain_messages(&self) -> Vec<String>;
1052 fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
1054 fn dismiss_received(&self) -> bool {
1056 false
1057 }
1058 fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
1063 None
1064 }
1065
1066 #[doc(hidden)]
1068 fn interaction_event_injector(
1069 &self,
1070 ) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
1071 None
1072 }
1073
1074 async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
1079 self.drain_messages()
1080 .await
1081 .into_iter()
1082 .map(|text| crate::interaction::InboxInteraction {
1083 id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
1084 from_route: None,
1085 from: "unknown".into(),
1086 content: crate::interaction::InteractionContent::Message {
1087 body: text.clone(),
1088 blocks: None,
1089 },
1090 rendered_text: text,
1091 handling_mode: crate::types::HandlingMode::Queue,
1092 render_metadata: None,
1093 })
1094 .collect()
1095 }
1096
1097 fn interaction_subscriber(
1102 &self,
1103 _id: &crate::interaction::InteractionId,
1104 ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
1105 None
1106 }
1107
1108 fn take_interaction_stream_sender(
1110 &self,
1111 _id: &crate::interaction::InteractionId,
1112 ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
1113 self.interaction_subscriber(_id)
1114 }
1115
1116 fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
1122
1123 fn peer_interaction_handle(
1129 &self,
1130 ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
1131 None
1132 }
1133
1134 fn peer_request_response_authority_handle(
1143 &self,
1144 ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
1145 None
1146 }
1147
1148 async fn drain_classified_inbox_interactions(
1156 &self,
1157 ) -> Result<Vec<crate::interaction::ClassifiedInboxInteraction>, CommsCapabilityError> {
1158 Err(CommsCapabilityError::Unsupported(
1159 "drain_classified_inbox_interactions".to_string(),
1160 ))
1161 }
1162
1163 async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
1170 self.drain_classified_inbox_interactions()
1171 .await
1172 .unwrap_or_default()
1173 }
1174
1175 async fn peer_ingress_queue_snapshot(
1180 &self,
1181 ) -> Result<crate::interaction::PeerIngressQueueSnapshot, CommsCapabilityError> {
1182 Err(CommsCapabilityError::Unsupported(
1183 "peer_ingress_queue_snapshot".to_string(),
1184 ))
1185 }
1186
1187 async fn peer_ingress_runtime_snapshot(
1192 &self,
1193 ) -> Result<crate::interaction::PeerIngressRuntimeSnapshot, CommsCapabilityError> {
1194 Err(CommsCapabilityError::Unsupported(
1195 "peer_ingress_runtime_snapshot".to_string(),
1196 ))
1197 }
1198
1199 async fn public_trusted_peer_projection_snapshot(
1206 &self,
1207 ) -> Result<Vec<crate::comms::TrustedPeerDescriptor>, CommsCapabilityError> {
1208 Err(CommsCapabilityError::Unsupported(
1209 "public_trusted_peer_projection_snapshot".to_string(),
1210 ))
1211 }
1212
1213 async fn trusted_peer_projection_snapshot_for_source(
1220 &self,
1221 _source_kind: crate::comms::GeneratedCommsTrustAuthoritySourceKind,
1222 ) -> Result<Vec<crate::comms::TrustedPeerDescriptor>, CommsCapabilityError> {
1223 Err(CommsCapabilityError::Unsupported(
1224 "trusted_peer_projection_snapshot_for_source".to_string(),
1225 ))
1226 }
1227
1228 fn actionable_input_notify(&self) -> Result<Arc<tokio::sync::Notify>, CommsCapabilityError> {
1233 Err(CommsCapabilityError::Unsupported(
1234 "actionable_input_notify".to_string(),
1235 ))
1236 }
1237}
1238
1239pub struct Agent<C, T, S>
1241where
1242 C: AgentLlmClient + ?Sized,
1243 T: AgentToolDispatcher + ?Sized,
1244 S: AgentSessionStore + ?Sized,
1245{
1246 config: AgentConfig,
1247 client: Arc<C>,
1248 tools: Arc<T>,
1249 tool_scope: ToolScope,
1250 store: Arc<S>,
1251 session: Session,
1252 budget: Budget,
1253 retry_policy: RetryPolicy,
1254 depth: u32,
1255 pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
1256 pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
1257 pub(super) hook_run_overrides: HookRunOverrides,
1258 pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
1260 pub(crate) last_input_tokens: u64,
1262 pub(crate) compaction_cadence: SessionCompactionCadence,
1264 pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
1266 pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
1268 pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
1271 pub(crate) event_tap: crate::event_tap::EventTap,
1273 pub(crate) system_context_state:
1275 Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
1276 pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
1279 pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
1285 pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
1287 pub(crate) terminal_error_detail: Option<String>,
1291 pub(crate) run_completed_hooks_applied: bool,
1293 pub(crate) run_completed_event_emitted: bool,
1296 #[allow(dead_code)] pub(crate) silent_comms_intents: Vec<String>,
1300 pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
1302 pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
1304 pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
1306 pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
1308 pub(crate) completion_enrichment:
1310 Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
1311 pub(crate) mob_authority_handle:
1316 Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
1317 pub(crate) turn_state_handle: Option<Arc<dyn crate::TurnStateHandle>>,
1319 pub(crate) runtime_execution_kind_required: bool,
1321 pub(crate) runtime_execution_kind: Option<crate::lifecycle::RuntimeExecutionKind>,
1324 pub(crate) external_tool_surface_handle: Option<Arc<dyn crate::ExternalToolSurfaceHandle>>,
1327 pub(crate) auth_lease_handle: Option<crate::handles::GeneratedAuthLeaseHandle>,
1329 pub(crate) mcp_server_lifecycle_handle:
1333 Option<Arc<dyn crate::handles::McpServerLifecycleHandle>>,
1334 pub(crate) cancel_after_boundary_tx: CancelAfterBoundarySender,
1341 pub(crate) cancel_after_boundary_rx:
1349 tokio::sync::mpsc::UnboundedReceiver<CancelAfterBoundaryCommand>,
1350 pub(crate) model_defaults_resolver:
1353 Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
1354 pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
1357 pub(crate) extraction_state: extraction::ExtractionState,
1359 pub(crate) last_hidden_deferred_catalog_names: BTreeSet<crate::types::ToolName>,
1361 pub(crate) last_pending_catalog_sources: BTreeSet<String>,
1363 pub(crate) tool_dispatch_context: ToolDispatchContext,
1365 pub(crate) turn_tool_dispatch_metadata: BTreeMap<String, serde_json::Value>,
1367 pub(crate) tools_config: crate::config::ToolsConfig,
1372}
1373
1374#[cfg(test)]
1375#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1376mod tests {
1377 use super::{
1378 AgentToolDispatcher, CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS,
1379 FilteredToolDispatcher, InlinePeerNotificationPolicy, ToolDispatchContext,
1380 };
1381 use crate::comms::{
1382 PeerAddress, PeerId, PeerName, PeerTransport, SendError, TrustedPeerDescriptor,
1383 };
1384 use crate::types::{ContentBlock, ContentInput, ToolCallView, ToolDef, ToolResult};
1385 use async_trait::async_trait;
1386 use serde_json::json;
1387 use std::sync::Arc;
1388 use tokio::sync::Notify;
1389
1390 struct NoopCommsRuntime {
1391 notify: Arc<Notify>,
1392 }
1393
1394 struct ContextAwareToolDispatcher;
1395
1396 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1397 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1398 impl AgentToolDispatcher for ContextAwareToolDispatcher {
1399 fn tools(&self) -> Arc<[Arc<ToolDef>]> {
1400 Arc::from([Arc::new(ToolDef {
1401 name: "inspect_context".into(),
1402 description: "inspect context".to_string(),
1403 input_schema: json!({"type": "object"}),
1404 provenance: None,
1405 })])
1406 }
1407
1408 async fn dispatch(
1409 &self,
1410 call: ToolCallView<'_>,
1411 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
1412 Ok(ToolResult::new(
1413 call.id.to_string(),
1414 json!({"saw_context_image": false}).to_string(),
1415 false,
1416 )
1417 .into())
1418 }
1419
1420 async fn dispatch_with_context(
1421 &self,
1422 call: ToolCallView<'_>,
1423 context: &ToolDispatchContext,
1424 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
1425 let saw_context_image = context
1426 .current_turn()
1427 .and_then(|turn| turn.image_ref(0))
1428 .and_then(|image_ref| context.current_turn_image(image_ref))
1429 .is_some();
1430 Ok(ToolResult::new(
1431 call.id.to_string(),
1432 json!({"saw_context_image": saw_context_image}).to_string(),
1433 false,
1434 )
1435 .into())
1436 }
1437 }
1438
1439 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1440 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1441 impl CommsRuntime for NoopCommsRuntime {
1442 async fn drain_messages(&self) -> Vec<String> {
1443 Vec::new()
1444 }
1445
1446 fn inbox_notify(&self) -> std::sync::Arc<Notify> {
1447 self.notify.clone()
1448 }
1449 }
1450
1451 #[tokio::test]
1452 async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
1453 let runtime = NoopCommsRuntime {
1454 notify: Arc::new(Notify::new()),
1455 };
1456 assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
1457 let peer = TrustedPeerDescriptor {
1460 peer_id: PeerId::new(),
1461 name: PeerName::new("peer-a").expect("valid peer name"),
1462 address: PeerAddress::new(PeerTransport::Inproc, "peer-a"),
1463 pubkey: [0u8; 32],
1464 };
1465 let result =
1466 <NoopCommsRuntime as CommsRuntime>::add_private_trusted_peer(&runtime, peer).await;
1467 assert!(matches!(result, Err(SendError::Unsupported(_))));
1468 }
1469
1470 #[tokio::test]
1471 async fn filtered_tool_dispatcher_preserves_dispatch_context() {
1472 let dispatcher =
1473 FilteredToolDispatcher::new(Arc::new(ContextAwareToolDispatcher), ["inspect_context"]);
1474 let args = serde_json::value::RawValue::from_string("{}".to_string())
1475 .expect("empty object should be valid JSON");
1476 let call = ToolCallView {
1477 id: "ctx-1",
1478 name: "inspect_context",
1479 args: &args,
1480 };
1481 let context = ToolDispatchContext::from_current_turn_input(&ContentInput::Blocks(vec![
1482 ContentBlock::Image {
1483 media_type: "image/png".to_string(),
1484 data: "abc".into(),
1485 },
1486 ]));
1487
1488 let outcome = dispatcher
1489 .dispatch_with_context(call, &context)
1490 .await
1491 .expect("filtered wrapper should dispatch");
1492 let payload: serde_json::Value =
1493 serde_json::from_str(&outcome.result.text_content()).expect("tool result JSON");
1494 assert_eq!(payload["saw_context_image"], true);
1495 }
1496
1497 #[test]
1498 fn test_inline_peer_notification_policy_from_raw() {
1499 assert_eq!(
1500 InlinePeerNotificationPolicy::try_from_raw(None),
1501 Ok(InlinePeerNotificationPolicy::AtMost(
1502 DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS
1503 ))
1504 );
1505 assert_eq!(
1506 InlinePeerNotificationPolicy::try_from_raw(Some(-1)),
1507 Ok(InlinePeerNotificationPolicy::Always)
1508 );
1509 assert_eq!(
1510 InlinePeerNotificationPolicy::try_from_raw(Some(0)),
1511 Ok(InlinePeerNotificationPolicy::Never)
1512 );
1513 assert_eq!(
1514 InlinePeerNotificationPolicy::try_from_raw(Some(25)),
1515 Ok(InlinePeerNotificationPolicy::AtMost(25))
1516 );
1517 assert_eq!(
1518 InlinePeerNotificationPolicy::try_from_raw(Some(-42)),
1519 Err(-42)
1520 );
1521 }
1522
1523 #[test]
1526 fn unit_002_detached_op_completion_has_no_operation_id() {
1527 use crate::agent::DetachedOpCompletion;
1528 use crate::ops_lifecycle::{OperationKind, OperationStatus};
1529
1530 let completion = DetachedOpCompletion {
1531 job_id: "j_test".into(),
1532 kind: OperationKind::BackgroundToolOp,
1533 status: OperationStatus::Completed,
1534 terminal_outcome: None,
1535 display_name: "test cmd".into(),
1536 detail: "ok".into(),
1537 elapsed_ms: None,
1538 };
1539 #[allow(clippy::unwrap_used)]
1540 let json = serde_json::to_value(&completion).unwrap();
1541 assert!(
1542 json.get("operation_id").is_none(),
1543 "operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
1544 );
1545 assert!(
1546 json.get("job_id").is_some(),
1547 "job_id must be the app-facing control noun"
1548 );
1549 }
1550}