1use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_core::{
68 CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
69 TextPart, Timestamp, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation,
70 Usage,
71};
72use agentkit_task_manager::{
73 PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
74 TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
75};
76#[cfg(test)]
77use agentkit_tools_core::ToolContext;
78use agentkit_tools_core::{
79 AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
80 PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutionScope, ToolExecutor, ToolRequest,
81 ToolResources, ToolSource, ToolSpec,
82};
83use async_trait::async_trait;
84use serde::{Deserialize, Serialize};
85use thiserror::Error;
86
// Metadata keys (and one value) related to interrupted turns.
// NOTE(review): nothing in this part of the file reads them; presumably they tag
// items produced when a turn is cancelled — confirm at the use sites.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
// Presumably the value stored under `INTERRUPT_REASON_METADATA_KEY` when the user
// cancels — confirm.
const USER_CANCELLED_REASON: &str = "user_cancelled";
91
/// Configuration handed to [`ModelAdapter::start_session`] when a session is opened.
///
/// [`Agent::start`] reads `session_id` and `cache` from it before passing the whole
/// value on to the adapter.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionConfig {
    /// Identifier of the session being started.
    pub session_id: SessionId,
    /// Metadata attached to the session.
    pub metadata: MetadataMap,
    /// Optional prompt-cache request; [`Agent::start`] stores it as the driver's
    /// default cache setting.
    pub cache: Option<PromptCacheRequest>,
}
115
116impl SessionConfig {
117 pub fn new(session_id: impl Into<SessionId>) -> Self {
119 Self {
120 session_id: session_id.into(),
121 metadata: MetadataMap::new(),
122 cache: None,
123 }
124 }
125
126 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
128 self.metadata = metadata;
129 self
130 }
131
132 pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
134 self.cache = Some(cache);
135 self
136 }
137
138 pub fn without_cache(mut self) -> Self {
140 self.cache = None;
141 self
142 }
143}
144
/// How strongly prompt caching is requested.
///
/// [`PromptCacheRequest::is_enabled`] treats every mode except `Disabled` as enabled.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheMode {
    /// Caching is turned off.
    Disabled,
    /// The default mode.
    ///
    /// NOTE(review): presumably caching is attempted but its absence is tolerated —
    /// confirm against the adapters.
    #[default]
    BestEffort,
    /// NOTE(review): presumably the adapter must fail when it cannot honor the
    /// request — confirm against the adapters.
    Required,
}
160
/// Requested retention class for cached prompt content.
///
/// NOTE(review): the concrete durations are not defined in this file; presumably
/// each adapter maps these onto its provider's options — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheRetention {
    /// Presumably the provider's own default retention — confirm.
    Default,
    /// Presumably a short-lived cache entry — confirm.
    Short,
    /// Presumably a longer-lived cache entry — confirm.
    Extended,
}
175
/// How cache breakpoints are chosen for a prompt.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum PromptCacheStrategy {
    /// The default strategy; carries no caller-supplied breakpoints.
    #[default]
    Automatic,
    /// The caller supplies the breakpoints explicitly.
    Explicit {
        /// Breakpoints supplied by the caller, in the order given.
        breakpoints: Vec<PromptCacheBreakpoint>,
    },
}
188
189impl PromptCacheStrategy {
190 pub fn automatic() -> Self {
193 Self::Automatic
194 }
195
196 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
198 Self::Explicit {
199 breakpoints: breakpoints.into_iter().collect(),
200 }
201 }
202}
203
/// A position at which an explicit cache breakpoint is requested.
///
/// NOTE(review): how adapters interpret each position is not visible in this file;
/// the variant notes below are inferred from the names — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheBreakpoint {
    /// Presumably the end of the tool definitions — confirm.
    ToolsEnd,
    /// Presumably the end of the transcript item at `index` — confirm.
    TranscriptItemEnd { index: usize },
    /// Presumably the end of part `part_index` within transcript item `item_index` — confirm.
    TranscriptPartEnd {
        item_index: usize,
        part_index: usize,
    },
}
221
222impl PromptCacheBreakpoint {
223 pub fn tools_end() -> Self {
225 Self::ToolsEnd
226 }
227
228 pub fn transcript_item_end(index: usize) -> Self {
230 Self::TranscriptItemEnd { index }
231 }
232
233 pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
235 Self::TranscriptPartEnd {
236 item_index,
237 part_index,
238 }
239 }
240}
241
/// A prompt-caching request carried on [`SessionConfig`] and [`TurnRequest`].
///
/// The derived `Default` is best-effort mode with the automatic strategy and no
/// retention or key.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PromptCacheRequest {
    /// How strongly caching is requested.
    pub mode: PromptCacheMode,
    /// How cache breakpoints are chosen.
    pub strategy: PromptCacheStrategy,
    /// Optional retention class; `None` leaves it unspecified.
    pub retention: Option<PromptCacheRetention>,
    /// Optional cache key.
    /// NOTE(review): how adapters use the key is not visible here — confirm.
    pub key: Option<String>,
}
254
255impl PromptCacheRequest {
256 pub fn automatic() -> Self {
258 Self::best_effort(PromptCacheStrategy::automatic())
259 }
260
261 pub fn automatic_required() -> Self {
263 Self::required(PromptCacheStrategy::automatic())
264 }
265
266 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
268 Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
269 }
270
271 pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
273 Self::required(PromptCacheStrategy::explicit(breakpoints))
274 }
275
276 pub fn disabled() -> Self {
278 Self {
279 mode: PromptCacheMode::Disabled,
280 strategy: PromptCacheStrategy::Automatic,
281 retention: None,
282 key: None,
283 }
284 }
285
286 pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
288 Self {
289 mode: PromptCacheMode::BestEffort,
290 strategy,
291 retention: None,
292 key: None,
293 }
294 }
295
296 pub fn required(strategy: PromptCacheStrategy) -> Self {
298 Self {
299 mode: PromptCacheMode::Required,
300 strategy,
301 retention: None,
302 key: None,
303 }
304 }
305
306 pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
308 self.mode = mode;
309 self
310 }
311
312 pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
314 self.strategy = strategy;
315 self
316 }
317
318 pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
320 self.retention = Some(retention);
321 self
322 }
323
324 pub fn with_key(mut self, key: impl Into<String>) -> Self {
326 self.key = Some(key.into());
327 self
328 }
329
330 pub fn without_retention(mut self) -> Self {
332 self.retention = None;
333 self
334 }
335
336 pub fn without_key(mut self) -> Self {
338 self.key = None;
339 self
340 }
341
342 pub fn is_enabled(&self) -> bool {
344 !matches!(self.mode, PromptCacheMode::Disabled)
345 }
346}
347
/// The request passed to [`ModelSession::begin_turn`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnRequest {
    /// Session the turn belongs to.
    pub session_id: SessionId,
    /// Identifier of this turn.
    pub turn_id: agentkit_core::TurnId,
    /// Transcript items supplied to the model for this turn.
    pub transcript: Vec<Item>,
    /// Specifications of the tools offered to the model for this turn.
    pub available_tools: Vec<ToolSpec>,
    /// Prompt-cache request for this turn, if any.
    pub cache: Option<PromptCacheRequest>,
    /// Metadata attached to the request.
    pub metadata: MetadataMap,
}
368
/// Final outcome of a model turn, carried by [`ModelTurnEvent::Finished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ModelTurnResult {
    /// Why the model stopped.
    pub finish_reason: FinishReason,
    /// Items produced by the model during the turn.
    pub output_items: Vec<Item>,
    /// Token usage, when reported.
    pub usage: Option<Usage>,
    /// Metadata attached to the result.
    pub metadata: MetadataMap,
    /// Optional model identifier; deserializes to `None` when the field is absent.
    #[serde(default)]
    pub model: Option<String>,
    /// Optional response identifier; deserializes to `None` when the field is absent.
    #[serde(default)]
    pub response_id: Option<String>,
}
394
/// An event yielded by [`ModelTurn::next_event`] while a turn is in progress.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ModelTurnEvent {
    /// An incremental update.
    Delta(Delta),
    /// The model requested a tool call.
    ToolCall(ToolCallPart),
    /// A usage report.
    Usage(Usage),
    /// The turn finished with the given result.
    Finished(ModelTurnResult),
}
411
/// Factory for model sessions.
///
/// [`Agent::start`] calls [`start_session`](ModelAdapter::start_session) once to
/// obtain the session owned by the resulting [`LoopDriver`].
#[async_trait]
pub trait ModelAdapter: Send + Sync {
    /// Session type produced by this adapter.
    type Session: ModelSession;

    /// Opens a session for `config`.
    ///
    /// # Errors
    ///
    /// Returns a [`LoopError`] when the session cannot be started.
    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;

    /// Name of the model provider, if the adapter reports one.
    ///
    /// Defaults to `None`. [`Agent::start`] copies the value into the driver, which
    /// records it as the `gen_ai.provider.name` field of the turn span.
    fn provider_name(&self) -> Option<&str> {
        None
    }
}
470
/// A model session that can run turns.
#[async_trait]
pub trait ModelSession: Send {
    /// Turn type produced by [`begin_turn`](ModelSession::begin_turn).
    type Turn: ModelTurn;

    /// Starts a turn for `request`.
    ///
    /// `cancellation`, when present, is the cancellation checkpoint for the turn.
    ///
    /// # Errors
    ///
    /// Returns a [`LoopError`] when the turn cannot be started.
    async fn begin_turn(
        &mut self,
        request: TurnRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Self::Turn, LoopError>;

    /// Name of the model behind this session, if known. Defaults to `None`.
    fn model_name(&self) -> Option<&str> {
        None
    }
}
509
/// A turn in progress, polled for [`ModelTurnEvent`]s.
#[async_trait]
pub trait ModelTurn: Send {
    /// Returns the next event of the turn.
    ///
    /// NOTE(review): `Ok(None)` presumably means the event stream is exhausted —
    /// confirm against the driver's polling loop.
    ///
    /// # Errors
    ///
    /// Returns a [`LoopError`] when the turn fails.
    async fn next_event(
        &mut self,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError>;
}
530
/// Receives [`AgentEvent`]s; registered through [`AgentBuilder::observer`].
pub trait LoopObserver: Send + Sync {
    /// Handles one event, passed by value.
    fn handle_event(&self, event: AgentEvent);
}
557
/// Observes transcript growth; registered through [`AgentBuilder::transcript_observer`].
pub trait TranscriptObserver: Send + Sync {
    /// Called with an item that was appended.
    /// NOTE(review): the call sites are outside this section — confirm exactly when it fires.
    fn on_item_appended(&self, item: &Item);
}
599
/// Identifies the point in the loop at which [`LoopMutator`]s are being run.
///
/// The value is passed to mutators through [`LoopCtx::point`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MutationPoint {
    /// Presumably after a tool result has been added to the transcript — confirm at the call sites.
    AfterToolResult,
    /// Presumably after a turn has ended — confirm at the call sites.
    AfterTurnEnded,
}
614
/// Sink for [`AgentEvent`]s, handed to mutators via [`LoopCtx::emitter`].
pub trait EventEmitter: Send + Sync {
    /// Emits one event.
    fn emit(&self, event: AgentEvent);
}
621
/// Context passed to [`LoopMutator::mutate`]; a fresh value is built for each mutator.
#[non_exhaustive]
pub struct LoopCtx<'a> {
    /// Session the driver is running.
    pub session_id: &'a SessionId,
    /// Turn the mutation relates to, when there is one.
    pub turn_id: Option<&'a agentkit_core::TurnId>,
    /// Where in the loop the mutators are being run.
    pub point: MutationPoint,
    /// Cancellation checkpoint for the current step, if cancellation is configured.
    pub cancellation: Option<TurnCancellation>,
    /// Emitter a mutator can use to publish [`AgentEvent`]s.
    pub emitter: &'a dyn EventEmitter,
}
636
/// Mutable view of the transcript handed to [`LoopMutator`]s.
///
/// It dereferences to the underlying `Vec<Item>`; taking a mutable dereference sets
/// `dirty`, which the driver checks to decide whether to re-validate the transcript.
pub struct TranscriptCursor<'a> {
    // The driver's transcript.
    items: &'a mut Vec<Item>,
    // Set once the transcript has been mutably dereferenced.
    pub(crate) dirty: bool,
}
648
649impl<'a> std::ops::Deref for TranscriptCursor<'a> {
650 type Target = Vec<Item>;
651 fn deref(&self) -> &Vec<Item> {
652 self.items
653 }
654}
655
656impl<'a> std::ops::DerefMut for TranscriptCursor<'a> {
657 fn deref_mut(&mut self) -> &mut Vec<Item> {
658 self.dirty = true;
659 self.items
660 }
661}
662
/// Hook that may inspect or edit the transcript at defined points of the loop.
///
/// Registered through [`AgentBuilder::mutator`]; the driver runs mutators in
/// registration order.
#[async_trait]
pub trait LoopMutator: Send + Sync {
    /// Inspects or edits the transcript through `cursor`.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// An error aborts the remaining mutators and is returned by the driver.
    async fn mutate(
        &self,
        cursor: &mut TranscriptCursor<'_>,
        ctx: LoopCtx<'_>,
    ) -> Result<(), LoopError> {
        // Bind the arguments so the default body does not warn about unused parameters.
        let _ = (cursor, ctx);
        Ok(())
    }
}
685
/// Events published by the loop to [`LoopObserver`]s and [`EventEmitter`]s.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AgentEvent {
    /// Emitted by [`Agent::start`] once the driver has been created.
    RunStarted { session_id: SessionId },
    /// A turn started.
    TurnStarted {
        session_id: SessionId,
        turn_id: agentkit_core::TurnId,
    },
    /// Input items were accepted for the session.
    InputAccepted {
        session_id: SessionId,
        items: Vec<Item>,
    },
    /// An incremental content update.
    ContentDelta(Delta),
    /// A tool call was requested.
    ToolCallRequested(ToolCallPart),
    /// A tool result was received.
    ToolResultReceived(ToolResultPart),
    /// Emitted when a tool call is queued for approval.
    ApprovalRequired(ApprovalRequest),
    /// An approval was resolved; `approved` reports the outcome.
    ApprovalResolved { approved: bool },
    /// Emitted for each catalog event drained from the tool executor.
    ToolCatalogChanged(ToolCatalogEvent),
    /// A mutator started running.
    /// NOTE(review): the driver code in this section does not emit this itself —
    /// presumably mutators publish it through `LoopCtx::emitter`; confirm.
    MutationStarted {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        mutator: String,
        point: MutationPoint,
    },
    /// A mutator finished; `dirty` presumably reports whether it changed the
    /// transcript — confirm with the emitting code.
    MutationFinished {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        mutator: String,
        dirty: bool,
        metadata: MetadataMap,
    },
    /// A usage update.
    UsageUpdated(Usage),
    /// A non-fatal warning.
    Warning { message: String },
    /// The run failed.
    RunFailed { message: String },
    /// A turn finished with the given result.
    TurnFinished(TurnResult),
}
753
/// Handle for a tool call that is waiting for an approval decision.
///
/// Surfaced to the caller as [`LoopInterrupt::ApprovalRequest`]; dereferences to the
/// wrapped [`ApprovalRequest`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingApproval {
    /// The approval request being surfaced.
    pub request: ApprovalRequest,
}
780
781impl std::ops::Deref for PendingApproval {
782 type Target = ApprovalRequest;
783 fn deref(&self) -> &ApprovalRequest {
784 &self.request
785 }
786}
787
788impl PendingApproval {
789 pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
791 let call_id = self
792 .request
793 .call_id
794 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
795 driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
796 }
797
798 pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
800 let call_id = self
801 .request
802 .call_id
803 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
804 driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
805 }
806
807 pub fn deny_with_reason<S: ModelSession>(
809 self,
810 driver: &mut LoopDriver<S>,
811 reason: impl Into<String>,
812 ) -> Result<(), LoopError> {
813 let call_id = self
814 .request
815 .call_id
816 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
817 driver.resolve_approval_for(
818 call_id,
819 ApprovalDecision::Deny {
820 reason: Some(reason.into()),
821 },
822 )
823 }
824
825 pub fn approve_with_patched_input<S: ModelSession>(
835 self,
836 driver: &mut LoopDriver<S>,
837 input: serde_json::Value,
838 ) -> Result<(), LoopError> {
839 let call_id = self
840 .request
841 .call_id
842 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
843 driver.resolve_approval_for_with_patched_input(call_id, input)
844 }
845}
846
/// Payload of [`LoopInterrupt::AwaitingInput`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputRequest {
    /// Session that is waiting for input.
    pub session_id: SessionId,
    /// Text describing why input is requested.
    pub reason: String,
}
878
impl InputRequest {
    /// Consumes the request and forwards `items` to [`LoopDriver::submit_input`].
    ///
    /// The request's own fields are not used; the call is a convenience wrapper.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LoopDriver::submit_input`] returns.
    pub fn submit<S: ModelSession>(
        self,
        driver: &mut LoopDriver<S>,
        items: Vec<Item>,
    ) -> Result<(), LoopError> {
        driver.submit_input(items)
    }
}
889
/// Outcome of a turn, carried by [`LoopStep::Finished`] and [`AgentEvent::TurnFinished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnResult {
    /// Identifier of the finished turn.
    pub turn_id: agentkit_core::TurnId,
    /// Why the turn finished.
    pub finish_reason: FinishReason,
    /// Items produced during the turn.
    pub items: Vec<Item>,
    /// Token usage, when reported.
    pub usage: Option<Usage>,
    /// Metadata attached to the result.
    pub metadata: MetadataMap,
}
907
/// Reasons the driver hands control back to the caller before a turn finishes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LoopInterrupt {
    /// A tool call needs an approval decision.
    ApprovalRequest(PendingApproval),
    /// The driver is waiting for input items.
    AwaitingInput(InputRequest),
    /// A tool round completed; the payload describes the transcript at that point.
    AfterToolResult(ToolRoundInfo),
}
960
961impl LoopInterrupt {
962 pub fn is_blocking(&self) -> bool {
968 matches!(self, LoopInterrupt::ApprovalRequest(_))
969 }
970}
971
/// Payload of [`LoopInterrupt::AfterToolResult`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ToolRoundInfo {
    /// Session the tool round belongs to.
    pub session_id: SessionId,
    /// Turn whose tool round just completed.
    pub turn_id: agentkit_core::TurnId,
    /// Length of the driver's transcript when the interrupt was created.
    pub transcript_len: usize,
}
983
impl ToolRoundInfo {
    /// Consumes the info and forwards `items` to [`LoopDriver::submit_input`].
    ///
    /// The info's own fields are not used; the call is a convenience wrapper.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LoopDriver::submit_input`] returns.
    pub fn submit<S: ModelSession>(
        self,
        driver: &mut LoopDriver<S>,
        items: Vec<Item>,
    ) -> Result<(), LoopError> {
        driver.submit_input(items)
    }
}
995
/// Result of advancing the driver by one step.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum LoopStep {
    /// The driver stopped and handed control back to the caller.
    Interrupt(LoopInterrupt),
    /// The turn finished with the given result.
    Finished(TurnResult),
}
1030
/// Serializable snapshot of driver state.
///
/// NOTE(review): the code that produces and consumes snapshots is outside this
/// section — confirm when one is taken.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LoopSnapshot {
    /// Session the snapshot belongs to.
    pub session_id: SessionId,
    /// Transcript items at snapshot time.
    pub transcript: Vec<Item>,
    /// Input items not yet consumed at snapshot time.
    pub pending_input: Vec<Item>,
}
1045
// Driver-side record of a tool call that is waiting for (or has received) an
// approval decision. Stored in `LoopDriver::pending_approvals`, keyed by call id.
#[derive(Clone, Debug)]
struct PendingApprovalToolCall {
    // The approval request, with its `call_id` filled in when the record is queued.
    request: ApprovalRequest,
    // `None` until a decision is recorded.
    decision: Option<ApprovalDecision>,
    // Set once the request has been returned to the caller as an interrupt.
    surfaced: bool,
    // Turn in which the tool call was made.
    turn_id: agentkit_core::TurnId,
    // Task-manager id of the task awaiting approval.
    task_id: TaskId,
    // Tool-call part rebuilt from `tool_request` when the record is queued.
    call: ToolCallPart,
    // The original tool request.
    tool_request: ToolRequest,
}
1056
// Bookkeeping for the tool round currently being driven by
// `LoopDriver::continue_active_tool_round`.
#[derive(Clone, Debug, Default)]
struct ActiveToolRound {
    // Turn the round belongs to.
    turn_id: agentkit_core::TurnId,
    // Tool calls not yet handed to the task manager; consumed from the front.
    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
    // Set when a task of the round is pending in the background or was detached.
    background_pending: bool,
    // Set when a result item of the round was appended to the transcript.
    foreground_progressed: bool,
}
1064
/// A configured agent, built with [`AgentBuilder`].
///
/// [`Agent::start`] opens a model session and clones this configuration into a
/// [`LoopDriver`].
pub struct Agent<M>
where
    M: ModelAdapter,
{
    // Adapter used to open model sessions.
    model: M,
    // Tool sources handed to the default executor when none was set explicitly.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    // Explicit tool executor, if one was configured.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    // Task manager through which tool calls are started.
    task_manager: Arc<dyn TaskManager>,
    // Permission checker passed to tools.
    permissions: Arc<dyn PermissionChecker>,
    // Shared resources passed to tools.
    resources: Arc<dyn ToolResources>,
    // Optional cancellation handle.
    cancellation: Option<CancellationHandle>,
    // Registered hooks and observers, in registration order.
    mutators: Vec<Arc<dyn LoopMutator>>,
    observers: Vec<Arc<dyn LoopObserver>>,
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    // Initial transcript and initial pending input copied into every driver.
    transcript: Vec<Item>,
    input: Vec<Item>,
}
1123
1124impl<M> Agent<M>
1125where
1126 M: ModelAdapter,
1127{
1128 pub fn builder() -> AgentBuilder<M> {
1130 AgentBuilder::default()
1131 }
1132
1133 pub async fn start(&self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1149 let session_id = config.session_id.clone();
1150 let default_cache = config.cache.clone();
1151 let session = self.model.start_session(config).await?;
1152 let tool_executor = self
1153 .tool_executor
1154 .clone()
1155 .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1156 let driver = LoopDriver {
1157 session_id: session_id.clone(),
1158 provider_name: self.model.provider_name().map(str::to_owned),
1159 default_cache,
1160 next_turn_cache: None,
1161 session: Some(session),
1162 tool_executor,
1163 task_manager: self.task_manager.clone(),
1164 permissions: self.permissions.clone(),
1165 resources: self.resources.clone(),
1166 cancellation: self.cancellation.clone(),
1167 mutators: self.mutators.clone(),
1168 observers: self.observers.clone(),
1169 transcript_observers: self.transcript_observers.clone(),
1170 transcript: self.transcript.clone(),
1171 pending_input: self.input.clone(),
1172 pending_approvals: BTreeMap::new(),
1173 pending_approval_order: VecDeque::new(),
1174 active_tool_round: None,
1175 pending_round_resume: None,
1176 next_turn_index: 1,
1177 detached_call_ids: HashSet::new(),
1178 };
1179 driver.emit(AgentEvent::RunStarted { session_id });
1180 Ok(driver)
1181 }
1182}
1183
/// Builder for [`Agent`]; obtain one with [`Agent::builder`].
///
/// Only the model adapter is mandatory; every other setting has a default.
pub struct AgentBuilder<M>
where
    M: ModelAdapter,
{
    // Required; `build` fails while this is `None`.
    model: Option<M>,
    tool_sources: Vec<Arc<dyn ToolSource>>,
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    // `None` makes `build` fall back to `SimpleTaskManager`.
    task_manager: Option<Arc<dyn TaskManager>>,
    permissions: Arc<dyn PermissionChecker>,
    resources: Arc<dyn ToolResources>,
    cancellation: Option<CancellationHandle>,
    mutators: Vec<Arc<dyn LoopMutator>>,
    observers: Vec<Arc<dyn LoopObserver>>,
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    transcript: Vec<Item>,
    input: Vec<Item>,
}
1206
impl<M> Default for AgentBuilder<M>
where
    M: ModelAdapter,
{
    /// Returns a builder with no model, no tools, [`AllowAllPermissions`], `()` as
    /// resources, no cancellation handle, and empty hook, transcript and input lists.
    fn default() -> Self {
        Self {
            model: None,
            tool_sources: Vec::new(),
            tool_executor: None,
            task_manager: None,
            permissions: Arc::new(AllowAllPermissions),
            resources: Arc::new(()),
            cancellation: None,
            mutators: Vec::new(),
            observers: Vec::new(),
            transcript_observers: Vec::new(),
            transcript: Vec::new(),
            input: Vec::new(),
        }
    }
}
1228
1229impl<M> AgentBuilder<M>
1230where
1231 M: ModelAdapter,
1232{
1233 pub fn model(mut self, model: M) -> Self {
1235 self.model = Some(model);
1236 self
1237 }
1238
1239 pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1252 self.tool_sources.push(Arc::new(source));
1253 self
1254 }
1255
1256 pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1262 self.tool_executor = Some(Arc::new(executor));
1263 self
1264 }
1265
1266 pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1271 self.task_manager = Some(Arc::new(manager));
1272 self
1273 }
1274
1275 pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1279 self.permissions = Arc::new(permissions);
1280 self
1281 }
1282
1283 pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1285 self.resources = Arc::new(resources);
1286 self
1287 }
1288
1289 pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1291 self.cancellation = Some(handle);
1292 self
1293 }
1294
1295 pub fn mutator<L: LoopMutator + 'static>(mut self, mutator: L) -> Self {
1303 self.mutators.push(Arc::new(mutator));
1304 self
1305 }
1306
1307 pub fn observer<O: LoopObserver + 'static>(mut self, observer: O) -> Self {
1311 self.observers.push(Arc::new(observer));
1312 self
1313 }
1314
1315 pub fn transcript_observer<O: TranscriptObserver + 'static>(mut self, observer: O) -> Self {
1324 self.transcript_observers.push(Arc::new(observer));
1325 self
1326 }
1327
1328 pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1336 self.transcript = transcript;
1337 self
1338 }
1339
1340 pub fn input(mut self, input: Vec<Item>) -> Self {
1349 self.input = input;
1350 self
1351 }
1352
1353 pub fn build(self) -> Result<Agent<M>, LoopError> {
1359 let model = self
1360 .model
1361 .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1362 Ok(Agent {
1363 model,
1364 tool_sources: self.tool_sources,
1365 tool_executor: self.tool_executor,
1366 task_manager: self
1367 .task_manager
1368 .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1369 permissions: self.permissions,
1370 resources: self.resources,
1371 cancellation: self.cancellation,
1372 mutators: self.mutators,
1373 observers: self.observers,
1374 transcript_observers: self.transcript_observers,
1375 transcript: self.transcript,
1376 input: self.input,
1377 })
1378 }
1379}
1380
/// Drives the agent loop for one session; created by [`Agent::start`].
pub struct LoopDriver<S>
where
    S: ModelSession,
{
    // Session this driver runs.
    session_id: SessionId,
    // Provider name reported by the adapter, recorded on the turn span.
    provider_name: Option<String>,
    // Cache request taken from the `SessionConfig`.
    default_cache: Option<PromptCacheRequest>,
    // NOTE(review): presumably a one-shot override for the next turn — confirm at the use site.
    next_turn_cache: Option<PromptCacheRequest>,
    // The model session; starts as `Some`.
    session: Option<S>,
    tool_executor: Arc<dyn ToolExecutor>,
    task_manager: Arc<dyn TaskManager>,
    permissions: Arc<dyn PermissionChecker>,
    resources: Arc<dyn ToolResources>,
    cancellation: Option<CancellationHandle>,
    mutators: Vec<Arc<dyn LoopMutator>>,
    observers: Vec<Arc<dyn LoopObserver>>,
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    // The conversation so far.
    transcript: Vec<Item>,
    // Input items waiting to be consumed.
    pending_input: Vec<Item>,
    // Approval records keyed by call id; `pending_approval_order` keeps arrival order.
    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
    pending_approval_order: VecDeque<ToolCallId>,
    // The tool round being driven, if any.
    active_tool_round: Option<ActiveToolRound>,
    // Turn to resume after an `AfterToolResult` interrupt was handed out.
    pending_round_resume: Option<agentkit_core::TurnId>,
    // Starts at 1; presumably used to number turns — confirm at the use site.
    next_turn_index: u64,
    // Call ids for which a "now running in the background" result was appended.
    detached_call_ids: HashSet<ToolCallId>,
}
1444
1445impl<S> LoopDriver<S>
1446where
1447 S: ModelSession,
1448{
    /// Builds the `agent.execute_tool` tracing span for one tool dispatch.
    ///
    /// The span carries the tool name, call id, session id, turn id and
    /// `launch_kind`. `error.type` is declared empty so callers can record it later.
    fn execute_tool_span(
        &self,
        request: &ToolRequest,
        turn_id: &agentkit_core::TurnId,
        launch_kind: &'static str,
    ) -> tracing::Span {
        tracing::info_span!(
            "agent.execute_tool",
            "otel.name" = %format!("execute_tool {}", request.tool_name),
            "gen_ai.operation.name" = "execute_tool",
            "gen_ai.tool.name" = %request.tool_name,
            "gen_ai.tool.call.id" = %request.call_id,
            "gen_ai.conversation.id" = %self.session_id,
            "error.type" = tracing::field::Empty,
            session.id = %self.session_id,
            turn.id = %turn_id,
            launch_kind = launch_kind,
        )
    }
1468
1469 fn start_task_via_manager(
1470 &self,
1471 task_id: Option<TaskId>,
1472 tool_request: ToolRequest,
1473 kind: TaskLaunchKind,
1474 cancellation: Option<TurnCancellation>,
1475 ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1476 {
1477 let task_manager = self.task_manager.clone();
1478 let tool_executor = self.tool_executor.clone();
1479 let permissions = self.permissions.clone();
1480 let resources = self.resources.clone();
1481 let session_id = self.session_id.clone();
1482 let turn_id = tool_request.turn_id.clone();
1483 let metadata = tool_request.metadata.clone();
1484
1485 async move {
1486 task_manager
1487 .start_task(
1488 TaskLaunchRequest {
1489 task_id,
1490 request: tool_request.clone(),
1491 kind,
1492 },
1493 TaskStartContext {
1494 executor: tool_executor.clone(),
1495 tool_context: {
1496 let execution_scope = ToolExecutionScope {
1497 executor: tool_executor,
1498 session_id: session_id.clone(),
1499 turn_id: turn_id.clone(),
1500 permissions: permissions.clone(),
1501 resources: resources.clone(),
1502 cancellation: cancellation.clone(),
1503 };
1504 OwnedToolContext {
1505 session_id,
1506 turn_id,
1507 metadata,
1508 permissions,
1509 resources,
1510 cancellation,
1511 execution_scope: Some(execution_scope),
1512 approved_request: None,
1513 }
1514 },
1515 },
1516 )
1517 .await
1518 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1519 }
1520 }
1521
    /// Returns `true` while at least one approval record is queued.
    fn has_pending_interrupts(&self) -> bool {
        !self.pending_approvals.is_empty()
    }
1525
1526 fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1527 for event in events {
1528 self.emit(AgentEvent::ToolCatalogChanged(event));
1529 }
1530 }
1531
1532 fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1533 let call_id = task.tool_request.call_id.clone();
1534 let call = ToolCallPart {
1535 id: call_id.clone(),
1536 name: task.tool_request.tool_name.to_string(),
1537 input: task.tool_request.input.clone(),
1538 metadata: task.tool_request.metadata.clone(),
1539 };
1540 let mut request = task.approval;
1541 request.call_id = Some(call_id.clone());
1542 let pending = PendingApprovalToolCall {
1543 request: request.clone(),
1544 decision: None,
1545 surfaced: false,
1546 turn_id: turn_id.clone(),
1547 task_id: task.task_id,
1548 call,
1549 tool_request: task.tool_request,
1550 };
1551 self.pending_approvals.insert(call_id.clone(), pending);
1552 if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1553 self.pending_approval_order.push_back(call_id);
1554 }
1555 self.emit(AgentEvent::ApprovalRequired(request));
1556 }
1557
1558 fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1559 for call_id in self.pending_approval_order.clone() {
1560 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1561 continue;
1562 };
1563 if pending.decision.is_none() && !pending.surfaced {
1564 pending.surfaced = true;
1565 return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1566 PendingApproval {
1567 request: pending.request.clone(),
1568 },
1569 )));
1570 }
1571 }
1572 None
1573 }
1574
1575 fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1576 self.pending_approval_order.iter().find_map(|call_id| {
1577 self.pending_approvals.get(call_id).and_then(|pending| {
1578 pending.decision.is_none().then(|| {
1579 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1580 request: pending.request.clone(),
1581 }))
1582 })
1583 })
1584 })
1585 }
1586
1587 fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1588 let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1589 self.pending_approvals
1590 .get(call_id)
1591 .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1592 })?;
1593 self.pending_approval_order.retain(|id| id != &call_id);
1594 self.pending_approvals.remove(&call_id)
1595 }
1596
1597 fn queue_resolution_interrupt(
1598 &mut self,
1599 turn_id: &agentkit_core::TurnId,
1600 resolution: TaskResolution,
1601 ) -> Option<LoopStep> {
1602 match resolution {
1603 TaskResolution::Item(item) => {
1604 self.append_tool_result_item(item);
1605 None
1606 }
1607 TaskResolution::Approval(task) => {
1608 self.enqueue_pending_approval(turn_id, task);
1609 self.take_next_unsurfaced_approval_interrupt()
1610 }
1611 }
1612 }
1613
1614 async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1615 let PendingLoopUpdates { mut resolutions } = self
1616 .task_manager
1617 .take_pending_loop_updates()
1618 .await
1619 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1620 let mut saw_items = false;
1621 while let Some(resolution) = resolutions.pop_front() {
1622 match resolution {
1623 TaskResolution::Item(item) => {
1624 self.append_tool_result_item(item);
1625 saw_items = true;
1626 }
1627 TaskResolution::Approval(task) => {
1628 self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1629 }
1630 }
1631 }
1632 Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1633 }
1634
1635 async fn run_mutators(
1636 &mut self,
1637 point: MutationPoint,
1638 turn_id: Option<&agentkit_core::TurnId>,
1639 cancellation: Option<TurnCancellation>,
1640 ) -> Result<(), LoopError> {
1641 if self.mutators.is_empty() {
1642 return Ok(());
1643 }
1644 if cancellation
1645 .as_ref()
1646 .is_some_and(TurnCancellation::is_cancelled)
1647 {
1648 return Err(LoopError::Cancelled);
1649 }
1650 let mutators = self.mutators.clone();
1651 let session_id = self.session_id.clone();
1652 let observers = self.observers.clone();
1653 let emitter = DriverEmitter {
1654 observers: &observers,
1655 };
1656 let mut cursor = TranscriptCursor {
1657 items: &mut self.transcript,
1658 dirty: false,
1659 };
1660 for mutator in &mutators {
1661 if cancellation
1662 .as_ref()
1663 .is_some_and(TurnCancellation::is_cancelled)
1664 {
1665 return Err(LoopError::Cancelled);
1666 }
1667 let ctx = LoopCtx {
1668 session_id: &session_id,
1669 turn_id,
1670 point,
1671 cancellation: cancellation.clone(),
1672 emitter: &emitter,
1673 };
1674 mutator.mutate(&mut cursor, ctx).await?;
1675 }
1676 if cursor.dirty {
1677 validate_transcript_invariants(cursor.items)?;
1678 }
1679 Ok(())
1680 }
1681
    /// Advances the active tool round until it must hand control back.
    ///
    /// Returns `Ok(None)` when there is no active round, or when the round ended
    /// with only background work outstanding and no result appended. Otherwise it
    /// returns an approval interrupt, a cancelled-turn step, or
    /// [`LoopInterrupt::AfterToolResult`] once the task manager has no more updates
    /// for the turn.
    ///
    /// # Errors
    ///
    /// Task-manager failures are returned as [`LoopError::Tool`] wrapping
    /// [`ToolError::Internal`]. [`LoopError::InvalidState`] is returned if the
    /// active round disappears while the loop is running.
    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
        let Some(_) = self.active_tool_round.as_ref() else {
            return Ok(None);
        };
        loop {
            // A fresh cancellation checkpoint is taken on every iteration.
            let cancellation = self
                .cancellation
                .as_ref()
                .map(CancellationHandle::checkpoint);
            let turn_id = self
                .active_tool_round
                .as_ref()
                .map(|active| active.turn_id.clone())
                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

            // Cancelled before doing any work: tell the task manager, drop the round,
            // and finish the turn as cancelled.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                self.active_tool_round = None;
                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
            }

            // Phase 1: hand queued tool calls to the task manager, one per iteration.
            let next_call = self
                .active_tool_round
                .as_mut()
                .and_then(|active| active.pending_calls.pop_front());
            if let Some((_call, tool_request)) = next_call {
                use tracing::Instrument;
                let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
                match self
                    .start_task_via_manager(
                        None,
                        tool_request.clone(),
                        TaskLaunchKind::Plain,
                        cancellation.clone(),
                    )
                    .instrument(dispatch_span.clone())
                    .await?
                {
                    // The task resolved immediately.
                    TaskStartOutcome::Ready(resolution) => {
                        let resolution = *resolution;
                        match resolution {
                            TaskResolution::Item(item) => {
                                if tool_result_is_error(&item) {
                                    dispatch_span.record("error.type", "tool_error");
                                }
                                if let Some(active) = self.active_tool_round.as_mut() {
                                    active.foreground_progressed = true;
                                }
                                self.append_tool_result_item(item);
                            }
                            TaskResolution::Approval(task) => {
                                self.enqueue_pending_approval(&turn_id, task);
                            }
                        }
                        continue;
                    }
                    // The task is still running; remember if it runs in the background.
                    TaskStartOutcome::Pending { kind, .. } => {
                        if kind == agentkit_task_manager::TaskKind::Background
                            && let Some(active) = self.active_tool_round.as_mut()
                        {
                            active.background_pending = true;
                        }
                        continue;
                    }
                }
            }

            // Phase 2: every call has been dispatched; wait for the next update of this turn.
            match self
                .task_manager
                .wait_for_turn(&turn_id, cancellation.clone())
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
            {
                Some(TurnTaskUpdate::Resolution(resolution)) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.append_tool_result_item(item);
                        }
                        TaskResolution::Approval(task) => {
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                    }
                }
                // A task was detached: append a placeholder result for its call id and
                // remember the id in `detached_call_ids`.
                Some(TurnTaskUpdate::Detached(snapshot)) => {
                    let detached_call_id = snapshot.call_id.clone();
                    self.append_tool_result_item(Item {
                        id: None,
                        kind: ItemKind::Tool,
                        parts: vec![Part::ToolResult(ToolResultPart {
                            call_id: detached_call_id.clone(),
                            output: ToolOutput::Text(format!(
                                "Tool {} is now running in the background. \
                                 The result will be delivered when it completes.",
                                snapshot.tool_name,
                            )),
                            is_error: false,
                            metadata: MetadataMap::new(),
                        })],
                        metadata: MetadataMap::new(),
                        usage: None,
                        finish_reason: None,
                        created_at: None,
                    });
                    self.detached_call_ids.insert(detached_call_id);
                    if let Some(active) = self.active_tool_round.as_mut() {
                        active.background_pending = true;
                        active.foreground_progressed = true;
                    }
                }
                // No more updates for this turn: decide how the round ends.
                None => {
                    if cancellation
                        .as_ref()
                        .is_some_and(TurnCancellation::is_cancelled)
                    {
                        self.task_manager
                            .on_turn_interrupted(&turn_id)
                            .await
                            .map_err(|error| {
                                LoopError::Tool(ToolError::Internal(error.to_string()))
                            })?;
                        self.active_tool_round = None;
                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                    }
                    // The round is over either way, so take it out of the driver.
                    let active = self.active_tool_round.take().ok_or_else(|| {
                        LoopError::InvalidState("missing active tool round".into())
                    })?;
                    // Approvals come first: new ones, then ones already surfaced but
                    // still undecided.
                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    if let Some(step) = self.next_unresolved_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    // Only background work is outstanding and nothing was appended.
                    if active.background_pending && !active.foreground_progressed {
                        return Ok(None);
                    }
                    let info = ToolRoundInfo {
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        transcript_len: self.transcript.len(),
                    };
                    // Remember which turn to resume after the interrupt is handed out.
                    self.pending_round_resume = Some(turn_id);
                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
                        info,
                    ))));
                }
            }
        }
    }
1863
    /// Drives one model turn for `turn_id` and returns the resulting loop step.
    ///
    /// The sequence is: run the mutators registered for `mutation_point`,
    /// short-circuit when the transcript has nothing for the model to answer,
    /// open a model turn, forward its streamed events to observers, append the
    /// output items to the transcript, and then either start a tool round (when
    /// the model requested tools) or finish the turn.
    ///
    /// `emit_started` controls whether `AgentEvent::TurnStarted` is emitted.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::InvalidState`] when no model session is available
    /// and [`LoopError::Provider`] when the event stream ends without a
    /// `Finished` event. Other errors from the mutators, the session, the task
    /// manager, and the tool round are propagated. A [`LoopError::Cancelled`]
    /// from the mutators or the model is not an error here: it produces a
    /// finished step via `finish_cancelled`.
    #[tracing::instrument(
        name = "agent.turn",
        skip_all,
        fields(
            otel.name = "invoke_agent",
            gen_ai.operation.name = "invoke_agent",
            gen_ai.conversation.id = %self.session_id,
            gen_ai.provider.name = tracing::field::Empty,
            gen_ai.usage.input_tokens = tracing::field::Empty,
            gen_ai.usage.output_tokens = tracing::field::Empty,
            session.id = %self.session_id,
            turn.id = %turn_id,
            transcript.len = self.transcript.len(),
            saw_tool_call = tracing::field::Empty,
            finish_reason = tracing::field::Empty,
        ),
    )]
    async fn drive_turn(
        &mut self,
        turn_id: agentkit_core::TurnId,
        emit_started: bool,
        mutation_point: MutationPoint,
    ) -> Result<LoopStep, LoopError> {
        // Span fields declared `Empty` above are recorded as values become known.
        if let Some(provider) = &self.provider_name {
            tracing::Span::current().record("gen_ai.provider.name", provider.as_str());
        }
        // Derive the turn's cancellation token once; it is cloned into every
        // cancellable call below.
        let cancellation = self
            .cancellation
            .as_ref()
            .map(CancellationHandle::checkpoint);
        match self
            .run_mutators(mutation_point, Some(&turn_id), cancellation.clone())
            .await
        {
            Ok(()) => {}
            Err(LoopError::Cancelled) => {
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        }

        // The transcript does not end in a user, tool, or notification item, so
        // there is nothing for the model to answer: report an empty completed
        // turn without contacting the model. `TurnStarted` is not emitted on
        // this path.
        if !transcript_has_pending_input(&self.transcript) {
            let turn_result = TurnResult {
                turn_id,
                finish_reason: FinishReason::Completed,
                items: Vec::new(),
                usage: None,
                metadata: MetadataMap::new(),
            };
            self.emit(AgentEvent::TurnFinished(turn_result.clone()));
            return Ok(LoopStep::Finished(turn_result));
        }

        if emit_started {
            self.emit(AgentEvent::TurnStarted {
                session_id: self.session_id.clone(),
                turn_id: turn_id.clone(),
            });
        }
        // Re-check cancellation before building the request.
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }

        // Drain catalog events from the tool executor and pass them on before
        // the tool specs are captured for the request.
        let catalog_events = self.tool_executor.drain_catalog_events();
        self.emit_tool_catalog_events(catalog_events);

        let request = TurnRequest {
            session_id: self.session_id.clone(),
            turn_id: turn_id.clone(),
            transcript: self.transcript.clone(),
            available_tools: self.tool_executor.specs(),
            // A one-shot override is consumed here; otherwise fall back to the
            // default cache request, if any.
            cache: self
                .next_turn_cache
                .take()
                .or_else(|| self.default_cache.clone()),
            metadata: MetadataMap::new(),
        };

        let session = self
            .session
            .as_mut()
            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;

        // Child span covering the model call; it instruments `begin_turn` and
        // every `next_event` await below.
        let chat_span = tracing::info_span!(
            "chat",
            "otel.name" = tracing::field::Empty,
            "otel.kind" = "client",
            "gen_ai.operation.name" = "chat",
            "gen_ai.provider.name" = tracing::field::Empty,
            "gen_ai.conversation.id" = %self.session_id,
            "gen_ai.request.model" = tracing::field::Empty,
            "gen_ai.response.model" = tracing::field::Empty,
            "gen_ai.response.id" = tracing::field::Empty,
            "gen_ai.response.finish_reasons" = tracing::field::Empty,
            "gen_ai.usage.input_tokens" = tracing::field::Empty,
            "gen_ai.usage.output_tokens" = tracing::field::Empty,
        );
        if let Some(provider) = &self.provider_name {
            chat_span.record("gen_ai.provider.name", provider.as_str());
        }
        // The span name includes the model when the session reports one.
        match session.model_name() {
            Some(model) => {
                chat_span.record("gen_ai.request.model", model);
                chat_span.record("otel.name", format!("chat {model}").as_str());
            }
            None => {
                chat_span.record("otel.name", "chat");
            }
        }

        use tracing::Instrument;
        let mut turn = match session
            .begin_turn(request, cancellation.clone())
            .instrument(chat_span.clone())
            .await
        {
            Ok(turn) => turn,
            Err(LoopError::Cancelled) => {
                // Notify the task manager of the interruption before reporting
                // the cancelled turn.
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        };
        let mut saw_tool_call = false;
        let mut finished_result = None;

        // Pull events until the stream ends (`Ok(None)`) or a `Finished` event
        // arrives.
        while let Some(event) = match turn
            .next_event(cancellation.clone())
            .instrument(chat_span.clone())
            .await
        {
            Ok(event) => event,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        } {
            // An event was produced, but cancellation may have been requested in
            // the meantime; the event is discarded in that case.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            match event {
                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
                ModelTurnEvent::Usage(usage) => {
                    if let Some(tokens) = &usage.tokens {
                        chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
                        chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
                    }
                    self.emit(AgentEvent::UsageUpdated(usage));
                }
                ModelTurnEvent::ToolCall(call) => {
                    // Only the flag and the observer event are handled here; the
                    // calls to execute are extracted from the output items below.
                    saw_tool_call = true;
                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
                }
                ModelTurnEvent::Finished(result) => {
                    finished_result = Some(result);
                    break;
                }
            }
        }

        // A stream that ends without `Finished` is treated as a provider error.
        let mut result = finished_result.ok_or_else(|| {
            LoopError::Provider("model turn ended without a Finished event".into())
        })?;
        if let Some(model) = &result.model {
            chat_span.record("gen_ai.response.model", model.as_str());
        }
        if let Some(id) = &result.response_id {
            chat_span.record("gen_ai.response.id", id.as_str());
        }
        if let Some(tokens) = result
            .usage
            .as_ref()
            .and_then(|usage| usage.tokens.as_ref())
        {
            chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
            chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
        }
        chat_span.record(
            "gen_ai.response.finish_reasons",
            tracing::field::debug(&result.finish_reason),
        );
        // Drop this handle to the chat span; the remaining fields are recorded
        // on the current (`agent.turn`) span.
        drop(chat_span);
        tracing::Span::current().record("saw_tool_call", saw_tool_call);
        tracing::Span::current().record(
            "finish_reason",
            tracing::field::debug(&result.finish_reason),
        );
        if let Some(tokens) = result
            .usage
            .as_ref()
            .and_then(|usage| usage.tokens.as_ref())
        {
            tracing::Span::current().record("gen_ai.usage.input_tokens", tokens.input_tokens);
            tracing::Span::current().record("gen_ai.usage.output_tokens", tokens.output_tokens);
        }
        // Fill gaps in the output items: assistant items inherit the turn's
        // usage and finish reason, and every item gets a creation timestamp.
        let now = Timestamp::now();
        let usage = result.usage.clone();
        let finish_reason = result.finish_reason.clone();
        let output_items: Vec<Item> = result
            .output_items
            .drain(..)
            .map(|mut item| {
                if matches!(item.kind, ItemKind::Assistant) {
                    if item.usage.is_none() {
                        item.usage = usage.clone();
                    }
                    if item.finish_reason.is_none() {
                        item.finish_reason = Some(finish_reason.clone());
                    }
                }
                if item.created_at.is_none() {
                    item.created_at = Some(now);
                }
                item
            })
            .collect();
        self.extend_transcript(output_items.clone());

        if saw_tool_call {
            // Pair every tool call found in the output items with the request
            // that will be used to execute it.
            let pending_calls = extract_tool_calls(&output_items)
                .into_iter()
                .map(|call| {
                    let tool_request = ToolRequest {
                        call_id: call.id.clone(),
                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
                        input: call.input.clone(),
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        metadata: call.metadata.clone(),
                    };
                    (call, tool_request)
                })
                .collect();
            self.active_tool_round = Some(ActiveToolRound {
                turn_id: turn_id.clone(),
                pending_calls,
                background_pending: false,
                foreground_progressed: false,
            });
            if let Some(step) = self.continue_active_tool_round().await? {
                return Ok(step);
            }
            // The tool round produced no step to report, so ask the caller for
            // input.
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        // No tool calls: the turn is complete.
        let turn_result = TurnResult {
            turn_id,
            finish_reason: result.finish_reason,
            items: output_items,
            usage: result.usage,
            metadata: result.metadata,
        };
        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
        Ok(LoopStep::Finished(turn_result))
    }
2152
    /// Resumes a tool call whose approval request has been decided.
    ///
    /// An approved call is started through the task manager; a denied call gets
    /// an error tool result appended to the transcript. Afterwards the driver
    /// continues the active tool round, surfaces any remaining approval
    /// interrupts, or re-enters the model turn.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::InvalidState`] when `pending` carries no decision,
    /// and propagates errors from starting the task, continuing the tool round,
    /// or driving the turn.
    async fn resume_after_approval(
        &mut self,
        pending: PendingApprovalToolCall,
    ) -> Result<LoopStep, LoopError> {
        let decision = pending
            .decision
            .clone()
            .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;

        match decision {
            ApprovalDecision::Approve => {
                use tracing::Instrument;
                let dispatch_span =
                    self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
                // Reuse the pending task id and launch the stored tool request
                // as an approved task.
                match self
                    .start_task_via_manager(
                        Some(pending.task_id.clone()),
                        pending.tool_request.clone(),
                        TaskLaunchKind::Approved(pending.request.clone()),
                        self.cancellation
                            .as_ref()
                            .map(CancellationHandle::checkpoint),
                    )
                    .instrument(dispatch_span.clone())
                    .await?
                {
                    TaskStartOutcome::Ready(resolution) => {
                        let resolution = *resolution;
                        // Mark the dispatch span when the tool itself reported
                        // an error result.
                        if let TaskResolution::Item(item) = &resolution
                            && tool_result_is_error(item)
                        {
                            dispatch_span.record("error.type", "tool_error");
                        }
                        if let Some(step) =
                            self.queue_resolution_interrupt(&pending.turn_id, resolution)
                        {
                            return Ok(step);
                        }
                    }
                    // Nothing to record yet when the task has not resolved.
                    TaskStartOutcome::Pending { .. } => {}
                }
            }
            ApprovalDecision::Deny { reason } => {
                // A denial is recorded as an error tool result for the call,
                // using the supplied reason or a default message.
                self.append_tool_result_item(Item {
                    id: None,
                    kind: ItemKind::Tool,
                    parts: vec![Part::ToolResult(ToolResultPart {
                        call_id: pending.call.id.clone(),
                        output: ToolOutput::Text(
                            reason.unwrap_or_else(|| "approval denied".into()),
                        ),
                        is_error: true,
                        metadata: pending.call.metadata.clone(),
                    })],
                    metadata: MetadataMap::new(),
                    usage: None,
                    finish_reason: None,
                    created_at: None,
                });
            }
        }

        // Priority order: the active tool round, approvals not yet surfaced,
        // approvals still undecided, and finally the model turn itself.
        if let Some(step) = self.continue_active_tool_round().await? {
            Ok(step)
        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
            Ok(step)
        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
            Ok(step)
        } else {
            self.drive_turn(pending.turn_id, false, MutationPoint::AfterToolResult)
                .await
        }
    }
2226
2227 fn finish_cancelled(
2228 &mut self,
2229 turn_id: agentkit_core::TurnId,
2230 items: Vec<Item>,
2231 ) -> Result<LoopStep, LoopError> {
2232 self.extend_transcript(items.clone());
2233 let turn_result = TurnResult {
2234 turn_id,
2235 finish_reason: FinishReason::Cancelled,
2236 items,
2237 usage: None,
2238 metadata: interrupted_metadata("turn"),
2239 };
2240 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
2241 Ok(LoopStep::Finished(turn_result))
2242 }
2243
2244 pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
2254 if self.has_pending_interrupts() {
2255 return Err(LoopError::InvalidState(
2256 "cannot submit input while an interrupt is pending".into(),
2257 ));
2258 }
2259 self.emit(AgentEvent::InputAccepted {
2260 session_id: self.session_id.clone(),
2261 items: input.clone(),
2262 });
2263 self.pending_input.extend(input);
2264 Ok(())
2265 }
2266
2267 pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2272 if self.has_pending_interrupts() {
2273 return Err(LoopError::InvalidState(
2274 "cannot update next-turn cache while an interrupt is pending".into(),
2275 ));
2276 }
2277 self.next_turn_cache = Some(cache);
2278 Ok(())
2279 }
2280
2281 #[cfg(test)]
2282 pub(crate) fn submit_input_with_cache(
2283 &mut self,
2284 input: Vec<Item>,
2285 cache: PromptCacheRequest,
2286 ) -> Result<(), LoopError> {
2287 self.set_next_turn_cache(cache)?;
2288 self.submit_input(input)
2289 }
2290
2291 pub fn resolve_approval_for(
2301 &mut self,
2302 call_id: ToolCallId,
2303 decision: ApprovalDecision,
2304 ) -> Result<(), LoopError> {
2305 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2306 return Err(LoopError::InvalidState(format!(
2307 "no approval request is pending for call {}",
2308 call_id.0
2309 )));
2310 };
2311 pending.decision = Some(decision.clone());
2312 self.emit(AgentEvent::ApprovalResolved {
2313 approved: matches!(decision, ApprovalDecision::Approve),
2314 });
2315 Ok(())
2316 }
2317
2318 pub fn resolve_approval_for_with_patched_input(
2331 &mut self,
2332 call_id: ToolCallId,
2333 input: serde_json::Value,
2334 ) -> Result<(), LoopError> {
2335 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2336 return Err(LoopError::InvalidState(format!(
2337 "no approval request is pending for call {}",
2338 call_id.0
2339 )));
2340 };
2341 pending.tool_request.input = input;
2342 self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2343 }
2344
2345 pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2348 let mut unresolved = self
2349 .pending_approval_order
2350 .iter()
2351 .filter(|call_id| {
2352 self.pending_approvals
2353 .get(*call_id)
2354 .is_some_and(|pending| pending.decision.is_none())
2355 })
2356 .cloned();
2357 let Some(call_id) = unresolved.next() else {
2358 return Err(LoopError::InvalidState(
2359 "no approval request is pending".into(),
2360 ));
2361 };
2362 if unresolved.next().is_some() {
2363 return Err(LoopError::InvalidState(
2364 "multiple approvals are pending; use resolve_approval_for".into(),
2365 ));
2366 }
2367 self.resolve_approval_for(call_id, decision)
2368 }
2369
2370 pub fn snapshot(&self) -> LoopSnapshot {
2372 LoopSnapshot {
2373 session_id: self.session_id.clone(),
2374 transcript: self.transcript.clone(),
2375 pending_input: self.pending_input.clone(),
2376 }
2377 }
2378
    /// Advances the driver by one step.
    ///
    /// Work is considered in a fixed priority order: resuming a decided
    /// approval, surfacing approval interrupts, continuing the active tool
    /// round, draining pending loop updates, resuming a turn after a tool
    /// round, and finally starting a new turn from pending input. When none of
    /// these apply the driver reports `LoopInterrupt::AwaitingInput`.
    ///
    /// # Errors
    ///
    /// Propagates any [`LoopError`] from the step that was executed.
    pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
        // 1. An approval that already has a decision is resumed first.
        if let Some(pending) = self.take_next_resolved_approval() {
            return self.resume_after_approval(pending).await;
        }

        // 2. Approval interrupts that have not been surfaced yet.
        if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
            return Ok(step);
        }

        // 3. Approvals that are still waiting for a decision.
        if let Some(step) = self.next_unresolved_approval_interrupt() {
            return Ok(step);
        }

        // 4. An in-flight tool round.
        if let Some(step) = self.continue_active_tool_round().await? {
            return Ok(step);
        }

        // 5. Pending loop updates; `had_loop_updates` is consulted again below
        //    when deciding whether a new turn should start.
        let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
        if let Some(step) = loop_step {
            return Ok(step);
        }

        // 6. A tool round finished earlier: move any queued input into the
        //    transcript and continue the same turn without `TurnStarted`.
        if let Some(turn_id) = self.pending_round_resume.take() {
            let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
            self.extend_transcript(drained);
            return self
                .drive_turn(turn_id, false, MutationPoint::AfterToolResult)
                .await;
        }

        // 7. No queued input and no loop updates: wait for the caller.
        if self.pending_input.is_empty() && !had_loop_updates {
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        // 8. Start a fresh turn with the next sequential id.
        let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
        self.next_turn_index += 1;
        let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
        self.extend_transcript(drained);
        self.drive_turn(turn_id, true, MutationPoint::AfterTurnEnded)
            .await
    }
2449
2450 fn emit(&self, event: AgentEvent) {
2451 for observer in &self.observers {
2452 observer.handle_event(event.clone());
2453 }
2454 }
2455
2456 fn append_item(&mut self, mut item: Item) {
2461 if item.created_at.is_none() {
2462 item.created_at = Some(Timestamp::now());
2463 }
2464 for observer in &self.transcript_observers {
2465 observer.on_item_appended(&item);
2466 }
2467 self.transcript.push(item);
2468 }
2469
2470 fn append_tool_result_item(&mut self, item: Item) {
2483 for part in &item.parts {
2484 if let Part::ToolResult(result) = part {
2485 self.emit(AgentEvent::ToolResultReceived(result.clone()));
2486 }
2487 }
2488 let item = self.maybe_convert_detached(item);
2489 self.append_item(item);
2490 }
2491
2492 fn maybe_convert_detached(&mut self, item: Item) -> Item {
2493 if !matches!(item.kind, ItemKind::Tool) {
2494 return item;
2495 }
2496 let results: Vec<&ToolResultPart> = item
2497 .parts
2498 .iter()
2499 .filter_map(|p| match p {
2500 Part::ToolResult(r) => Some(r),
2501 _ => None,
2502 })
2503 .collect();
2504 if results.is_empty()
2505 || !results
2506 .iter()
2507 .all(|r| self.detached_call_ids.contains(&r.call_id))
2508 {
2509 return item;
2510 }
2511 let mut text = String::new();
2512 for result in &results {
2513 self.detached_call_ids.remove(&result.call_id);
2514 if !text.is_empty() {
2515 text.push_str("\n\n");
2516 }
2517 let label = if result.is_error {
2518 "failed"
2519 } else {
2520 "completed"
2521 };
2522 let body = render_tool_output_brief(&result.output);
2523 text.push_str(&format!(
2524 "Background tool call {} {}: {body}",
2525 result.call_id.0, label
2526 ));
2527 }
2528 Item::notification(text)
2529 }
2530
2531 fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2535 let now = Timestamp::now();
2536 for mut item in items {
2537 if item.created_at.is_none() {
2538 item.created_at = Some(now);
2539 }
2540 self.append_item(item);
2541 }
2542 }
2543}
2544
2545fn render_tool_output_brief(output: &ToolOutput) -> String {
2546 match output {
2547 ToolOutput::Text(t) => t.clone(),
2548 ToolOutput::Structured(value) => value.to_string(),
2549 ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2550 ToolOutput::Files(files) => format!("[{} files]", files.len()),
2551 }
2552}
2553
2554fn interrupted_metadata(stage: &str) -> MetadataMap {
2555 let mut metadata = MetadataMap::new();
2556 metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2557 metadata.insert(
2558 INTERRUPT_REASON_METADATA_KEY.into(),
2559 USER_CANCELLED_REASON.into(),
2560 );
2561 metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2562 metadata
2563}
2564
2565fn interrupted_assistant_items() -> Vec<Item> {
2566 vec![Item {
2567 id: None,
2568 kind: ItemKind::Assistant,
2569 parts: vec![Part::Text(TextPart {
2570 text: "Previous assistant response was interrupted by the user before completion."
2571 .into(),
2572 metadata: interrupted_metadata("assistant"),
2573 })],
2574 metadata: interrupted_metadata("assistant"),
2575 usage: None,
2576 finish_reason: None,
2577 created_at: None,
2578 }]
2579}
2580
2581fn transcript_has_pending_input(transcript: &[Item]) -> bool {
2587 matches!(
2588 transcript.last().map(|item| item.kind),
2589 Some(ItemKind::User | ItemKind::Tool | ItemKind::Notification)
2590 )
2591}
2592
2593fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2594 let mut calls = Vec::new();
2595 for item in items {
2596 for part in &item.parts {
2597 if let Part::ToolCall(call) = part {
2598 calls.push(call.clone());
2599 }
2600 }
2601 }
2602 calls
2603}
2604
2605fn tool_result_is_error(item: &Item) -> bool {
2606 item.parts
2607 .iter()
2608 .any(|part| matches!(part, Part::ToolResult(result) if result.is_error))
2609}
2610
/// Errors produced by the loop driver.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The driver was asked to do something its current state does not allow,
    /// for example submitting input while an interrupt is pending.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The turn was cancelled.
    #[error("turn cancelled")]
    Cancelled,
    /// The model provider misbehaved, for example a turn ended without a
    /// `Finished` event.
    #[error("provider error: {0}")]
    Provider(String),
    /// A tool-layer failure; converts from [`ToolError`] via `From`.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// A mutator-related failure; also used to report transcript invariant
    /// violations (unpaired tool calls or tool results).
    #[error("mutator error: {0}")]
    Mutator(String),
    /// The requested operation is not supported.
    // NOTE(review): no use of this variant is visible in this part of the file.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2633
/// Borrowed view over a slice of loop observers that can emit events.
struct DriverEmitter<'a> {
    /// Observers that receive each emitted event.
    observers: &'a [Arc<dyn LoopObserver>],
}
2641
2642impl<'a> EventEmitter for DriverEmitter<'a> {
2643 fn emit(&self, event: AgentEvent) {
2644 for observer in self.observers {
2645 observer.handle_event(event.clone());
2646 }
2647 }
2648}
2649
2650fn validate_transcript_invariants(transcript: &[Item]) -> Result<(), LoopError> {
2655 let mut pending: HashSet<ToolCallId> = HashSet::new();
2656 let mut seen_calls: HashSet<ToolCallId> = HashSet::new();
2657 let mut seen_results: HashSet<ToolCallId> = HashSet::new();
2658 for item in transcript {
2659 for part in &item.parts {
2660 match part {
2661 Part::ToolCall(call) => {
2662 if !seen_calls.insert(call.id.clone()) {
2663 return Err(LoopError::Mutator(format!(
2664 "transcript invariant violation: duplicate tool_use: {}",
2665 call.id.0
2666 )));
2667 }
2668 pending.insert(call.id.clone());
2669 }
2670 Part::ToolResult(result) => {
2671 if !pending.remove(&result.call_id) {
2672 let kind = if seen_results.contains(&result.call_id) {
2673 "duplicate"
2674 } else {
2675 "orphaned"
2676 };
2677 return Err(LoopError::Mutator(format!(
2678 "transcript invariant violation: {kind} tool_result: {}",
2679 result.call_id.0
2680 )));
2681 }
2682 seen_results.insert(result.call_id.clone());
2683 }
2684 _ => {}
2685 }
2686 }
2687 }
2688 if !pending.is_empty() {
2689 let missing: Vec<String> = pending.into_iter().map(|id| id.0).collect();
2690 return Err(LoopError::Mutator(format!(
2691 "transcript invariant violation: tool_use(s) without matching tool_result: {}",
2692 missing.join(", ")
2693 )));
2694 }
2695 Ok(())
2696}
2697
2698#[cfg(test)]
2699mod tests {
2700 use std::collections::VecDeque;
2701 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2702 use std::sync::{Arc as StdArc, Mutex as StdMutex};
2703
2704 use agentkit_core::{
2705 CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolCallPart, ToolOutput,
2706 ToolResultPart,
2707 };
2708 use agentkit_task_manager::{
2709 AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2710 TaskRoutingPolicy,
2711 };
2712 use agentkit_tools_core::{
2713 FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2714 ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2715 ToolResult, ToolSpec,
2716 };
2717 use serde_json::{Value, json};
2718 use tokio::sync::Notify;
2719 use tokio::time::{Duration, timeout};
2720
2721 use super::*;
2722
    // Test model adapters. Each one starts the session type of the same
    // prefix (`FakeAdapter` -> `FakeSession`, and so on).

    /// Adapter for the single-tool-call script.
    struct FakeAdapter;
    /// Adapter whose session can block until cancellation.
    struct SlowAdapter;
    /// Adapter that shares its recorders with the sessions it starts.
    struct RecordingAdapter {
        /// Tool descriptions seen by each `begin_turn` call.
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        /// Cache request seen by each `begin_turn` call.
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    /// Adapter for the foreground/background tool script.
    struct MultiToolAdapter;
    /// Adapter for the two-tool-call script.
    struct DualApprovalAdapter;
2731
    // Test model sessions; their `ModelSession` impls script the turns.

    /// Requests one tool call, then echoes the tool result text.
    struct FakeSession;
    /// Blocks in `begin_turn` until cancelled for the "do the long task" prompt.
    struct SlowSession;
    /// Records what each `begin_turn` request contained.
    struct RecordingSession {
        /// Tool descriptions per request, shared with the test.
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        /// Cache request per request, shared with the test.
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    /// Requests a `foreground-wait` and a `background-wait` tool call.
    struct MultiToolSession;
    /// Requests two `echo` tool calls until two tool results exist.
    struct DualApprovalSession;
2740
    /// Turn that yields its queued events front to back.
    struct FakeTurn {
        /// Events still to be returned by `next_event`.
        events: VecDeque<ModelTurnEvent>,
    }

    /// Turn that yields one `Finished` event unless cancelled.
    struct SlowTurn {
        /// Set once the `Finished` event has been returned.
        emitted: bool,
    }

    /// Turn started by `RecordingSession`.
    struct RecordingTurn {
        // NOTE(review): presumably marks that the single event was returned,
        // as in `SlowTurn` — confirm against the `ModelTurn` impl.
        emitted: bool,
    }
    /// Turn started by `MultiToolSession`.
    struct MultiToolTurn {
        /// Scripted events built by `MultiToolSession::begin_turn`.
        events: VecDeque<ModelTurnEvent>,
    }
    /// Turn started by `DualApprovalSession`.
    struct DualApprovalTurn {
        /// Scripted events built by `DualApprovalSession::begin_turn`.
        events: VecDeque<ModelTurnEvent>,
    }
2758
    /// Starts a `FakeSession`; the session config is ignored.
    #[async_trait]
    impl ModelAdapter for FakeAdapter {
        type Session = FakeSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(FakeSession)
        }
    }
2767
    /// Starts a `SlowSession`; the session config is ignored.
    #[async_trait]
    impl ModelAdapter for SlowAdapter {
        type Session = SlowSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(SlowSession)
        }
    }
2776
    /// Starts a `RecordingSession` that shares this adapter's recorders; the
    /// session config is ignored.
    #[async_trait]
    impl ModelAdapter for RecordingAdapter {
        type Session = RecordingSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            // Cloning the `Arc`s lets the test inspect what the session records.
            Ok(RecordingSession {
                seen_descriptions: self.seen_descriptions.clone(),
                seen_caches: self.seen_caches.clone(),
            })
        }
    }
2788
    /// Starts a `MultiToolSession`; the session config is ignored.
    #[async_trait]
    impl ModelAdapter for MultiToolAdapter {
        type Session = MultiToolSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(MultiToolSession)
        }
    }
2797
    /// Starts a `DualApprovalSession`; the session config is ignored.
    #[async_trait]
    impl ModelAdapter for DualApprovalAdapter {
        type Session = DualApprovalSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(DualApprovalSession)
        }
    }
2806
2807 #[async_trait]
2808 impl ModelSession for FakeSession {
2809 type Turn = FakeTurn;
2810
2811 async fn begin_turn(
2812 &mut self,
2813 request: TurnRequest,
2814 _cancellation: Option<TurnCancellation>,
2815 ) -> Result<Self::Turn, LoopError> {
2816 let has_tool_result = request.transcript.iter().any(|item| {
2817 item.kind == ItemKind::Tool
2818 && item
2819 .parts
2820 .iter()
2821 .any(|part| matches!(part, Part::ToolResult(_)))
2822 });
2823 let tool_name = request
2824 .available_tools
2825 .first()
2826 .map(|tool| tool.name.0.clone())
2827 .unwrap_or_else(|| "echo".into());
2828
2829 let events = if has_tool_result {
2830 let result_text = request
2831 .transcript
2832 .iter()
2833 .rev()
2834 .find_map(|item| {
2835 item.parts.iter().find_map(|part| match part {
2836 Part::ToolResult(ToolResultPart {
2837 output: ToolOutput::Text(text),
2838 ..
2839 }) => Some(text.clone()),
2840 _ => None,
2841 })
2842 })
2843 .unwrap_or_else(|| "missing".into());
2844
2845 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2846 model: None,
2847 response_id: None,
2848 finish_reason: FinishReason::Completed,
2849 output_items: vec![Item {
2850 id: None,
2851 kind: ItemKind::Assistant,
2852 parts: vec![Part::Text(TextPart {
2853 text: format!("tool said: {result_text}"),
2854 metadata: MetadataMap::new(),
2855 })],
2856 metadata: MetadataMap::new(),
2857 usage: None,
2858 finish_reason: None,
2859 created_at: None,
2860 }],
2861 usage: None,
2862 metadata: MetadataMap::new(),
2863 })])
2864 } else {
2865 VecDeque::from([
2866 ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2867 id: ToolCallId::new("call-1"),
2868 name: tool_name.clone(),
2869 input: json!({ "value": "pong" }),
2870 metadata: MetadataMap::new(),
2871 }),
2872 ModelTurnEvent::Finished(ModelTurnResult {
2873 model: None,
2874 response_id: None,
2875 finish_reason: FinishReason::ToolCall,
2876 output_items: vec![Item {
2877 id: None,
2878 kind: ItemKind::Assistant,
2879 parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2880 id: ToolCallId::new("call-1"),
2881 name: tool_name,
2882 input: json!({ "value": "pong" }),
2883 metadata: MetadataMap::new(),
2884 })],
2885 metadata: MetadataMap::new(),
2886 usage: None,
2887 finish_reason: None,
2888 created_at: None,
2889 }],
2890 usage: None,
2891 metadata: MetadataMap::new(),
2892 }),
2893 ])
2894 };
2895
2896 Ok(FakeTurn { events })
2897 }
2898 }
2899
2900 #[async_trait]
2901 impl ModelSession for SlowSession {
2902 type Turn = SlowTurn;
2903
2904 async fn begin_turn(
2905 &mut self,
2906 request: TurnRequest,
2907 cancellation: Option<TurnCancellation>,
2908 ) -> Result<Self::Turn, LoopError> {
2909 let should_block = request
2910 .transcript
2911 .iter()
2912 .rev()
2913 .find(|item| item.kind == ItemKind::User)
2914 .is_some_and(|item| {
2915 item.parts.iter().any(|part| match part {
2916 Part::Text(text) => text.text == "do the long task",
2917 _ => false,
2918 })
2919 });
2920
2921 if should_block && let Some(cancellation) = cancellation {
2922 cancellation.cancelled().await;
2923 return Err(LoopError::Cancelled);
2924 }
2925
2926 Ok(SlowTurn { emitted: false })
2927 }
2928 }
2929
2930 #[async_trait]
2931 impl ModelSession for RecordingSession {
2932 type Turn = RecordingTurn;
2933
2934 async fn begin_turn(
2935 &mut self,
2936 request: TurnRequest,
2937 _cancellation: Option<TurnCancellation>,
2938 ) -> Result<Self::Turn, LoopError> {
2939 let descriptions = request
2940 .available_tools
2941 .iter()
2942 .map(|tool| tool.description.clone())
2943 .collect::<Vec<_>>();
2944 self.seen_descriptions.lock().unwrap().push(descriptions);
2945 self.seen_caches.lock().unwrap().push(request.cache.clone());
2946
2947 Ok(RecordingTurn { emitted: false })
2948 }
2949 }
2950
2951 #[async_trait]
2952 impl ModelSession for MultiToolSession {
2953 type Turn = MultiToolTurn;
2954
2955 async fn begin_turn(
2956 &mut self,
2957 request: TurnRequest,
2958 _cancellation: Option<TurnCancellation>,
2959 ) -> Result<Self::Turn, LoopError> {
2960 let has_tool_result = request.transcript.iter().any(|item| {
2961 item.kind == ItemKind::Tool
2962 && item
2963 .parts
2964 .iter()
2965 .any(|part| matches!(part, Part::ToolResult(_)))
2966 });
2967
2968 let events = if has_tool_result {
2969 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2970 model: None,
2971 response_id: None,
2972 finish_reason: FinishReason::Completed,
2973 output_items: vec![Item {
2974 id: None,
2975 kind: ItemKind::Assistant,
2976 parts: vec![Part::Text(TextPart {
2977 text: "mixed tools finished".into(),
2978 metadata: MetadataMap::new(),
2979 })],
2980 metadata: MetadataMap::new(),
2981 usage: None,
2982 finish_reason: None,
2983 created_at: None,
2984 }],
2985 usage: None,
2986 metadata: MetadataMap::new(),
2987 })])
2988 } else {
2989 let foreground = agentkit_core::ToolCallPart {
2990 id: ToolCallId::new("call-foreground"),
2991 name: "foreground-wait".into(),
2992 input: json!({}),
2993 metadata: MetadataMap::new(),
2994 };
2995 let background = agentkit_core::ToolCallPart {
2996 id: ToolCallId::new("call-background"),
2997 name: "background-wait".into(),
2998 input: json!({}),
2999 metadata: MetadataMap::new(),
3000 };
3001 VecDeque::from([
3002 ModelTurnEvent::ToolCall(foreground.clone()),
3003 ModelTurnEvent::ToolCall(background.clone()),
3004 ModelTurnEvent::Finished(ModelTurnResult {
3005 model: None,
3006 response_id: None,
3007 finish_reason: FinishReason::ToolCall,
3008 output_items: vec![Item {
3009 id: None,
3010 kind: ItemKind::Assistant,
3011 parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
3012 metadata: MetadataMap::new(),
3013 usage: None,
3014 finish_reason: None,
3015 created_at: None,
3016 }],
3017 usage: None,
3018 metadata: MetadataMap::new(),
3019 }),
3020 ])
3021 };
3022
3023 Ok(MultiToolTurn { events })
3024 }
3025 }
3026
3027 #[async_trait]
3028 impl ModelSession for DualApprovalSession {
3029 type Turn = DualApprovalTurn;
3030
3031 async fn begin_turn(
3032 &mut self,
3033 request: TurnRequest,
3034 _cancellation: Option<TurnCancellation>,
3035 ) -> Result<Self::Turn, LoopError> {
3036 let tool_results = request
3037 .transcript
3038 .iter()
3039 .flat_map(|item| item.parts.iter())
3040 .filter(|part| matches!(part, Part::ToolResult(_)))
3041 .count();
3042
3043 let events = if tool_results >= 2 {
3044 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
3045 model: None,
3046 response_id: None,
3047 finish_reason: FinishReason::Completed,
3048 output_items: vec![Item {
3049 id: None,
3050 kind: ItemKind::Assistant,
3051 parts: vec![Part::Text(TextPart {
3052 text: "both approvals finished".into(),
3053 metadata: MetadataMap::new(),
3054 })],
3055 metadata: MetadataMap::new(),
3056 usage: None,
3057 finish_reason: None,
3058 created_at: None,
3059 }],
3060 usage: None,
3061 metadata: MetadataMap::new(),
3062 })])
3063 } else {
3064 let first = agentkit_core::ToolCallPart {
3065 id: ToolCallId::new("call-1"),
3066 name: "echo".into(),
3067 input: json!({ "value": "first" }),
3068 metadata: MetadataMap::new(),
3069 };
3070 let second = agentkit_core::ToolCallPart {
3071 id: ToolCallId::new("call-2"),
3072 name: "echo".into(),
3073 input: json!({ "value": "second" }),
3074 metadata: MetadataMap::new(),
3075 };
3076 VecDeque::from([
3077 ModelTurnEvent::ToolCall(first.clone()),
3078 ModelTurnEvent::ToolCall(second.clone()),
3079 ModelTurnEvent::Finished(ModelTurnResult {
3080 model: None,
3081 response_id: None,
3082 finish_reason: FinishReason::ToolCall,
3083 output_items: vec![Item {
3084 id: None,
3085 kind: ItemKind::Assistant,
3086 parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
3087 metadata: MetadataMap::new(),
3088 usage: None,
3089 finish_reason: None,
3090 created_at: None,
3091 }],
3092 usage: None,
3093 metadata: MetadataMap::new(),
3094 }),
3095 ])
3096 };
3097
3098 Ok(DualApprovalTurn { events })
3099 }
3100 }
3101
3102 #[async_trait]
3103 impl ModelTurn for FakeTurn {
3104 async fn next_event(
3105 &mut self,
3106 _cancellation: Option<TurnCancellation>,
3107 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3108 Ok(self.events.pop_front())
3109 }
3110 }
3111
3112 #[async_trait]
3113 impl ModelTurn for SlowTurn {
3114 async fn next_event(
3115 &mut self,
3116 cancellation: Option<TurnCancellation>,
3117 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3118 if let Some(cancellation) = cancellation
3119 && cancellation.is_cancelled()
3120 {
3121 return Err(LoopError::Cancelled);
3122 }
3123
3124 if self.emitted {
3125 Ok(None)
3126 } else {
3127 self.emitted = true;
3128 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3129 model: None,
3130 response_id: None,
3131 finish_reason: FinishReason::Completed,
3132 output_items: vec![Item {
3133 id: None,
3134 kind: ItemKind::Assistant,
3135 parts: vec![Part::Text(TextPart {
3136 text: "done".into(),
3137 metadata: MetadataMap::new(),
3138 })],
3139 metadata: MetadataMap::new(),
3140 usage: None,
3141 finish_reason: None,
3142 created_at: None,
3143 }],
3144 usage: None,
3145 metadata: MetadataMap::new(),
3146 })))
3147 }
3148 }
3149 }
3150
3151 #[async_trait]
3152 impl ModelTurn for RecordingTurn {
3153 async fn next_event(
3154 &mut self,
3155 _cancellation: Option<TurnCancellation>,
3156 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3157 if self.emitted {
3158 Ok(None)
3159 } else {
3160 self.emitted = true;
3161 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3162 model: None,
3163 response_id: None,
3164 finish_reason: FinishReason::Completed,
3165 output_items: vec![Item {
3166 id: None,
3167 kind: ItemKind::Assistant,
3168 parts: vec![Part::Text(TextPart {
3169 text: "done".into(),
3170 metadata: MetadataMap::new(),
3171 })],
3172 metadata: MetadataMap::new(),
3173 usage: None,
3174 finish_reason: None,
3175 created_at: None,
3176 }],
3177 usage: None,
3178 metadata: MetadataMap::new(),
3179 })))
3180 }
3181 }
3182 }
3183
3184 #[async_trait]
3185 impl ModelTurn for MultiToolTurn {
3186 async fn next_event(
3187 &mut self,
3188 _cancellation: Option<TurnCancellation>,
3189 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3190 Ok(self.events.pop_front())
3191 }
3192 }
3193
3194 #[async_trait]
3195 impl ModelTurn for DualApprovalTurn {
3196 async fn next_event(
3197 &mut self,
3198 _cancellation: Option<TurnCancellation>,
3199 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3200 Ok(self.events.pop_front())
3201 }
3202 }
3203
3204 #[derive(Clone)]
3205 struct EchoTool {
3206 spec: ToolSpec,
3207 }
3208
3209 impl Default for EchoTool {
3210 fn default() -> Self {
3211 Self {
3212 spec: ToolSpec {
3213 name: ToolName::new("echo"),
3214 description: "Echo back a value".into(),
3215 input_schema: json!({
3216 "type": "object",
3217 "properties": {
3218 "value": { "type": "string" }
3219 },
3220 "required": ["value"],
3221 "additionalProperties": false
3222 }),
3223 output_schema: None,
3224 annotations: ToolAnnotations::default(),
3225 metadata: MetadataMap::new(),
3226 },
3227 }
3228 }
3229 }
3230
3231 #[derive(Clone)]
3232 struct DynamicSpecTool {
3233 spec: ToolSpec,
3234 version: StdArc<AtomicUsize>,
3235 }
3236
3237 impl DynamicSpecTool {
3238 fn new(version: StdArc<AtomicUsize>) -> Self {
3239 Self {
3240 spec: ToolSpec {
3241 name: ToolName::new("dynamic"),
3242 description: "dynamic version 0".into(),
3243 input_schema: json!({
3244 "type": "object",
3245 "properties": {},
3246 "additionalProperties": false
3247 }),
3248 output_schema: None,
3249 annotations: ToolAnnotations::default(),
3250 metadata: MetadataMap::new(),
3251 },
3252 version,
3253 }
3254 }
3255 }
3256
3257 #[async_trait]
3258 impl Tool for EchoTool {
3259 fn spec(&self) -> &ToolSpec {
3260 &self.spec
3261 }
3262
3263 fn proposed_requests(
3264 &self,
3265 request: &agentkit_tools_core::ToolRequest,
3266 ) -> Result<
3267 Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
3268 agentkit_tools_core::ToolError,
3269 > {
3270 Ok(vec![Box::new(FileSystemPermissionRequest::Read {
3271 path: "/tmp/echo".into(),
3272 metadata: request.metadata.clone(),
3273 })])
3274 }
3275
3276 async fn invoke(
3277 &self,
3278 request: agentkit_tools_core::ToolRequest,
3279 _ctx: &mut ToolContext<'_>,
3280 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3281 let value = request
3282 .input
3283 .get("value")
3284 .and_then(Value::as_str)
3285 .ok_or_else(|| {
3286 agentkit_tools_core::ToolError::InvalidInput("missing value".into())
3287 })?;
3288
3289 Ok(ToolResult {
3290 result: ToolResultPart {
3291 call_id: request.call_id,
3292 output: ToolOutput::Text(value.into()),
3293 is_error: false,
3294 metadata: MetadataMap::new(),
3295 },
3296 duration: None,
3297 metadata: MetadataMap::new(),
3298 })
3299 }
3300 }
3301
3302 #[async_trait]
3303 impl Tool for DynamicSpecTool {
3304 fn spec(&self) -> &ToolSpec {
3305 &self.spec
3306 }
3307
3308 fn current_spec(&self) -> Option<ToolSpec> {
3309 let mut spec = self.spec.clone();
3310 spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
3311 Some(spec)
3312 }
3313
3314 async fn invoke(
3315 &self,
3316 request: agentkit_tools_core::ToolRequest,
3317 _ctx: &mut ToolContext<'_>,
3318 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3319 Ok(ToolResult {
3320 result: ToolResultPart {
3321 call_id: request.call_id,
3322 output: ToolOutput::Text("ok".into()),
3323 is_error: false,
3324 metadata: MetadataMap::new(),
3325 },
3326 duration: None,
3327 metadata: MetadataMap::new(),
3328 })
3329 }
3330 }
3331
3332 struct DenyFsReads;
3333
3334 impl PermissionChecker for DenyFsReads {
3335 fn evaluate(
3336 &self,
3337 request: &dyn agentkit_tools_core::PermissionRequest,
3338 ) -> PermissionDecision {
3339 if request.kind() == "filesystem.read" {
3340 return PermissionDecision::Deny(PermissionDenial {
3341 code: PermissionCode::PathNotAllowed,
3342 message: "reads denied in test".into(),
3343 metadata: MetadataMap::new(),
3344 });
3345 }
3346
3347 PermissionDecision::Allow
3348 }
3349 }
3350
3351 struct ApproveFsReads;
3352
3353 impl PermissionChecker for ApproveFsReads {
3354 fn evaluate(
3355 &self,
3356 request: &dyn agentkit_tools_core::PermissionRequest,
3357 ) -> PermissionDecision {
3358 if request.kind() == "filesystem.read" {
3359 return PermissionDecision::RequireApproval(ApprovalRequest {
3360 task_id: None,
3361 call_id: None,
3362 id: "approval:fs-read".into(),
3363 request_kind: request.kind().into(),
3364 reason: agentkit_tools_core::ApprovalReason::SensitivePath,
3365 summary: request.summary(),
3366 metadata: request.metadata().clone(),
3367 });
3368 }
3369
3370 PermissionDecision::Allow
3371 }
3372 }
3373
3374 struct KeepRecentMutator {
3375 keep: usize,
3376 }
3377
3378 #[async_trait]
3379 impl LoopMutator for KeepRecentMutator {
3380 async fn mutate(
3381 &self,
3382 cursor: &mut TranscriptCursor<'_>,
3383 ctx: LoopCtx<'_>,
3384 ) -> Result<(), LoopError> {
3385 if cursor.len() < 2 {
3386 return Ok(());
3387 }
3388 let drop = cursor.len().saturating_sub(self.keep);
3389 ctx.emitter.emit(AgentEvent::MutationStarted {
3390 session_id: ctx.session_id.clone(),
3391 turn_id: ctx.turn_id.cloned(),
3392 mutator: "keep-recent".into(),
3393 point: ctx.point,
3394 });
3395 cursor.drain(..drop);
3396 ctx.emitter.emit(AgentEvent::MutationFinished {
3397 session_id: ctx.session_id.clone(),
3398 turn_id: ctx.turn_id.cloned(),
3399 mutator: "keep-recent".into(),
3400 dirty: true,
3401 metadata: MetadataMap::new(),
3402 });
3403 Ok(())
3404 }
3405 }
3406
3407 struct PointRecordingMutator {
3410 points: StdArc<StdMutex<Vec<MutationPoint>>>,
3411 }
3412
3413 #[async_trait]
3414 impl LoopMutator for PointRecordingMutator {
3415 async fn mutate(
3416 &self,
3417 _cursor: &mut TranscriptCursor<'_>,
3418 ctx: LoopCtx<'_>,
3419 ) -> Result<(), LoopError> {
3420 self.points.lock().unwrap().push(ctx.point);
3421 Ok(())
3422 }
3423 }
3424
3425 struct RecordingObserver {
3426 events: StdArc<StdMutex<Vec<AgentEvent>>>,
3427 }
3428
3429 impl LoopObserver for RecordingObserver {
3430 fn handle_event(&self, event: AgentEvent) {
3431 self.events.lock().unwrap().push(event);
3432 }
3433 }
3434
3435 struct CatalogExecutor {
3436 version: AtomicUsize,
3437 events: StdMutex<Vec<ToolCatalogEvent>>,
3438 }
3439
3440 impl CatalogExecutor {
3441 fn new() -> Self {
3442 Self {
3443 version: AtomicUsize::new(0),
3444 events: StdMutex::new(Vec::new()),
3445 }
3446 }
3447
3448 fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3449 self.version.store(version, Ordering::SeqCst);
3450 self.events.lock().unwrap().push(event);
3451 }
3452 }
3453
3454 #[async_trait]
3455 impl ToolExecutor for CatalogExecutor {
3456 fn specs(&self) -> Vec<ToolSpec> {
3457 vec![ToolSpec {
3458 name: ToolName::new("dynamic"),
3459 description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3460 input_schema: json!({
3461 "type": "object",
3462 "properties": {},
3463 "additionalProperties": false
3464 }),
3465 output_schema: None,
3466 annotations: ToolAnnotations::default(),
3467 metadata: MetadataMap::new(),
3468 }]
3469 }
3470
3471 fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3472 std::mem::take(&mut *self.events.lock().unwrap())
3473 }
3474
3475 async fn execute(
3476 &self,
3477 request: ToolRequest,
3478 _ctx: &mut ToolContext<'_>,
3479 ) -> ToolExecutionOutcome {
3480 ToolExecutionOutcome::Completed(ToolResult {
3481 result: ToolResultPart {
3482 call_id: request.call_id,
3483 output: ToolOutput::Text("dynamic-ok".into()),
3484 is_error: false,
3485 metadata: MetadataMap::new(),
3486 },
3487 duration: None,
3488 metadata: MetadataMap::new(),
3489 })
3490 }
3491 }
3492
3493 #[derive(Clone)]
3494 struct BlockingTool {
3495 spec: ToolSpec,
3496 entered: StdArc<AtomicBool>,
3497 release: StdArc<Notify>,
3498 output: &'static str,
3499 }
3500
3501 impl BlockingTool {
3502 fn new(
3503 name: &str,
3504 entered: StdArc<AtomicBool>,
3505 release: StdArc<Notify>,
3506 output: &'static str,
3507 ) -> Self {
3508 Self {
3509 spec: ToolSpec {
3510 name: ToolName::new(name),
3511 description: format!("blocking tool {name}"),
3512 input_schema: json!({
3513 "type": "object",
3514 "properties": {},
3515 "additionalProperties": false
3516 }),
3517 output_schema: None,
3518 annotations: ToolAnnotations::default(),
3519 metadata: MetadataMap::new(),
3520 },
3521 entered,
3522 release,
3523 output,
3524 }
3525 }
3526 }
3527
3528 #[async_trait]
3529 impl Tool for BlockingTool {
3530 fn spec(&self) -> &ToolSpec {
3531 &self.spec
3532 }
3533
3534 async fn invoke(
3535 &self,
3536 request: agentkit_tools_core::ToolRequest,
3537 _ctx: &mut ToolContext<'_>,
3538 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3539 self.entered.store(true, Ordering::SeqCst);
3540 self.release.notified().await;
3541 Ok(ToolResult {
3542 result: ToolResultPart {
3543 call_id: request.call_id,
3544 output: ToolOutput::Text(self.output.into()),
3545 is_error: false,
3546 metadata: MetadataMap::new(),
3547 },
3548 duration: None,
3549 metadata: MetadataMap::new(),
3550 })
3551 }
3552 }
3553
3554 struct NameRoutingPolicy {
3555 routes: Vec<(String, RoutingDecision)>,
3556 }
3557
3558 impl NameRoutingPolicy {
3559 fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3560 Self {
3561 routes: routes
3562 .into_iter()
3563 .map(|(name, decision)| (name.into(), decision))
3564 .collect(),
3565 }
3566 }
3567 }
3568
3569 impl TaskRoutingPolicy for NameRoutingPolicy {
3570 fn route(&self, request: &ToolRequest) -> RoutingDecision {
3571 self.routes
3572 .iter()
3573 .find(|(name, _)| name == &request.tool_name.0)
3574 .map(|(_, decision)| *decision)
3575 .unwrap_or(RoutingDecision::Foreground)
3576 }
3577 }
3578
3579 async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3580 timeout(Duration::from_secs(1), handle.next_event())
3581 .await
3582 .expect("timed out waiting for task event")
3583 .expect("task event stream ended unexpectedly")
3584 }
3585
3586 async fn wait_until_entered(flag: &AtomicBool) {
3587 timeout(Duration::from_secs(1), async {
3588 while !flag.load(Ordering::SeqCst) {
3589 tokio::task::yield_now().await;
3590 }
3591 })
3592 .await
3593 .expect("task never entered execution");
3594 }
3595
3596 #[tokio::test]
3597 async fn loop_continues_after_completed_tool_call() {
3598 let tools = ToolRegistry::new().with(EchoTool::default());
3599 let agent = Agent::builder()
3600 .model(FakeAdapter)
3601 .add_tool_source(tools)
3602 .permissions(AllowAllPermissions)
3603 .build()
3604 .unwrap();
3605
3606 let mut driver = agent
3607 .start(SessionConfig {
3608 session_id: SessionId::new("session-1"),
3609 metadata: MetadataMap::new(),
3610 cache: None,
3611 })
3612 .await
3613 .unwrap();
3614
3615 driver
3616 .submit_input(vec![Item {
3617 id: None,
3618 kind: ItemKind::User,
3619 parts: vec![Part::Text(TextPart {
3620 text: "ping".into(),
3621 metadata: MetadataMap::new(),
3622 })],
3623 metadata: MetadataMap::new(),
3624 usage: None,
3625 finish_reason: None,
3626 created_at: None,
3627 }])
3628 .unwrap();
3629
3630 let result = run_until_finished(&mut driver).await;
3631
3632 match result {
3633 LoopStep::Finished(turn) => {
3634 assert_eq!(turn.finish_reason, FinishReason::Completed);
3635 assert_eq!(turn.items.len(), 1);
3636 match &turn.items[0].parts[0] {
3637 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3638 other => panic!("unexpected part: {other:?}"),
3639 }
3640 }
3641 other => panic!("unexpected loop step: {other:?}"),
3642 }
3643 }
3644
3645 async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3649 loop {
3650 match driver.next().await.unwrap() {
3651 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3652 step => return step,
3653 }
3654 }
3655 }
3656
3657 #[tokio::test]
3664 async fn post_tool_continuation_reports_after_tool_result_mutation_point() {
3665 let points = StdArc::new(StdMutex::new(Vec::<MutationPoint>::new()));
3666 let tools = ToolRegistry::new().with(EchoTool::default());
3667 let agent = Agent::builder()
3668 .model(FakeAdapter)
3669 .add_tool_source(tools)
3670 .permissions(AllowAllPermissions)
3671 .mutator(PointRecordingMutator {
3672 points: points.clone(),
3673 })
3674 .build()
3675 .unwrap();
3676
3677 let mut driver = agent
3678 .start(SessionConfig {
3679 session_id: SessionId::new("session-mutation-point"),
3680 metadata: MetadataMap::new(),
3681 cache: None,
3682 })
3683 .await
3684 .unwrap();
3685
3686 driver
3687 .submit_input(vec![Item::text(ItemKind::User, "ping")])
3688 .unwrap();
3689
3690 let _ = run_until_finished(&mut driver).await;
3692
3693 let recorded = points.lock().unwrap().clone();
3694 assert_eq!(
3695 recorded.first(),
3696 Some(&MutationPoint::AfterTurnEnded),
3697 "first drive of a fresh turn must report AfterTurnEnded, got {recorded:?}"
3698 );
3699 assert!(
3700 recorded.contains(&MutationPoint::AfterToolResult),
3701 "post-tool continuation must report AfterToolResult, got {recorded:?}"
3702 );
3703 }
3704
/// `transcript_has_pending_input` must be false for an empty transcript and
/// for system, developer, context and assistant tails, and true for user,
/// notification and tool-result tails.
#[test]
fn pending_input_requires_input_bearing_tail_role() {
    assert!(!transcript_has_pending_input(&[]));

    // Tail kinds that must not count as pending input.
    let inert_tails = [
        (ItemKind::System, "system"),
        (ItemKind::Developer, "developer"),
        (ItemKind::Context, "context"),
        (ItemKind::Assistant, "assistant"),
    ];
    for (kind, text) in inert_tails {
        assert!(!transcript_has_pending_input(&[Item::text(kind, text)]));
    }

    // Tails that must count as pending input.
    let user = Item::text(ItemKind::User, "user");
    assert!(transcript_has_pending_input(&[user]));

    let notification = Item::notification("background update");
    assert!(transcript_has_pending_input(&[notification]));

    let tool_result = Item {
        id: None,
        kind: ItemKind::Tool,
        parts: vec![Part::ToolResult(ToolResultPart {
            call_id: ToolCallId::new("call-test"),
            output: ToolOutput::Text("ok".into()),
            is_error: false,
            metadata: MetadataMap::new(),
        })],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    };
    assert!(transcript_has_pending_input(&[tool_result]));
}
3747
3748 struct DropTrailingUserMutator;
3754
3755 #[async_trait]
3756 impl LoopMutator for DropTrailingUserMutator {
3757 async fn mutate(
3758 &self,
3759 cursor: &mut TranscriptCursor<'_>,
3760 _ctx: LoopCtx<'_>,
3761 ) -> Result<(), LoopError> {
3762 if cursor.last().map(|item| item.kind) == Some(ItemKind::User) {
3763 cursor.pop();
3764 }
3765 Ok(())
3766 }
3767 }
3768
3769 struct RejectAssistantPrefillAdapter {
3774 saw_assistant_tail: StdArc<AtomicBool>,
3775 }
3776
3777 struct RejectAssistantPrefillSession {
3778 saw_assistant_tail: StdArc<AtomicBool>,
3779 }
3780
3781 #[async_trait]
3782 impl ModelAdapter for RejectAssistantPrefillAdapter {
3783 type Session = RejectAssistantPrefillSession;
3784
3785 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
3786 Ok(RejectAssistantPrefillSession {
3787 saw_assistant_tail: self.saw_assistant_tail.clone(),
3788 })
3789 }
3790 }
3791
3792 #[async_trait]
3793 impl ModelSession for RejectAssistantPrefillSession {
3794 type Turn = FakeTurn;
3795
3796 async fn begin_turn(
3797 &mut self,
3798 request: TurnRequest,
3799 _cancellation: Option<TurnCancellation>,
3800 ) -> Result<Self::Turn, LoopError> {
3801 if request.transcript.last().map(|item| item.kind) == Some(ItemKind::Assistant) {
3802 self.saw_assistant_tail.store(true, Ordering::SeqCst);
3803 return Err(LoopError::Provider(
3804 "conversation must end with a user message".into(),
3805 ));
3806 }
3807 Ok(FakeTurn {
3808 events: VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
3809 model: None,
3810 response_id: None,
3811 finish_reason: FinishReason::Completed,
3812 output_items: vec![Item::text(ItemKind::Assistant, "ok")],
3813 usage: None,
3814 metadata: MetadataMap::new(),
3815 })]),
3816 })
3817 }
3818 }
3819
3820 #[tokio::test]
3828 async fn drive_does_not_dispatch_without_valid_trailing_input() {
3829 let saw_assistant_tail = StdArc::new(AtomicBool::new(false));
3830 let agent = Agent::builder()
3831 .model(RejectAssistantPrefillAdapter {
3832 saw_assistant_tail: saw_assistant_tail.clone(),
3833 })
3834 .mutator(DropTrailingUserMutator)
3835 .transcript(vec![
3838 Item::text(ItemKind::User, "kickoff"),
3839 Item::text(ItemKind::Assistant, "prior reply"),
3840 ])
3841 .build()
3842 .unwrap();
3843
3844 let mut driver = agent
3845 .start(SessionConfig {
3846 session_id: SessionId::new("session-no-valid-input"),
3847 metadata: MetadataMap::new(),
3848 cache: None,
3849 })
3850 .await
3851 .unwrap();
3852
3853 driver
3854 .submit_input(vec![Item::text(ItemKind::User, "follow up")])
3855 .unwrap();
3856
3857 let outcome = driver.next().await;
3859
3860 assert!(
3861 !saw_assistant_tail.load(Ordering::SeqCst),
3862 "loop dispatched a model turn whose transcript ends in an assistant \
3863 message (outcome: {outcome:?}); with no valid trailing input the turn \
3864 must finish instead of driving"
3865 );
3866 }
3867
3868 #[tokio::test]
3869 async fn loop_uses_injected_permission_checker() {
3870 let tools = ToolRegistry::new().with(EchoTool::default());
3871 let agent = Agent::builder()
3872 .model(FakeAdapter)
3873 .add_tool_source(tools)
3874 .permissions(DenyFsReads)
3875 .build()
3876 .unwrap();
3877
3878 let mut driver = agent
3879 .start(SessionConfig {
3880 session_id: SessionId::new("session-2"),
3881 metadata: MetadataMap::new(),
3882 cache: None,
3883 })
3884 .await
3885 .unwrap();
3886
3887 driver
3888 .submit_input(vec![Item {
3889 id: None,
3890 kind: ItemKind::User,
3891 parts: vec![Part::Text(TextPart {
3892 text: "ping".into(),
3893 metadata: MetadataMap::new(),
3894 })],
3895 metadata: MetadataMap::new(),
3896 usage: None,
3897 finish_reason: None,
3898 created_at: None,
3899 }])
3900 .unwrap();
3901
3902 let result = run_until_finished(&mut driver).await;
3903
3904 match result {
3905 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3906 Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3907 other => panic!("unexpected part: {other:?}"),
3908 },
3909 other => panic!("unexpected loop step: {other:?}"),
3910 }
3911 }
3912
/// Routes the `background-wait` tool to the background via
/// `AsyncTaskManager`. The first driver step must be an `AwaitingInput`
/// interrupt. After the task has completed, a second explicit
/// `driver.next()` must finish the turn with the tool's output.
#[tokio::test]
async fn async_task_manager_background_round_requires_explicit_continue() {
    let entered = StdArc::new(AtomicBool::new(false));
    let release = StdArc::new(Notify::new());
    // Only `background-wait` is routed explicitly.
    let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
        "background-wait",
        RoutingDecision::Background,
    )]));
    // Grab the handle before the manager is moved into the agent builder.
    let handle = task_manager.handle();
    let tools = ToolRegistry::new().with(BlockingTool::new(
        "background-wait",
        entered.clone(),
        release.clone(),
        "background-done",
    ));
    let agent = Agent::builder()
        .model(FakeAdapter)
        .add_tool_source(tools)
        .permissions(AllowAllPermissions)
        .task_manager(task_manager)
        .build()
        .unwrap();

    let mut driver = agent
        .start(SessionConfig {
            session_id: SessionId::new("session-background"),
            metadata: MetadataMap::new(),
            cache: None,
        })
        .await
        .unwrap();

    driver
        .submit_input(vec![Item {
            id: None,
            kind: ItemKind::User,
            parts: vec![Part::Text(TextPart {
                text: "ping".into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }])
        .unwrap();

    // The first step must be an AwaitingInput interrupt, not a finished turn.
    let first = driver.next().await.unwrap();
    match first {
        LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
        other => panic!("unexpected first loop step: {other:?}"),
    }

    // The task manager must report the task as started.
    match wait_for_task_event(&handle).await {
        TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
        other => panic!("unexpected task event: {other:?}"),
    }
    // Release the tool only after it has signalled that it is running.
    wait_until_entered(entered.as_ref()).await;
    release.notify_waiters();

    match wait_for_task_event(&handle).await {
        TaskEvent::Completed(_, result) => {
            assert_eq!(result.output, ToolOutput::Text("background-done".into()))
        }
        other => panic!("unexpected completion event: {other:?}"),
    }

    // Driving again without submitting new input must finish the turn.
    let resumed = driver.next().await.unwrap();
    match resumed {
        LoopStep::Finished(turn) => {
            assert_eq!(turn.finish_reason, FinishReason::Completed);
            match &turn.items[0].parts[0] {
                Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
                other => panic!("unexpected part after resume: {other:?}"),
            }
        }
        other => panic!("unexpected resumed step: {other:?}"),
    }
}
3992
/// Interrupts a turn through a `CancellationController` and checks that the
/// turn finishes as `Cancelled` with one assistant item flagged as
/// interrupted. It then checks that the same driver completes a new turn
/// after fresh input.
#[tokio::test]
async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
    let controller = CancellationController::new();
    let agent = Agent::builder()
        .model(SlowAdapter)
        .cancellation(controller.handle())
        .build()
        .unwrap();

    let mut driver = agent
        .start(SessionConfig {
            session_id: SessionId::new("session-cancel"),
            metadata: MetadataMap::new(),
            cache: None,
        })
        .await
        .unwrap();

    driver
        .submit_input(vec![Item {
            id: None,
            kind: ItemKind::User,
            parts: vec![Part::Text(TextPart {
                text: "do the long task".into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }])
        .unwrap();

    // Run the driver step alongside a future that yields once and then
    // interrupts; only the driver's result (`.0`) is kept.
    let cancelled = tokio::join!(async { driver.next().await }, async {
        tokio::task::yield_now().await;
        controller.interrupt();
    })
    .0
    .unwrap();

    match cancelled {
        LoopStep::Finished(turn) => {
            assert_eq!(turn.finish_reason, FinishReason::Cancelled);
            assert_eq!(turn.items.len(), 1);
            assert_eq!(turn.items[0].kind, ItemKind::Assistant);
            // The item must carry the interrupted marker in its metadata.
            assert_eq!(
                turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
                Some(&Value::Bool(true))
            );
        }
        other => panic!("unexpected loop step: {other:?}"),
    }

    // A new submission on the same driver must run to completion.
    driver
        .submit_input(vec![Item {
            id: None,
            kind: ItemKind::User,
            parts: vec![Part::Text(TextPart {
                text: "try again".into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }])
        .unwrap();

    let result = driver.next().await.unwrap();
    match result {
        LoopStep::Finished(turn) => {
            assert_eq!(turn.finish_reason, FinishReason::Completed);
        }
        other => panic!("unexpected loop step after retry: {other:?}"),
    }
}
4069
/// Runs one foreground-routed and one background-routed blocking tool, then
/// interrupts once both have entered. The turn must finish as `Cancelled`
/// and the foreground task must be reported cancelled. The background task
/// must stay running and complete once released.
#[tokio::test]
async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
    let controller = CancellationController::new();
    let fg_entered = StdArc::new(AtomicBool::new(false));
    let fg_release = StdArc::new(Notify::new());
    let bg_entered = StdArc::new(AtomicBool::new(false));
    let bg_release = StdArc::new(Notify::new());
    let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
        ("foreground-wait", RoutingDecision::Foreground),
        ("background-wait", RoutingDecision::Background),
    ]));
    // Grab the handle before the manager is moved into the agent builder.
    let handle = task_manager.handle();
    // `fg_release` is moved into the tool and never notified by this test.
    let tools = ToolRegistry::new()
        .with(BlockingTool::new(
            "foreground-wait",
            fg_entered.clone(),
            fg_release,
            "foreground-done",
        ))
        .with(BlockingTool::new(
            "background-wait",
            bg_entered.clone(),
            bg_release.clone(),
            "background-done",
        ));
    let agent = Agent::builder()
        .model(MultiToolAdapter)
        .add_tool_source(tools)
        .permissions(AllowAllPermissions)
        .cancellation(controller.handle())
        .task_manager(task_manager)
        .build()
        .unwrap();

    let mut driver = agent
        .start(SessionConfig {
            session_id: SessionId::new("session-mixed-cancel"),
            metadata: MetadataMap::new(),
            cache: None,
        })
        .await
        .unwrap();

    driver
        .submit_input(vec![Item {
            id: None,
            kind: ItemKind::User,
            parts: vec![Part::Text(TextPart {
                text: "run both".into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }])
        .unwrap();

    // Run the driver step alongside a future that consumes two task events,
    // waits until both tools have entered, and then interrupts; only the
    // driver's result (`.0`) is kept.
    let cancelled = tokio::join!(async { driver.next().await }, async {
        let _ = wait_for_task_event(&handle).await;
        let _ = wait_for_task_event(&handle).await;
        wait_until_entered(fg_entered.as_ref()).await;
        wait_until_entered(bg_entered.as_ref()).await;
        controller.interrupt();
    })
    .0
    .unwrap();

    match cancelled {
        LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
        other => panic!("unexpected loop step after interrupt: {other:?}"),
    }

    // The next task event must be the foreground task's cancellation.
    match wait_for_task_event(&handle).await {
        TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
        other => panic!("unexpected post-interrupt event: {other:?}"),
    }

    // Only the background task may still be running.
    let running = handle.list_running().await;
    assert_eq!(running.len(), 1);
    assert_eq!(running[0].tool_name, "background-wait");

    // Releasing the background tool must let it complete with its output.
    bg_release.notify_waiters();
    match wait_for_task_event(&handle).await {
        TaskEvent::Completed(snapshot, result) => {
            assert_eq!(snapshot.tool_name, "background-wait");
            assert_eq!(result.output, ToolOutput::Text("background-done".into()));
        }
        other => panic!("unexpected background completion event: {other:?}"),
    }
}
4161
4162 #[tokio::test]
4163 async fn loop_resumes_after_approved_tool_request() {
4164 let tools = ToolRegistry::new().with(EchoTool::default());
4165 let agent = Agent::builder()
4166 .model(FakeAdapter)
4167 .add_tool_source(tools)
4168 .permissions(ApproveFsReads)
4169 .build()
4170 .unwrap();
4171
4172 let mut driver = agent
4173 .start(SessionConfig {
4174 session_id: SessionId::new("session-approval"),
4175 metadata: MetadataMap::new(),
4176 cache: None,
4177 })
4178 .await
4179 .unwrap();
4180
4181 driver
4182 .submit_input(vec![Item {
4183 id: None,
4184 kind: ItemKind::User,
4185 parts: vec![Part::Text(TextPart {
4186 text: "ping".into(),
4187 metadata: MetadataMap::new(),
4188 })],
4189 metadata: MetadataMap::new(),
4190 usage: None,
4191 finish_reason: None,
4192 created_at: None,
4193 }])
4194 .unwrap();
4195
4196 let first = driver.next().await.unwrap();
4197 match first {
4198 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4199 assert!(pending.request.task_id.is_some());
4200 assert_eq!(pending.request.id.0, "approval:fs-read");
4201 pending.approve(&mut driver).unwrap();
4202 }
4203 other => panic!("unexpected loop step: {other:?}"),
4204 }
4205 let second = driver.next().await.unwrap();
4206 match second {
4207 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
4208 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
4209 other => panic!("unexpected part: {other:?}"),
4210 },
4211 other => panic!("unexpected loop step after approval: {other:?}"),
4212 }
4213 }
4214
4215 #[tokio::test]
4216 async fn loop_resumes_with_patched_input_on_approval() {
4217 let tools = ToolRegistry::new().with(EchoTool::default());
4218 let agent = Agent::builder()
4219 .model(FakeAdapter)
4220 .add_tool_source(tools)
4221 .permissions(ApproveFsReads)
4222 .build()
4223 .unwrap();
4224
4225 let mut driver = agent
4226 .start(SessionConfig {
4227 session_id: SessionId::new("session-approval-patched"),
4228 metadata: MetadataMap::new(),
4229 cache: None,
4230 })
4231 .await
4232 .unwrap();
4233
4234 driver
4235 .submit_input(vec![Item {
4236 id: None,
4237 kind: ItemKind::User,
4238 parts: vec![Part::Text(TextPart {
4239 text: "ping".into(),
4240 metadata: MetadataMap::new(),
4241 })],
4242 metadata: MetadataMap::new(),
4243 usage: None,
4244 finish_reason: None,
4245 created_at: None,
4246 }])
4247 .unwrap();
4248
4249 match driver.next().await.unwrap() {
4250 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4251 pending
4252 .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
4253 .unwrap();
4254 }
4255 other => panic!("unexpected loop step: {other:?}"),
4256 }
4257 match driver.next().await.unwrap() {
4258 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
4259 Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
4260 other => panic!("unexpected part: {other:?}"),
4261 },
4262 other => panic!("unexpected loop step after approval: {other:?}"),
4263 }
4264 }
4265
/// Two tool calls each need approval. The approvals must surface in call
/// order (`call-1`, then `call-2`). Approving the second first must leave
/// `call-1` still pending, and approving `call-1` afterwards must let the
/// turn complete.
#[tokio::test]
async fn loop_tracks_multiple_pending_approvals_by_call_id() {
    let tools = ToolRegistry::new().with(EchoTool::default());
    let agent = Agent::builder()
        .model(DualApprovalAdapter)
        .add_tool_source(tools)
        .permissions(ApproveFsReads)
        .build()
        .unwrap();

    let mut driver = agent
        .start(SessionConfig {
            session_id: SessionId::new("session-dual-approval"),
            metadata: MetadataMap::new(),
            cache: None,
        })
        .await
        .unwrap();

    driver
        .submit_input(vec![Item {
            id: None,
            kind: ItemKind::User,
            parts: vec![Part::Text(TextPart {
                text: "run both approvals".into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }])
        .unwrap();

    // First interrupt: the approval for `call-1`. Keep it for later.
    let pending_first = match driver.next().await.unwrap() {
        LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
            assert_eq!(
                pending.request.call_id.as_ref().map(|id| id.0.as_str()),
                Some("call-1")
            );
            pending
        }
        other => panic!("unexpected first loop step: {other:?}"),
    };

    // Second interrupt: the approval for `call-2`, surfaced while the first
    // is still unanswered.
    let pending_second = match driver.next().await.unwrap() {
        LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
            assert_eq!(
                pending.request.call_id.as_ref().map(|id| id.0.as_str()),
                Some("call-2")
            );
            pending
        }
        other => panic!("unexpected second loop step: {other:?}"),
    };

    // Approve out of order: the loop must surface `call-1` again rather than
    // finishing.
    pending_second.approve(&mut driver).unwrap();
    match driver.next().await.unwrap() {
        LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
            assert_eq!(
                pending.request.call_id.as_ref().map(|id| id.0.as_str()),
                Some("call-1")
            );
        }
        other => panic!("unexpected step after approving second request: {other:?}"),
    }

    // Approving the remaining request must let the turn run to completion.
    pending_first.approve(&mut driver).unwrap();
    match driver.next().await.unwrap() {
        LoopStep::Finished(turn) => {
            assert_eq!(turn.finish_reason, FinishReason::Completed);
            match &turn.items[0].parts[0] {
                Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
                other => panic!("unexpected final part: {other:?}"),
            }
        }
        other => panic!("unexpected final loop step: {other:?}"),
    }
}
4345
4346 #[tokio::test]
4347 async fn loop_compacts_transcript_before_new_turns() {
4348 let events = StdArc::new(StdMutex::new(Vec::new()));
4349 let agent = Agent::builder()
4350 .model(FakeAdapter)
4351 .mutator(KeepRecentMutator { keep: 1 })
4352 .observer(RecordingObserver {
4353 events: events.clone(),
4354 })
4355 .build()
4356 .unwrap();
4357
4358 let mut driver = agent
4359 .start(SessionConfig {
4360 session_id: SessionId::new("session-4"),
4361 metadata: MetadataMap::new(),
4362 cache: None,
4363 })
4364 .await
4365 .unwrap();
4366
4367 for text in ["first", "second"] {
4368 driver
4369 .submit_input(vec![Item {
4370 id: None,
4371 kind: ItemKind::User,
4372 parts: vec![Part::Text(TextPart {
4373 text: text.into(),
4374 metadata: MetadataMap::new(),
4375 })],
4376 metadata: MetadataMap::new(),
4377 usage: None,
4378 finish_reason: None,
4379 created_at: None,
4380 }])
4381 .unwrap();
4382 let _ = driver.next().await.unwrap();
4383 }
4384
4385 let events = events.lock().unwrap();
4386 assert!(
4387 events
4388 .iter()
4389 .any(|event| matches!(event, AgentEvent::MutationFinished { dirty: true, .. }))
4390 );
4391 }
4392
/// A transcript whose only item is a tool result (no tool call anywhere) must fail
/// validation with an error mentioning `orphaned tool_result`.
#[test]
fn transcript_validation_rejects_orphaned_tool_result() {
    let orphan_result = Part::ToolResult(ToolResultPart {
        call_id: "call-1".into(),
        output: ToolOutput::Text("result".into()),
        is_error: false,
        metadata: MetadataMap::new(),
    });
    let transcript = vec![Item {
        id: None,
        kind: ItemKind::Tool,
        parts: vec![orphan_result],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    }];

    let message = validate_transcript_invariants(&transcript)
        .unwrap_err()
        .to_string();
    assert!(message.contains("orphaned tool_result"));
}
4413
/// One tool call (`call-1`) followed by two tool results for that same call id must fail
/// validation with an error mentioning `duplicate tool_result`.
#[test]
fn transcript_validation_rejects_duplicate_tool_result() {
    // Builds a tool item holding a single non-error result for `call-1`.
    let result_for_call_1 = |output: &'static str| Item {
        id: None,
        kind: ItemKind::Tool,
        parts: vec![Part::ToolResult(ToolResultPart {
            call_id: "call-1".into(),
            output: ToolOutput::Text(output.into()),
            is_error: false,
            metadata: MetadataMap::new(),
        })],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    };

    let tool_call = Item {
        id: None,
        kind: ItemKind::Assistant,
        parts: vec![Part::ToolCall(ToolCallPart {
            id: "call-1".into(),
            name: "lookup".into(),
            input: serde_json::json!({}),
            metadata: MetadataMap::new(),
        })],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    };

    let transcript = vec![
        tool_call,
        result_for_call_1("result"),
        result_for_call_1("again"),
    ];

    let message = validate_transcript_invariants(&transcript)
        .unwrap_err()
        .to_string();
    assert!(message.contains("duplicate tool_result"));
}
4464
4465 #[tokio::test]
4466 async fn loop_refreshes_tool_specs_each_turn() {
4467 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4468 let version = StdArc::new(AtomicUsize::new(1));
4469 let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
4470 let agent = Agent::builder()
4471 .model(RecordingAdapter {
4472 seen_descriptions: seen_descriptions.clone(),
4473 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4474 })
4475 .add_tool_source(tools)
4476 .permissions(AllowAllPermissions)
4477 .build()
4478 .unwrap();
4479
4480 let mut driver = agent
4481 .start(SessionConfig {
4482 session_id: SessionId::new("session-dynamic-tools"),
4483 metadata: MetadataMap::new(),
4484 cache: None,
4485 })
4486 .await
4487 .unwrap();
4488
4489 for text in ["first", "second"] {
4490 driver
4491 .submit_input(vec![Item {
4492 id: None,
4493 kind: ItemKind::User,
4494 parts: vec![Part::Text(TextPart {
4495 text: text.into(),
4496 metadata: MetadataMap::new(),
4497 })],
4498 metadata: MetadataMap::new(),
4499 usage: None,
4500 finish_reason: None,
4501 created_at: None,
4502 }])
4503 .unwrap();
4504
4505 let _ = driver.next().await.unwrap();
4506 if text == "first" {
4507 version.store(2, Ordering::SeqCst);
4508 }
4509 }
4510
4511 let seen_descriptions = seen_descriptions.lock().unwrap();
4512 assert_eq!(seen_descriptions.len(), 2);
4513 assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
4514 assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
4515 }
4516
4517 #[tokio::test]
4518 async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
4519 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4520 let events = StdArc::new(StdMutex::new(Vec::new()));
4521 let executor = StdArc::new(CatalogExecutor::new());
4522 let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
4523 let agent = Agent::builder()
4524 .model(RecordingAdapter {
4525 seen_descriptions: seen_descriptions.clone(),
4526 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4527 })
4528 .tool_executor(executor_for_agent)
4529 .permissions(AllowAllPermissions)
4530 .observer(RecordingObserver {
4531 events: events.clone(),
4532 })
4533 .build()
4534 .unwrap();
4535
4536 let mut driver = agent
4537 .start(SessionConfig {
4538 session_id: SessionId::new("session-catalog-events"),
4539 metadata: MetadataMap::new(),
4540 cache: None,
4541 })
4542 .await
4543 .unwrap();
4544
4545 driver
4546 .submit_input(vec![Item::text(ItemKind::User, "first")])
4547 .unwrap();
4548 let _ = driver.next().await.unwrap();
4549
4550 executor.publish_change(
4551 1,
4552 ToolCatalogEvent {
4553 source: "mcp:mock".into(),
4554 added: vec!["dynamic".into()],
4555 removed: Vec::new(),
4556 changed: Vec::new(),
4557 },
4558 );
4559
4560 driver
4561 .submit_input(vec![Item::text(ItemKind::User, "second")])
4562 .unwrap();
4563 let _ = driver.next().await.unwrap();
4564
4565 let seen_descriptions = seen_descriptions.lock().unwrap();
4566 assert_eq!(seen_descriptions.len(), 2);
4567 assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
4568 assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
4569
4570 let events = events.lock().unwrap();
4571 assert!(events.iter().any(|event| matches!(
4572 event,
4573 AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
4574 source,
4575 added,
4576 removed,
4577 changed,
4578 }) if source == "mcp:mock"
4579 && added == &vec!["dynamic".to_string()]
4580 && removed.is_empty()
4581 && changed.is_empty()
4582 )));
4583 }
4584
4585 #[tokio::test]
4586 async fn loop_passes_session_default_and_next_turn_cache_requests() {
4587 let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
4588 let agent = Agent::builder()
4589 .model(RecordingAdapter {
4590 seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
4591 seen_caches: seen_caches.clone(),
4592 })
4593 .permissions(AllowAllPermissions)
4594 .build()
4595 .unwrap();
4596
4597 let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
4598 .with_retention(PromptCacheRetention::Short);
4599 let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
4600 breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
4601 });
4602
4603 let mut driver = agent
4604 .start(SessionConfig {
4605 session_id: SessionId::new("session-cache"),
4606 metadata: MetadataMap::new(),
4607 cache: Some(default_cache.clone()),
4608 })
4609 .await
4610 .unwrap();
4611
4612 driver
4613 .submit_input(vec![Item {
4614 id: None,
4615 kind: ItemKind::User,
4616 parts: vec![Part::Text(TextPart {
4617 text: "first".into(),
4618 metadata: MetadataMap::new(),
4619 })],
4620 metadata: MetadataMap::new(),
4621 usage: None,
4622 finish_reason: None,
4623 created_at: None,
4624 }])
4625 .unwrap();
4626 let _ = driver.next().await.unwrap();
4627
4628 driver
4629 .submit_input_with_cache(
4630 vec![Item {
4631 id: None,
4632 kind: ItemKind::User,
4633 parts: vec![Part::Text(TextPart {
4634 text: "second".into(),
4635 metadata: MetadataMap::new(),
4636 })],
4637 metadata: MetadataMap::new(),
4638 usage: None,
4639 finish_reason: None,
4640 created_at: None,
4641 }],
4642 override_cache.clone(),
4643 )
4644 .unwrap();
4645 let _ = driver.next().await.unwrap();
4646
4647 let seen = seen_caches.lock().unwrap();
4648 assert_eq!(seen.len(), 2);
4649 assert_eq!(seen[0], Some(default_cache));
4650 assert_eq!(seen[1], Some(override_cache));
4651 }
4652
4653 #[tokio::test]
4654 async fn loop_yields_after_tool_result_between_rounds() {
4655 let tools = ToolRegistry::new().with(EchoTool::default());
4656 let agent = Agent::builder()
4657 .model(FakeAdapter)
4658 .add_tool_source(tools)
4659 .permissions(AllowAllPermissions)
4660 .build()
4661 .unwrap();
4662
4663 let mut driver = agent
4664 .start(SessionConfig {
4665 session_id: SessionId::new("yield-session"),
4666 metadata: MetadataMap::new(),
4667 cache: None,
4668 })
4669 .await
4670 .unwrap();
4671
4672 driver
4673 .submit_input(vec![Item::text(ItemKind::User, "ping")])
4674 .unwrap();
4675
4676 let step = driver.next().await.unwrap();
4679 let info = match step {
4680 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
4681 other => panic!("expected AfterToolResult, got {other:?}"),
4682 };
4683 assert_eq!(info.session_id, SessionId::new("yield-session"));
4684 assert_eq!(info.transcript_len, 3);
4686
4687 let interrupt = LoopInterrupt::AfterToolResult(info.clone());
4689 assert!(!interrupt.is_blocking());
4690
4691 driver
4693 .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
4694 .unwrap();
4695
4696 let step = driver.next().await.unwrap();
4699 match step {
4700 LoopStep::Finished(turn) => {
4701 assert_eq!(turn.finish_reason, FinishReason::Completed);
4702 }
4703 other => panic!("expected Finished, got {other:?}"),
4704 }
4705
4706 let snapshot = driver.snapshot();
4708 let has_injected_message = snapshot.transcript.iter().any(|item| {
4709 item.kind == ItemKind::User
4710 && item.parts.iter().any(|part| match part {
4711 Part::Text(text) => text.text == "also: report back",
4712 _ => false,
4713 })
4714 });
4715 assert!(
4716 has_injected_message,
4717 "injected user message should be in transcript, got: {:?}",
4718 snapshot.transcript
4719 );
4720 }
4721
4722 struct RecordingTranscriptObserver {
4723 items: StdArc<StdMutex<Vec<Item>>>,
4724 }
4725
4726 impl TranscriptObserver for RecordingTranscriptObserver {
4727 fn on_item_appended(&self, item: &Item) {
4728 self.items.lock().unwrap().push(item.clone());
4729 }
4730 }
4731
4732 #[tokio::test]
4733 async fn observers_see_full_tool_round() {
4734 let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
4740 let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
4741 let agent = Agent::builder()
4742 .model(FakeAdapter)
4743 .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
4744 .permissions(AllowAllPermissions)
4745 .observer(RecordingObserver {
4746 events: events.clone(),
4747 })
4748 .transcript_observer(RecordingTranscriptObserver {
4749 items: items.clone(),
4750 })
4751 .build()
4752 .unwrap();
4753
4754 let mut driver = agent
4755 .start(SessionConfig {
4756 session_id: SessionId::new("observer-session"),
4757 metadata: MetadataMap::new(),
4758 cache: None,
4759 })
4760 .await
4761 .unwrap();
4762
4763 driver
4764 .submit_input(vec![Item {
4765 id: None,
4766 kind: ItemKind::User,
4767 parts: vec![Part::Text(TextPart {
4768 text: "ping".into(),
4769 metadata: MetadataMap::new(),
4770 })],
4771 metadata: MetadataMap::new(),
4772 usage: None,
4773 finish_reason: None,
4774 created_at: None,
4775 }])
4776 .unwrap();
4777
4778 let result = run_until_finished(&mut driver).await;
4779 assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4780
4781 let events = events.lock().unwrap().clone();
4784 let tool_call_id = events.iter().find_map(|e| match e {
4785 AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4786 _ => None,
4787 });
4788 let tool_results: Vec<_> = events
4789 .iter()
4790 .filter_map(|e| match e {
4791 AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4792 _ => None,
4793 })
4794 .collect();
4795 assert_eq!(tool_results.len(), 1, "events: {events:?}");
4796 assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4797 assert!(!tool_results[0].is_error);
4798
4799 let items = items.lock().unwrap().clone();
4803 assert_eq!(items.len(), 4, "items: {items:?}");
4804 assert_eq!(items[0].kind, ItemKind::User);
4805 assert_eq!(items[1].kind, ItemKind::Assistant);
4806 assert!(
4807 items[1]
4808 .parts
4809 .iter()
4810 .any(|p| matches!(p, Part::ToolCall(_)))
4811 );
4812 assert_eq!(items[2].kind, ItemKind::Tool);
4813 assert!(
4814 items[2]
4815 .parts
4816 .iter()
4817 .any(|p| matches!(p, Part::ToolResult(_)))
4818 );
4819 assert_eq!(items[3].kind, ItemKind::Assistant);
4820 }
4821
/// Checks the convenience constructors: `SessionConfig::new(..).with_cache(..)` stores the
/// session id and cache, and `PromptCacheRequest::explicit` yields `BestEffort` mode with
/// the breakpoints in the order they were supplied.
#[test]
fn convenience_cache_builders_construct_expected_defaults() {
    let automatic = PromptCacheRequest::automatic()
        .with_retention(PromptCacheRetention::Short)
        .with_key("workspace:demo");
    let config = SessionConfig::new("demo").with_cache(automatic.clone());

    assert_eq!(config.session_id, SessionId::new("demo"));
    assert_eq!(config.cache, Some(automatic));

    // Built through the helper constructors...
    let requested_breakpoints = [
        PromptCacheBreakpoint::tools_end(),
        PromptCacheBreakpoint::transcript_item_end(2),
        PromptCacheBreakpoint::transcript_part_end(3, 1),
    ];
    let explicit = PromptCacheRequest::explicit(requested_breakpoints);

    // ...and compared against the equivalent enum variants spelled out by hand.
    let expected_strategy = PromptCacheStrategy::Explicit {
        breakpoints: vec![
            PromptCacheBreakpoint::ToolsEnd,
            PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
            PromptCacheBreakpoint::TranscriptPartEnd {
                item_index: 3,
                part_index: 1,
            },
        ],
    };

    assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
    assert_eq!(explicit.strategy, expected_strategy);
}
4853}