1use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_core::{
68 CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
69 TextPart, Timestamp, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation,
70 Usage,
71};
72use agentkit_task_manager::{
73 PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
74 TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
75};
76#[cfg(test)]
77use agentkit_tools_core::ToolContext;
78use agentkit_tools_core::{
79 AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
80 PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutor, ToolRequest, ToolResources,
81 ToolSource, ToolSpec,
82};
83use async_trait::async_trait;
84use serde::{Deserialize, Serialize};
85use thiserror::Error;
86
// Metadata keys/values used to tag items produced by an interrupted turn.
// NOTE(review): the writers of these keys (e.g. `finish_cancelled` /
// `interrupted_assistant_items`) are outside this view — confirm usage there.

/// Metadata key presumably flagging an item as belonging to an interrupted turn.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
/// Metadata key presumably recording why a turn was interrupted.
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
/// Metadata key presumably recording the stage at which a turn was interrupted.
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
/// Interrupt-reason value presumably used when the user cancelled the turn.
const USER_CANCELLED_REASON: &str = "user_cancelled";
91
92#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
107pub struct SessionConfig {
108 pub session_id: SessionId,
110 pub metadata: MetadataMap,
112 pub cache: Option<PromptCacheRequest>,
114}
115
116impl SessionConfig {
117 pub fn new(session_id: impl Into<SessionId>) -> Self {
119 Self {
120 session_id: session_id.into(),
121 metadata: MetadataMap::new(),
122 cache: None,
123 }
124 }
125
126 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
128 self.metadata = metadata;
129 self
130 }
131
132 pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
134 self.cache = Some(cache);
135 self
136 }
137
138 pub fn without_cache(mut self) -> Self {
140 self.cache = None;
141 self
142 }
143}
144
/// How strictly prompt caching is requested for a turn.
///
/// NOTE(review): the exact treatment of each mode is up to the model adapter,
/// which is outside this file.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheMode {
    /// Caching is turned off; [`PromptCacheRequest::is_enabled`] returns `false`.
    Disabled,
    /// Default mode: caching is requested, presumably without failing when the
    /// provider cannot honor it.
    #[default]
    BestEffort,
    /// Caching is requested as a requirement (presumably adapters should error
    /// if they cannot honor it — confirm in adapter implementations).
    Required,
}
160
/// Requested lifetime of a cached prompt prefix.
///
/// NOTE(review): concrete durations are provider-specific and not defined here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheRetention {
    /// Provider's default retention.
    Default,
    /// A shorter-than-default retention, if the provider supports it.
    Short,
    /// A longer-than-default retention, if the provider supports it.
    Extended,
}
175
176#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
178pub enum PromptCacheStrategy {
179 #[default]
181 Automatic,
182 Explicit {
184 breakpoints: Vec<PromptCacheBreakpoint>,
186 },
187}
188
189impl PromptCacheStrategy {
190 pub fn automatic() -> Self {
193 Self::Automatic
194 }
195
196 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
198 Self::Explicit {
199 breakpoints: breakpoints.into_iter().collect(),
200 }
201 }
202}
203
204#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
206pub enum PromptCacheBreakpoint {
207 ToolsEnd,
209 TranscriptItemEnd { index: usize },
211 TranscriptPartEnd {
217 item_index: usize,
218 part_index: usize,
219 },
220}
221
222impl PromptCacheBreakpoint {
223 pub fn tools_end() -> Self {
225 Self::ToolsEnd
226 }
227
228 pub fn transcript_item_end(index: usize) -> Self {
230 Self::TranscriptItemEnd { index }
231 }
232
233 pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
235 Self::TranscriptPartEnd {
236 item_index,
237 part_index,
238 }
239 }
240}
241
242#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
244pub struct PromptCacheRequest {
245 pub mode: PromptCacheMode,
247 pub strategy: PromptCacheStrategy,
249 pub retention: Option<PromptCacheRetention>,
251 pub key: Option<String>,
253}
254
255impl PromptCacheRequest {
256 pub fn automatic() -> Self {
258 Self::best_effort(PromptCacheStrategy::automatic())
259 }
260
261 pub fn automatic_required() -> Self {
263 Self::required(PromptCacheStrategy::automatic())
264 }
265
266 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
268 Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
269 }
270
271 pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
273 Self::required(PromptCacheStrategy::explicit(breakpoints))
274 }
275
276 pub fn disabled() -> Self {
278 Self {
279 mode: PromptCacheMode::Disabled,
280 strategy: PromptCacheStrategy::Automatic,
281 retention: None,
282 key: None,
283 }
284 }
285
286 pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
288 Self {
289 mode: PromptCacheMode::BestEffort,
290 strategy,
291 retention: None,
292 key: None,
293 }
294 }
295
296 pub fn required(strategy: PromptCacheStrategy) -> Self {
298 Self {
299 mode: PromptCacheMode::Required,
300 strategy,
301 retention: None,
302 key: None,
303 }
304 }
305
306 pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
308 self.mode = mode;
309 self
310 }
311
312 pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
314 self.strategy = strategy;
315 self
316 }
317
318 pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
320 self.retention = Some(retention);
321 self
322 }
323
324 pub fn with_key(mut self, key: impl Into<String>) -> Self {
326 self.key = Some(key.into());
327 self
328 }
329
330 pub fn without_retention(mut self) -> Self {
332 self.retention = None;
333 self
334 }
335
336 pub fn without_key(mut self) -> Self {
338 self.key = None;
339 self
340 }
341
342 pub fn is_enabled(&self) -> bool {
344 !matches!(self.mode, PromptCacheMode::Disabled)
345 }
346}
347
/// Everything a [`ModelSession`] needs to run one model turn.
///
/// Built by the loop driver and passed to [`ModelSession::begin_turn`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnRequest {
    /// Session the turn belongs to.
    pub session_id: SessionId,
    /// Identifier of this turn.
    pub turn_id: agentkit_core::TurnId,
    /// Full transcript snapshot sent to the model.
    pub transcript: Vec<Item>,
    /// Tool specs currently exposed by the tool executor.
    pub available_tools: Vec<ToolSpec>,
    /// Prompt-cache request for this turn: the one-shot per-turn override if
    /// set, otherwise the session default.
    pub cache: Option<PromptCacheRequest>,
    /// Per-turn metadata (the driver currently sends an empty map).
    pub metadata: MetadataMap,
}
368
/// Final outcome of a model turn, delivered via [`ModelTurnEvent::Finished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ModelTurnResult {
    /// Why the model stopped.
    pub finish_reason: FinishReason,
    /// Items the model produced during the turn.
    pub output_items: Vec<Item>,
    /// Token usage for the turn, if reported by the provider.
    pub usage: Option<Usage>,
    /// Adapter-specific metadata.
    pub metadata: MetadataMap,
}
384
/// One streamed event from a [`ModelTurn`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ModelTurnEvent {
    /// Incremental content produced by the model.
    Delta(Delta),
    /// The model requested a tool call.
    ToolCall(ToolCallPart),
    /// A usage update from the provider.
    Usage(Usage),
    /// The turn completed; presumably the last event before `next_event`
    /// yields `None` — confirm in adapter implementations.
    Finished(ModelTurnResult),
}
401
/// Entry point for a model backend: creates sessions.
///
/// [`Agent::start`] calls [`ModelAdapter::start_session`] once per driver.
#[async_trait]
pub trait ModelAdapter: Send + Sync {
    /// Session type produced by this adapter.
    type Session: ModelSession;

    /// Opens a new model session configured by `config`.
    ///
    /// # Errors
    /// Any `LoopError` is propagated out of [`Agent::start`].
    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
}
450
/// A live model session capable of running successive turns.
#[async_trait]
pub trait ModelSession: Send {
    /// Streaming turn handle returned by [`ModelSession::begin_turn`].
    type Turn: ModelTurn;

    /// Starts a model turn for `request`.
    ///
    /// `cancellation` is the driver's checkpoint for this turn, if a
    /// cancellation handle was configured. Returning `LoopError::Cancelled`
    /// makes the driver finish the turn as cancelled rather than failing.
    async fn begin_turn(
        &mut self,
        request: TurnRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Self::Turn, LoopError>;
}
480
/// A streaming model turn that yields [`ModelTurnEvent`]s.
#[async_trait]
pub trait ModelTurn: Send {
    /// Returns the next event, or `Ok(None)` when the stream is exhausted.
    ///
    /// The driver polls this until `Ok(None)`. A `LoopError::Cancelled` is
    /// treated as a cancellation of the turn rather than a failure.
    async fn next_event(
        &mut self,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError>;
}
501
/// Receives [`AgentEvent`]s emitted by the loop driver.
///
/// Observers are also what backs the [`EventEmitter`] handed to mutators.
pub trait LoopObserver: Send + Sync {
    /// Called with each emitted event.
    fn handle_event(&self, event: AgentEvent);
}
528
/// Notified when an item is appended to the driver's transcript.
///
/// NOTE(review): the call sites (e.g. `append_tool_result_item`) are outside
/// this view — confirm exactly which appends trigger it.
pub trait TranscriptObserver: Send + Sync {
    /// Called with a reference to the newly appended item.
    fn on_item_appended(&self, item: &Item);
}
570
/// The point in the loop at which [`LoopMutator`]s are being run.
///
/// Before each turn the driver uses `AfterToolResult` when it is resuming a
/// tool round, and `AfterTurnEnded` otherwise.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MutationPoint {
    /// Running before a turn that follows a completed tool round.
    AfterToolResult,
    /// Running before a turn that does not resume a tool round.
    AfterTurnEnded,
}
585
/// Sink that lets a [`LoopMutator`] publish [`AgentEvent`]s to the driver's
/// observers.
pub trait EventEmitter: Send + Sync {
    /// Publishes `event`.
    fn emit(&self, event: AgentEvent);
}
592
/// Context passed to [`LoopMutator::mutate`].
#[non_exhaustive]
pub struct LoopCtx<'a> {
    /// Session the mutation runs in.
    pub session_id: &'a SessionId,
    /// Turn about to run, if known.
    pub turn_id: Option<&'a agentkit_core::TurnId>,
    /// Which loop point triggered the mutation.
    pub point: MutationPoint,
    /// Cancellation checkpoint for the pending turn, if configured.
    pub cancellation: Option<TurnCancellation>,
    /// Emitter that forwards events to the driver's observers.
    pub emitter: &'a dyn EventEmitter,
}
607
/// Mutable view of the driver's transcript handed to [`LoopMutator`]s.
///
/// Reads go through `Deref`. Any mutable access through `DerefMut` marks the
/// cursor dirty, even if nothing ends up changing. When a cursor is dirty, the
/// driver re-validates transcript invariants after all mutators have run.
pub struct TranscriptCursor<'a> {
    items: &'a mut Vec<Item>,
    // Set by `deref_mut`; read by the driver after mutators finish.
    pub(crate) dirty: bool,
}

impl<'a> std::ops::Deref for TranscriptCursor<'a> {
    type Target = Vec<Item>;
    fn deref(&self) -> &Vec<Item> {
        self.items
    }
}

impl<'a> std::ops::DerefMut for TranscriptCursor<'a> {
    // Conservatively flags the transcript as modified on any mutable access.
    fn deref_mut(&mut self) -> &mut Vec<Item> {
        self.dirty = true;
        self.items
    }
}
633
/// Hook that may rewrite the transcript before a model turn.
#[async_trait]
pub trait LoopMutator: Send + Sync {
    /// Inspects and optionally edits the transcript through `cursor`.
    ///
    /// The default implementation does nothing. Returning an error aborts the
    /// remaining mutators and propagates out of the driver.
    async fn mutate(
        &self,
        cursor: &mut TranscriptCursor<'_>,
        ctx: LoopCtx<'_>,
    ) -> Result<(), LoopError> {
        let _ = (cursor, ctx);
        Ok(())
    }
}
656
/// Events reported to [`LoopObserver`]s while the agent loop runs.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AgentEvent {
    /// Emitted once by [`Agent::start`] after the driver is created.
    RunStarted { session_id: SessionId },
    /// A model turn is starting. Emitted after mutators run and before the
    /// model request is sent.
    TurnStarted {
        session_id: SessionId,
        turn_id: agentkit_core::TurnId,
    },
    /// Host-submitted input items were accepted.
    InputAccepted {
        session_id: SessionId,
        items: Vec<Item>,
    },
    /// Incremental model output.
    ContentDelta(Delta),
    /// The model requested a tool call.
    ToolCallRequested(ToolCallPart),
    /// A tool result was produced.
    ToolResultReceived(ToolResultPart),
    /// A tool call was queued and needs host approval.
    ApprovalRequired(ApprovalRequest),
    /// An approval was resolved.
    ApprovalResolved { approved: bool },
    /// The tool executor's catalog changed (drained before each turn).
    ToolCatalogChanged(ToolCatalogEvent),
    /// A mutator started running.
    /// NOTE(review): not emitted by `run_mutators` itself — confirm the emitter.
    MutationStarted {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        mutator: String,
        point: MutationPoint,
    },
    /// A mutator finished running.
    /// NOTE(review): not emitted by `run_mutators` itself — confirm the emitter.
    MutationFinished {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        mutator: String,
        dirty: bool,
        metadata: MetadataMap,
    },
    /// Usage counters were updated.
    UsageUpdated(Usage),
    /// Non-fatal warning.
    Warning { message: String },
    /// The run failed.
    RunFailed { message: String },
    /// A turn finished with the given result.
    TurnFinished(TurnResult),
}
724
725#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
747pub struct PendingApproval {
748 pub request: ApprovalRequest,
750}
751
752impl std::ops::Deref for PendingApproval {
753 type Target = ApprovalRequest;
754 fn deref(&self) -> &ApprovalRequest {
755 &self.request
756 }
757}
758
759impl PendingApproval {
760 pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
762 let call_id = self
763 .request
764 .call_id
765 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
766 driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
767 }
768
769 pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
771 let call_id = self
772 .request
773 .call_id
774 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
775 driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
776 }
777
778 pub fn deny_with_reason<S: ModelSession>(
780 self,
781 driver: &mut LoopDriver<S>,
782 reason: impl Into<String>,
783 ) -> Result<(), LoopError> {
784 let call_id = self
785 .request
786 .call_id
787 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
788 driver.resolve_approval_for(
789 call_id,
790 ApprovalDecision::Deny {
791 reason: Some(reason.into()),
792 },
793 )
794 }
795
796 pub fn approve_with_patched_input<S: ModelSession>(
806 self,
807 driver: &mut LoopDriver<S>,
808 input: serde_json::Value,
809 ) -> Result<(), LoopError> {
810 let call_id = self
811 .request
812 .call_id
813 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
814 driver.resolve_approval_for_with_patched_input(call_id, input)
815 }
816}
817
818#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
843pub struct InputRequest {
844 pub session_id: SessionId,
846 pub reason: String,
848}
849
850impl InputRequest {
851 pub fn submit<S: ModelSession>(
853 self,
854 driver: &mut LoopDriver<S>,
855 items: Vec<Item>,
856 ) -> Result<(), LoopError> {
857 driver.submit_input(items)
858 }
859}
860
/// Outcome of a completed (or cancelled) turn.
///
/// Carried by [`LoopStep::Finished`] and [`AgentEvent::TurnFinished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnResult {
    /// Turn this result belongs to.
    pub turn_id: agentkit_core::TurnId,
    /// Why the turn ended.
    pub finish_reason: FinishReason,
    /// Items produced during the turn.
    pub items: Vec<Item>,
    /// Usage for the turn, if known.
    pub usage: Option<Usage>,
    /// Additional turn metadata.
    pub metadata: MetadataMap,
}
878
/// A point at which the loop yields control back to the host.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LoopInterrupt {
    /// A tool call needs approval before it can run.
    ApprovalRequest(PendingApproval),
    /// The loop is waiting for host input.
    AwaitingInput(InputRequest),
    /// A tool round finished; the host may append items before the next turn.
    AfterToolResult(ToolRoundInfo),
}

impl LoopInterrupt {
    /// Returns `true` only for [`LoopInterrupt::ApprovalRequest`], the one
    /// variant that the host must resolve before the loop can continue.
    pub fn is_blocking(&self) -> bool {
        matches!(self, LoopInterrupt::ApprovalRequest(_))
    }
}
942
943#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
946pub struct ToolRoundInfo {
947 pub session_id: SessionId,
949 pub turn_id: agentkit_core::TurnId,
951 pub transcript_len: usize,
953}
954
955impl ToolRoundInfo {
956 pub fn submit<S: ModelSession>(
959 self,
960 driver: &mut LoopDriver<S>,
961 items: Vec<Item>,
962 ) -> Result<(), LoopError> {
963 driver.submit_input(items)
964 }
965}
966
/// Result of advancing the loop by one step.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum LoopStep {
    /// The loop paused and needs host attention.
    Interrupt(LoopInterrupt),
    /// A turn finished.
    Finished(TurnResult),
}
1001
/// Serializable snapshot of a session's transcript and queued input.
///
/// NOTE(review): producers/consumers of this type are outside this view.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LoopSnapshot {
    /// Session the snapshot belongs to.
    pub session_id: SessionId,
    /// Transcript at snapshot time.
    pub transcript: Vec<Item>,
    /// Input queued but not yet sent to the model.
    pub pending_input: Vec<Item>,
}
1016
/// Driver-side record of a tool call waiting on (or holding) an approval decision.
#[derive(Clone, Debug)]
struct PendingApprovalToolCall {
    // Approval request with `call_id` filled in by `enqueue_pending_approval`.
    request: ApprovalRequest,
    // `None` until the host resolves the approval.
    decision: Option<ApprovalDecision>,
    // Set once this request has been returned to the host as an interrupt.
    surfaced: bool,
    // Turn that issued the call.
    turn_id: agentkit_core::TurnId,
    // Task-manager id for the deferred task.
    task_id: TaskId,
    // Tool call reconstructed from `tool_request`.
    call: ToolCallPart,
    // Original tool request to execute once approved.
    tool_request: ToolRequest,
}
1027
/// State of the tool round currently being executed by the driver.
#[derive(Clone, Debug, Default)]
struct ActiveToolRound {
    // Turn whose tool calls this round executes.
    turn_id: agentkit_core::TurnId,
    // Calls not yet dispatched; popped from the front one at a time.
    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
    // At least one call is running as a background task (pending or detached).
    background_pending: bool,
    // At least one tool result item was appended during this round.
    foreground_progressed: bool,
}
1035
/// A configured agent: a model adapter plus tools, policies and hooks.
///
/// Build one with [`Agent::builder`] and start sessions with [`Agent::start`].
/// Each started driver receives clones of this configuration.
pub struct Agent<M>
where
    M: ModelAdapter,
{
    model: M,
    // Used to build a `BasicToolExecutor` when no custom executor is set.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    task_manager: Arc<dyn TaskManager>,
    permissions: Arc<dyn PermissionChecker>,
    resources: Arc<dyn ToolResources>,
    cancellation: Option<CancellationHandle>,
    mutators: Vec<Arc<dyn LoopMutator>>,
    observers: Vec<Arc<dyn LoopObserver>>,
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    // Initial transcript copied into every driver.
    transcript: Vec<Item>,
    // Initial pending input copied into every driver.
    input: Vec<Item>,
}
1094
1095impl<M> Agent<M>
1096where
1097 M: ModelAdapter,
1098{
1099 pub fn builder() -> AgentBuilder<M> {
1101 AgentBuilder::default()
1102 }
1103
1104 pub async fn start(&self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1120 let session_id = config.session_id.clone();
1121 let default_cache = config.cache.clone();
1122 let session = self.model.start_session(config).await?;
1123 let tool_executor = self
1124 .tool_executor
1125 .clone()
1126 .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1127 let driver = LoopDriver {
1128 session_id: session_id.clone(),
1129 default_cache,
1130 next_turn_cache: None,
1131 session: Some(session),
1132 tool_executor,
1133 task_manager: self.task_manager.clone(),
1134 permissions: self.permissions.clone(),
1135 resources: self.resources.clone(),
1136 cancellation: self.cancellation.clone(),
1137 mutators: self.mutators.clone(),
1138 observers: self.observers.clone(),
1139 transcript_observers: self.transcript_observers.clone(),
1140 transcript: self.transcript.clone(),
1141 pending_input: self.input.clone(),
1142 pending_approvals: BTreeMap::new(),
1143 pending_approval_order: VecDeque::new(),
1144 active_tool_round: None,
1145 pending_round_resume: None,
1146 next_turn_index: 1,
1147 detached_call_ids: HashSet::new(),
1148 };
1149 driver.emit(AgentEvent::RunStarted { session_id });
1150 Ok(driver)
1151 }
1152}
1153
/// Builder for [`Agent`]; only the model adapter is mandatory.
pub struct AgentBuilder<M>
where
    M: ModelAdapter,
{
    model: Option<M>,
    tool_sources: Vec<Arc<dyn ToolSource>>,
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    // `None` means `build` falls back to `SimpleTaskManager`.
    task_manager: Option<Arc<dyn TaskManager>>,
    permissions: Arc<dyn PermissionChecker>,
    resources: Arc<dyn ToolResources>,
    cancellation: Option<CancellationHandle>,
    mutators: Vec<Arc<dyn LoopMutator>>,
    observers: Vec<Arc<dyn LoopObserver>>,
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    transcript: Vec<Item>,
    input: Vec<Item>,
}
1176
impl<M> Default for AgentBuilder<M>
where
    M: ModelAdapter,
{
    /// Empty builder: no model, no tools, `AllowAllPermissions`, unit (`()`)
    /// resources, no cancellation, no hooks, empty transcript and input.
    fn default() -> Self {
        Self {
            model: None,
            tool_sources: Vec::new(),
            tool_executor: None,
            task_manager: None,
            permissions: Arc::new(AllowAllPermissions),
            resources: Arc::new(()),
            cancellation: None,
            mutators: Vec::new(),
            observers: Vec::new(),
            transcript_observers: Vec::new(),
            transcript: Vec::new(),
            input: Vec::new(),
        }
    }
}
1198
1199impl<M> AgentBuilder<M>
1200where
1201 M: ModelAdapter,
1202{
1203 pub fn model(mut self, model: M) -> Self {
1205 self.model = Some(model);
1206 self
1207 }
1208
1209 pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1222 self.tool_sources.push(Arc::new(source));
1223 self
1224 }
1225
1226 pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1232 self.tool_executor = Some(Arc::new(executor));
1233 self
1234 }
1235
1236 pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1241 self.task_manager = Some(Arc::new(manager));
1242 self
1243 }
1244
1245 pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1249 self.permissions = Arc::new(permissions);
1250 self
1251 }
1252
1253 pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1255 self.resources = Arc::new(resources);
1256 self
1257 }
1258
1259 pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1261 self.cancellation = Some(handle);
1262 self
1263 }
1264
1265 pub fn mutator<L: LoopMutator + 'static>(mut self, mutator: L) -> Self {
1273 self.mutators.push(Arc::new(mutator));
1274 self
1275 }
1276
1277 pub fn observer<O: LoopObserver + 'static>(mut self, observer: O) -> Self {
1281 self.observers.push(Arc::new(observer));
1282 self
1283 }
1284
1285 pub fn transcript_observer<O: TranscriptObserver + 'static>(mut self, observer: O) -> Self {
1294 self.transcript_observers.push(Arc::new(observer));
1295 self
1296 }
1297
1298 pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1306 self.transcript = transcript;
1307 self
1308 }
1309
1310 pub fn input(mut self, input: Vec<Item>) -> Self {
1319 self.input = input;
1320 self
1321 }
1322
1323 pub fn build(self) -> Result<Agent<M>, LoopError> {
1329 let model = self
1330 .model
1331 .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1332 Ok(Agent {
1333 model,
1334 tool_sources: self.tool_sources,
1335 tool_executor: self.tool_executor,
1336 task_manager: self
1337 .task_manager
1338 .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1339 permissions: self.permissions,
1340 resources: self.resources,
1341 cancellation: self.cancellation,
1342 mutators: self.mutators,
1343 observers: self.observers,
1344 transcript_observers: self.transcript_observers,
1345 transcript: self.transcript,
1346 input: self.input,
1347 })
1348 }
1349}
1350
/// Drives one model session: runs turns, dispatches tools, and surfaces
/// interrupts (approvals, tool-round boundaries) to the host.
pub struct LoopDriver<S>
where
    S: ModelSession,
{
    session_id: SessionId,
    // Session-level cache request (from `SessionConfig::cache`), used when no
    // one-shot override is set.
    default_cache: Option<PromptCacheRequest>,
    // One-shot override consumed (`take`n) by the next turn.
    next_turn_cache: Option<PromptCacheRequest>,
    // Model session; turn start fails with `InvalidState` if this is `None`.
    session: Option<S>,
    tool_executor: Arc<dyn ToolExecutor>,
    task_manager: Arc<dyn TaskManager>,
    permissions: Arc<dyn PermissionChecker>,
    resources: Arc<dyn ToolResources>,
    // Source of per-turn cancellation checkpoints.
    cancellation: Option<CancellationHandle>,
    mutators: Vec<Arc<dyn LoopMutator>>,
    observers: Vec<Arc<dyn LoopObserver>>,
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    transcript: Vec<Item>,
    pending_input: Vec<Item>,
    // Approval records keyed by tool call id...
    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
    // ...and the FIFO order in which they were enqueued.
    pending_approval_order: VecDeque<ToolCallId>,
    active_tool_round: Option<ActiveToolRound>,
    // Turn whose completed tool round should be resumed next; selects the
    // `AfterToolResult` mutation point.
    pending_round_resume: Option<agentkit_core::TurnId>,
    // Starts at 1; presumably used to mint turn ids (usage outside this view).
    next_turn_index: u64,
    // Call ids whose tools were detached to run in the background.
    detached_call_ids: HashSet<ToolCallId>,
}
1413
1414impl<S> LoopDriver<S>
1415where
1416 S: ModelSession,
1417{
/// Builds the `agent.execute_tool` tracing span for one tool dispatch.
///
/// The span is tagged with the GenAI tool name and call id, the session and
/// turn ids, and the caller-supplied `launch_kind` label.
fn execute_tool_span(
    &self,
    request: &ToolRequest,
    turn_id: &agentkit_core::TurnId,
    launch_kind: &'static str,
) -> tracing::Span {
    tracing::info_span!(
        "agent.execute_tool",
        "gen_ai.tool.name" = %request.tool_name,
        "gen_ai.tool.call.id" = %request.call_id,
        session.id = %self.session_id,
        turn.id = %turn_id,
        launch_kind = launch_kind,
    )
}
1433
1434 fn start_task_via_manager(
1435 &self,
1436 task_id: Option<TaskId>,
1437 tool_request: ToolRequest,
1438 kind: TaskLaunchKind,
1439 cancellation: Option<TurnCancellation>,
1440 ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1441 {
1442 let task_manager = self.task_manager.clone();
1443 let tool_executor = self.tool_executor.clone();
1444 let permissions = self.permissions.clone();
1445 let resources = self.resources.clone();
1446 let session_id = self.session_id.clone();
1447 let turn_id = tool_request.turn_id.clone();
1448 let metadata = tool_request.metadata.clone();
1449
1450 async move {
1451 task_manager
1452 .start_task(
1453 TaskLaunchRequest {
1454 task_id,
1455 request: tool_request.clone(),
1456 kind,
1457 },
1458 TaskStartContext {
1459 executor: tool_executor,
1460 tool_context: OwnedToolContext {
1461 session_id,
1462 turn_id,
1463 metadata,
1464 permissions,
1465 resources,
1466 cancellation,
1467 },
1468 },
1469 )
1470 .await
1471 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1472 }
1473 }
1474
/// `true` while any approval record (resolved or not) is still held by the driver.
fn has_pending_interrupts(&self) -> bool {
    !self.pending_approvals.is_empty()
}
1478
1479 fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1480 for event in events {
1481 self.emit(AgentEvent::ToolCatalogChanged(event));
1482 }
1483 }
1484
1485 fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1486 let call_id = task.tool_request.call_id.clone();
1487 let call = ToolCallPart {
1488 id: call_id.clone(),
1489 name: task.tool_request.tool_name.to_string(),
1490 input: task.tool_request.input.clone(),
1491 metadata: task.tool_request.metadata.clone(),
1492 };
1493 let mut request = task.approval;
1494 request.call_id = Some(call_id.clone());
1495 let pending = PendingApprovalToolCall {
1496 request: request.clone(),
1497 decision: None,
1498 surfaced: false,
1499 turn_id: turn_id.clone(),
1500 task_id: task.task_id,
1501 call,
1502 tool_request: task.tool_request,
1503 };
1504 self.pending_approvals.insert(call_id.clone(), pending);
1505 if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1506 self.pending_approval_order.push_back(call_id);
1507 }
1508 self.emit(AgentEvent::ApprovalRequired(request));
1509 }
1510
1511 fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1512 for call_id in self.pending_approval_order.clone() {
1513 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1514 continue;
1515 };
1516 if pending.decision.is_none() && !pending.surfaced {
1517 pending.surfaced = true;
1518 return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1519 PendingApproval {
1520 request: pending.request.clone(),
1521 },
1522 )));
1523 }
1524 }
1525 None
1526 }
1527
1528 fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1529 self.pending_approval_order.iter().find_map(|call_id| {
1530 self.pending_approvals.get(call_id).and_then(|pending| {
1531 pending.decision.is_none().then(|| {
1532 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1533 request: pending.request.clone(),
1534 }))
1535 })
1536 })
1537 })
1538 }
1539
1540 fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1541 let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1542 self.pending_approvals
1543 .get(call_id)
1544 .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1545 })?;
1546 self.pending_approval_order.retain(|id| id != &call_id);
1547 self.pending_approvals.remove(&call_id)
1548 }
1549
1550 fn queue_resolution_interrupt(
1551 &mut self,
1552 turn_id: &agentkit_core::TurnId,
1553 resolution: TaskResolution,
1554 ) -> Option<LoopStep> {
1555 match resolution {
1556 TaskResolution::Item(item) => {
1557 self.append_tool_result_item(item);
1558 None
1559 }
1560 TaskResolution::Approval(task) => {
1561 self.enqueue_pending_approval(turn_id, task);
1562 self.take_next_unsurfaced_approval_interrupt()
1563 }
1564 }
1565 }
1566
1567 async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1568 let PendingLoopUpdates { mut resolutions } = self
1569 .task_manager
1570 .take_pending_loop_updates()
1571 .await
1572 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1573 let mut saw_items = false;
1574 while let Some(resolution) = resolutions.pop_front() {
1575 match resolution {
1576 TaskResolution::Item(item) => {
1577 self.append_tool_result_item(item);
1578 saw_items = true;
1579 }
1580 TaskResolution::Approval(task) => {
1581 self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1582 }
1583 }
1584 }
1585 Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1586 }
1587
/// Runs every registered [`LoopMutator`] against the driver's transcript, in
/// registration order, at `point`.
///
/// All mutators share one [`TranscriptCursor`]. If any of them took mutable
/// access (`cursor.dirty`), the transcript invariants are validated once after
/// the last mutator.
///
/// # Errors
/// - `LoopError::Cancelled` if `cancellation` has fired before the first
///   mutator or between mutators.
/// - Any error returned by a mutator; the remaining mutators are skipped.
/// - Any error from `validate_transcript_invariants`.
// NOTE(review): on the early-return paths above, a transcript already dirtied
// by an earlier mutator is returned without being validated — confirm this is
// intended.
// NOTE(review): `MutationStarted`/`MutationFinished` are not emitted here;
// confirm whether mutators are expected to emit them via `ctx.emitter`.
async fn run_mutators(
    &mut self,
    point: MutationPoint,
    turn_id: Option<&agentkit_core::TurnId>,
    cancellation: Option<TurnCancellation>,
) -> Result<(), LoopError> {
    if self.mutators.is_empty() {
        return Ok(());
    }
    if cancellation
        .as_ref()
        .is_some_and(TurnCancellation::is_cancelled)
    {
        return Err(LoopError::Cancelled);
    }
    // Local copies of the mutator/observer lists and session id, so the loop
    // and emitter do not borrow `self` while `cursor` mutably borrows the
    // transcript.
    let mutators = self.mutators.clone();
    let session_id = self.session_id.clone();
    let observers = self.observers.clone();
    let emitter = DriverEmitter {
        observers: &observers,
    };
    let mut cursor = TranscriptCursor {
        items: &mut self.transcript,
        dirty: false,
    };
    for mutator in &mutators {
        // Re-check between mutators so a long chain can be interrupted.
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            return Err(LoopError::Cancelled);
        }
        let ctx = LoopCtx {
            session_id: &session_id,
            turn_id,
            point,
            cancellation: cancellation.clone(),
            emitter: &emitter,
        };
        mutator.mutate(&mut cursor, ctx).await?;
    }
    if cursor.dirty {
        validate_transcript_invariants(cursor.items)?;
    }
    Ok(())
}
1634
/// Advances the in-flight tool round, if any, until it needs host attention.
///
/// Returns `Ok(None)` immediately when there is no active round. Otherwise it
/// loops:
/// 1. If the cancellation checkpoint has fired, it notifies the task manager,
///    clears the round, and finishes the turn as cancelled.
/// 2. It dispatches the next queued tool call (if any) through the task
///    manager. Ready results are appended or queued for approval. Pending
///    background tasks mark the round `background_pending`.
/// 3. Once the queue is empty, it waits on the task manager for the next
///    update for this turn (a resolution or a detach).
///
/// When the manager reports no further updates, the round is closed. The
/// method then returns, in priority order:
/// - an approval interrupt, if an approval is outstanding;
/// - `Ok(None)`, if only background work is pending and nothing in the
///   foreground progressed;
/// - otherwise [`LoopInterrupt::AfterToolResult`], also recording
///   `pending_round_resume` for the next turn.
async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
    let Some(_) = self.active_tool_round.as_ref() else {
        return Ok(None);
    };
    loop {
        // A fresh checkpoint is taken on every pass.
        let cancellation = self
            .cancellation
            .as_ref()
            .map(CancellationHandle::checkpoint);
        let turn_id = self
            .active_tool_round
            .as_ref()
            .map(|active| active.turn_id.clone())
            .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            self.active_tool_round = None;
            return self.finish_cancelled(turn_id, Vec::new()).map(Some);
        }

        // Dispatch queued calls one at a time before waiting on the manager.
        let next_call = self
            .active_tool_round
            .as_mut()
            .and_then(|active| active.pending_calls.pop_front());
        if let Some((_call, tool_request)) = next_call {
            use tracing::Instrument;
            let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
            match self
                .start_task_via_manager(
                    None,
                    tool_request.clone(),
                    TaskLaunchKind::Plain,
                    cancellation.clone(),
                )
                .instrument(dispatch_span)
                .await?
            {
                TaskStartOutcome::Ready(resolution) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.append_tool_result_item(item);
                        }
                        TaskResolution::Approval(task) => {
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                    }
                    continue;
                }
                TaskStartOutcome::Pending { kind, .. } => {
                    // Only background tasks affect the round's bookkeeping here;
                    // foreground results arrive later via `wait_for_turn`.
                    if kind == agentkit_task_manager::TaskKind::Background
                        && let Some(active) = self.active_tool_round.as_mut()
                    {
                        active.background_pending = true;
                    }
                    continue;
                }
            }
        }

        match self
            .task_manager
            .wait_for_turn(&turn_id, cancellation.clone())
            .await
            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
        {
            Some(TurnTaskUpdate::Resolution(resolution)) => {
                let resolution = *resolution;
                match resolution {
                    TaskResolution::Item(item) => {
                        if let Some(active) = self.active_tool_round.as_mut() {
                            active.foreground_progressed = true;
                        }
                        self.append_tool_result_item(item);
                    }
                    TaskResolution::Approval(task) => {
                        self.enqueue_pending_approval(&turn_id, task);
                    }
                }
            }
            Some(TurnTaskUpdate::Detached(snapshot)) => {
                // The task moved to the background: append a placeholder result
                // so the transcript has a result for this call id, and remember
                // the id as detached.
                let detached_call_id = snapshot.call_id.clone();
                self.append_tool_result_item(Item {
                    id: None,
                    kind: ItemKind::Tool,
                    parts: vec![Part::ToolResult(ToolResultPart {
                        call_id: detached_call_id.clone(),
                        output: ToolOutput::Text(format!(
                            "Tool {} is now running in the background. \
                             The result will be delivered when it completes.",
                            snapshot.tool_name,
                        )),
                        is_error: false,
                        metadata: MetadataMap::new(),
                    })],
                    metadata: MetadataMap::new(),
                    usage: None,
                    finish_reason: None,
                    created_at: None,
                });
                self.detached_call_ids.insert(detached_call_id);
                if let Some(active) = self.active_tool_round.as_mut() {
                    active.background_pending = true;
                    active.foreground_progressed = true;
                }
            }
            None => {
                // No further updates for this turn: either cancellation woke the
                // wait, or the round is done.
                if cancellation
                    .as_ref()
                    .is_some_and(TurnCancellation::is_cancelled)
                {
                    self.task_manager
                        .on_turn_interrupted(&turn_id)
                        .await
                        .map_err(|error| {
                            LoopError::Tool(ToolError::Internal(error.to_string()))
                        })?;
                    self.active_tool_round = None;
                    return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                }
                let active = self.active_tool_round.take().ok_or_else(|| {
                    LoopError::InvalidState("missing active tool round".into())
                })?;
                if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                    return Ok(Some(step));
                }
                if let Some(step) = self.next_unresolved_approval_interrupt() {
                    return Ok(Some(step));
                }
                if active.background_pending && !active.foreground_progressed {
                    return Ok(None);
                }
                let info = ToolRoundInfo {
                    session_id: self.session_id.clone(),
                    turn_id: turn_id.clone(),
                    transcript_len: self.transcript.len(),
                };
                // Read by `drive_turn` to pick `MutationPoint::AfterToolResult`.
                self.pending_round_resume = Some(turn_id);
                return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
                    info,
                ))));
            }
        }
    }
}
1813
    /// Runs a single model turn and reports how the driver should proceed.
    ///
    /// Steps, in order:
    /// 1. Runs transcript mutators at [`MutationPoint::AfterToolResult`] when a
    ///    tool-round resume is pending (`pending_round_resume` is set), otherwise
    ///    at [`MutationPoint::AfterTurnEnded`].
    /// 2. Emits [`AgentEvent::TurnStarted`] when `emit_started` is true.
    /// 3. Builds a [`TurnRequest`] from a clone of the transcript and the
    ///    executor's current tool specs, consuming the one-shot
    ///    `next_turn_cache` or falling back to `default_cache`.
    /// 4. Streams model events, forwarding deltas, usage updates and tool-call
    ///    requests to observers until a `Finished` event arrives.
    /// 5. Fills in usage / finish reason on assistant output items and
    ///    `created_at` on every output item when unset, then appends the items
    ///    to the transcript.
    /// 6. If a tool call was streamed, starts an [`ActiveToolRound`] and returns
    ///    its first step (or an `AwaitingInput` interrupt when it yields none);
    ///    otherwise emits [`AgentEvent::TurnFinished`] and returns
    ///    [`LoopStep::Finished`].
    ///
    /// Cancellation detected at any checkpoint finishes the turn through
    /// `finish_cancelled` with a placeholder assistant item. Cancellations
    /// observed from `begin_turn` onward also notify the task manager via
    /// `on_turn_interrupted`; the earlier checkpoints do not.
    ///
    /// # Errors
    ///
    /// - [`LoopError::InvalidState`] if the model session is not available.
    /// - [`LoopError::Provider`] if the event stream ends without `Finished`.
    /// - [`LoopError::Tool`] if the task manager fails to record an interruption.
    /// - Any other error from mutators, the session, the turn stream or tool
    ///   round continuation is propagated unchanged.
    #[tracing::instrument(
        name = "agent.turn",
        skip_all,
        fields(
            session.id = %self.session_id,
            turn.id = %turn_id,
            transcript.len = self.transcript.len(),
            saw_tool_call = tracing::field::Empty,
            finish_reason = tracing::field::Empty,
        ),
    )]
    async fn drive_turn(
        &mut self,
        turn_id: agentkit_core::TurnId,
        emit_started: bool,
    ) -> Result<LoopStep, LoopError> {
        // One checkpoint is shared by every await in this turn.
        let cancellation = self
            .cancellation
            .as_ref()
            .map(CancellationHandle::checkpoint);
        // A pending round resume means this turn continues after tool results.
        let mutation_point = if self.pending_round_resume.is_some() {
            MutationPoint::AfterToolResult
        } else {
            MutationPoint::AfterTurnEnded
        };
        match self
            .run_mutators(mutation_point, Some(&turn_id), cancellation.clone())
            .await
        {
            Ok(()) => {}
            Err(LoopError::Cancelled) => {
                // Cancelled before the model was contacted; the task manager is
                // not notified on this path.
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        }
        if emit_started {
            self.emit(AgentEvent::TurnStarted {
                session_id: self.session_id.clone(),
                turn_id: turn_id.clone(),
            });
        }
        // Re-check after mutators ran and before issuing the provider request.
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }

        // Surface catalog changes before `specs()` is snapshotted for the request.
        let catalog_events = self.tool_executor.drain_catalog_events();
        self.emit_tool_catalog_events(catalog_events);

        let request = TurnRequest {
            session_id: self.session_id.clone(),
            turn_id: turn_id.clone(),
            transcript: self.transcript.clone(),
            available_tools: self.tool_executor.specs(),
            // The one-shot override wins over the session default.
            cache: self
                .next_turn_cache
                .take()
                .or_else(|| self.default_cache.clone()),
            metadata: MetadataMap::new(),
        };

        let session = self
            .session
            .as_mut()
            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
        let mut turn = match session.begin_turn(request, cancellation.clone()).await {
            Ok(turn) => turn,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        };
        let mut saw_tool_call = false;
        let mut finished_result = None;

        // `Ok(None)` from `next_event` ends the stream; `Finished` also breaks out.
        while let Some(event) = match turn.next_event(cancellation.clone()).await {
            Ok(event) => event,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        } {
            // Cancellation may be signalled while the provider keeps yielding events.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            match event {
                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
                ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
                ModelTurnEvent::ToolCall(call) => {
                    // Streamed calls are only announced here; dispatch below uses
                    // the calls found in the final output items.
                    saw_tool_call = true;
                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
                }
                ModelTurnEvent::Finished(result) => {
                    finished_result = Some(result);
                    break;
                }
            }
        }

        let mut result = finished_result.ok_or_else(|| {
            LoopError::Provider("model turn ended without a Finished event".into())
        })?;
        tracing::Span::current().record("saw_tool_call", saw_tool_call);
        tracing::Span::current().record(
            "finish_reason",
            tracing::field::debug(&result.finish_reason),
        );
        // One timestamp for the whole batch of output items.
        let now = Timestamp::now();
        let usage = result.usage.clone();
        let finish_reason = result.finish_reason.clone();
        let output_items: Vec<Item> = result
            .output_items
            .drain(..)
            .map(|mut item| {
                // Turn-level usage / finish reason only fill gaps on assistant items.
                if matches!(item.kind, ItemKind::Assistant) {
                    if item.usage.is_none() {
                        item.usage = usage.clone();
                    }
                    if item.finish_reason.is_none() {
                        item.finish_reason = Some(finish_reason.clone());
                    }
                }
                if item.created_at.is_none() {
                    item.created_at = Some(now);
                }
                item
            })
            .collect();
        self.extend_transcript(output_items.clone());

        // NOTE(review): dispatch is gated on streamed `ToolCall` events. Tool
        // calls present only in `output_items` (never streamed) are left in the
        // transcript without results — confirm every adapter streams them.
        if saw_tool_call {
            let pending_calls = extract_tool_calls(&output_items)
                .into_iter()
                .map(|call| {
                    let tool_request = ToolRequest {
                        call_id: call.id.clone(),
                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
                        input: call.input.clone(),
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        metadata: call.metadata.clone(),
                    };
                    (call, tool_request)
                })
                .collect();
            self.active_tool_round = Some(ActiveToolRound {
                turn_id: turn_id.clone(),
                pending_calls,
                background_pending: false,
                foreground_progressed: false,
            });
            if let Some(step) = self.continue_active_tool_round().await? {
                return Ok(step);
            }
            // The round produced no step (presumably only background work is
            // outstanding), so hand control back to the host.
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        let turn_result = TurnResult {
            turn_id,
            finish_reason: result.finish_reason,
            items: output_items,
            usage: result.usage,
            metadata: result.metadata,
        };
        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
        Ok(LoopStep::Finished(turn_result))
    }
2003
2004 async fn resume_after_approval(
2005 &mut self,
2006 pending: PendingApprovalToolCall,
2007 ) -> Result<LoopStep, LoopError> {
2008 let decision = pending
2009 .decision
2010 .clone()
2011 .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
2012
2013 match decision {
2014 ApprovalDecision::Approve => {
2015 use tracing::Instrument;
2016 let dispatch_span =
2017 self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
2018 match self
2019 .start_task_via_manager(
2020 Some(pending.task_id.clone()),
2021 pending.tool_request.clone(),
2022 TaskLaunchKind::Approved(pending.request.clone()),
2023 self.cancellation
2024 .as_ref()
2025 .map(CancellationHandle::checkpoint),
2026 )
2027 .instrument(dispatch_span)
2028 .await?
2029 {
2030 TaskStartOutcome::Ready(resolution) => {
2031 let resolution = *resolution;
2032 if let Some(step) =
2033 self.queue_resolution_interrupt(&pending.turn_id, resolution)
2034 {
2035 return Ok(step);
2036 }
2037 }
2038 TaskStartOutcome::Pending { .. } => {}
2039 }
2040 }
2041 ApprovalDecision::Deny { reason } => {
2042 self.append_tool_result_item(Item {
2043 id: None,
2044 kind: ItemKind::Tool,
2045 parts: vec![Part::ToolResult(ToolResultPart {
2046 call_id: pending.call.id.clone(),
2047 output: ToolOutput::Text(
2048 reason.unwrap_or_else(|| "approval denied".into()),
2049 ),
2050 is_error: true,
2051 metadata: pending.call.metadata.clone(),
2052 })],
2053 metadata: MetadataMap::new(),
2054 usage: None,
2055 finish_reason: None,
2056 created_at: None,
2057 });
2058 }
2059 }
2060
2061 if let Some(step) = self.continue_active_tool_round().await? {
2062 Ok(step)
2063 } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2064 Ok(step)
2065 } else if let Some(step) = self.next_unresolved_approval_interrupt() {
2066 Ok(step)
2067 } else {
2068 self.drive_turn(pending.turn_id, false).await
2069 }
2070 }
2071
2072 fn finish_cancelled(
2073 &mut self,
2074 turn_id: agentkit_core::TurnId,
2075 items: Vec<Item>,
2076 ) -> Result<LoopStep, LoopError> {
2077 self.extend_transcript(items.clone());
2078 let turn_result = TurnResult {
2079 turn_id,
2080 finish_reason: FinishReason::Cancelled,
2081 items,
2082 usage: None,
2083 metadata: interrupted_metadata("turn"),
2084 };
2085 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
2086 Ok(LoopStep::Finished(turn_result))
2087 }
2088
2089 pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
2099 if self.has_pending_interrupts() {
2100 return Err(LoopError::InvalidState(
2101 "cannot submit input while an interrupt is pending".into(),
2102 ));
2103 }
2104 self.emit(AgentEvent::InputAccepted {
2105 session_id: self.session_id.clone(),
2106 items: input.clone(),
2107 });
2108 self.pending_input.extend(input);
2109 Ok(())
2110 }
2111
2112 pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2117 if self.has_pending_interrupts() {
2118 return Err(LoopError::InvalidState(
2119 "cannot update next-turn cache while an interrupt is pending".into(),
2120 ));
2121 }
2122 self.next_turn_cache = Some(cache);
2123 Ok(())
2124 }
2125
2126 #[cfg(test)]
2127 pub(crate) fn submit_input_with_cache(
2128 &mut self,
2129 input: Vec<Item>,
2130 cache: PromptCacheRequest,
2131 ) -> Result<(), LoopError> {
2132 self.set_next_turn_cache(cache)?;
2133 self.submit_input(input)
2134 }
2135
2136 pub fn resolve_approval_for(
2146 &mut self,
2147 call_id: ToolCallId,
2148 decision: ApprovalDecision,
2149 ) -> Result<(), LoopError> {
2150 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2151 return Err(LoopError::InvalidState(format!(
2152 "no approval request is pending for call {}",
2153 call_id.0
2154 )));
2155 };
2156 pending.decision = Some(decision.clone());
2157 self.emit(AgentEvent::ApprovalResolved {
2158 approved: matches!(decision, ApprovalDecision::Approve),
2159 });
2160 Ok(())
2161 }
2162
2163 pub fn resolve_approval_for_with_patched_input(
2176 &mut self,
2177 call_id: ToolCallId,
2178 input: serde_json::Value,
2179 ) -> Result<(), LoopError> {
2180 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2181 return Err(LoopError::InvalidState(format!(
2182 "no approval request is pending for call {}",
2183 call_id.0
2184 )));
2185 };
2186 pending.tool_request.input = input;
2187 self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2188 }
2189
2190 pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2193 let mut unresolved = self
2194 .pending_approval_order
2195 .iter()
2196 .filter(|call_id| {
2197 self.pending_approvals
2198 .get(*call_id)
2199 .is_some_and(|pending| pending.decision.is_none())
2200 })
2201 .cloned();
2202 let Some(call_id) = unresolved.next() else {
2203 return Err(LoopError::InvalidState(
2204 "no approval request is pending".into(),
2205 ));
2206 };
2207 if unresolved.next().is_some() {
2208 return Err(LoopError::InvalidState(
2209 "multiple approvals are pending; use resolve_approval_for".into(),
2210 ));
2211 }
2212 self.resolve_approval_for(call_id, decision)
2213 }
2214
2215 pub fn snapshot(&self) -> LoopSnapshot {
2217 LoopSnapshot {
2218 session_id: self.session_id.clone(),
2219 transcript: self.transcript.clone(),
2220 pending_input: self.pending_input.clone(),
2221 }
2222 }
2223
2224 pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
2245 if let Some(pending) = self.take_next_resolved_approval() {
2246 return self.resume_after_approval(pending).await;
2247 }
2248
2249 if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2250 return Ok(step);
2251 }
2252
2253 if let Some(step) = self.next_unresolved_approval_interrupt() {
2254 return Ok(step);
2255 }
2256
2257 if let Some(step) = self.continue_active_tool_round().await? {
2258 return Ok(step);
2259 }
2260
2261 let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2262 if let Some(step) = loop_step {
2263 return Ok(step);
2264 }
2265
2266 if let Some(turn_id) = self.pending_round_resume.take() {
2271 let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2272 self.extend_transcript(drained);
2273 return self.drive_turn(turn_id, false).await;
2274 }
2275
2276 if self.pending_input.is_empty() && !had_loop_updates {
2277 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2278 InputRequest {
2279 session_id: self.session_id.clone(),
2280 reason: "driver is waiting for input".into(),
2281 },
2282 )));
2283 }
2284
2285 let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2286 self.next_turn_index += 1;
2287 let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2288 self.extend_transcript(drained);
2289 self.drive_turn(turn_id, true).await
2290 }
2291
2292 fn emit(&self, event: AgentEvent) {
2293 for observer in &self.observers {
2294 observer.handle_event(event.clone());
2295 }
2296 }
2297
2298 fn append_item(&mut self, mut item: Item) {
2303 if item.created_at.is_none() {
2304 item.created_at = Some(Timestamp::now());
2305 }
2306 for observer in &self.transcript_observers {
2307 observer.on_item_appended(&item);
2308 }
2309 self.transcript.push(item);
2310 }
2311
2312 fn append_tool_result_item(&mut self, item: Item) {
2325 for part in &item.parts {
2326 if let Part::ToolResult(result) = part {
2327 self.emit(AgentEvent::ToolResultReceived(result.clone()));
2328 }
2329 }
2330 let item = self.maybe_convert_detached(item);
2331 self.append_item(item);
2332 }
2333
2334 fn maybe_convert_detached(&mut self, item: Item) -> Item {
2335 if !matches!(item.kind, ItemKind::Tool) {
2336 return item;
2337 }
2338 let results: Vec<&ToolResultPart> = item
2339 .parts
2340 .iter()
2341 .filter_map(|p| match p {
2342 Part::ToolResult(r) => Some(r),
2343 _ => None,
2344 })
2345 .collect();
2346 if results.is_empty()
2347 || !results
2348 .iter()
2349 .all(|r| self.detached_call_ids.contains(&r.call_id))
2350 {
2351 return item;
2352 }
2353 let mut text = String::new();
2354 for result in &results {
2355 self.detached_call_ids.remove(&result.call_id);
2356 if !text.is_empty() {
2357 text.push_str("\n\n");
2358 }
2359 let label = if result.is_error {
2360 "failed"
2361 } else {
2362 "completed"
2363 };
2364 let body = render_tool_output_brief(&result.output);
2365 text.push_str(&format!(
2366 "Background tool call {} {}: {body}",
2367 result.call_id.0, label
2368 ));
2369 }
2370 Item::notification(text)
2371 }
2372
2373 fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2377 let now = Timestamp::now();
2378 for mut item in items {
2379 if item.created_at.is_none() {
2380 item.created_at = Some(now);
2381 }
2382 self.append_item(item);
2383 }
2384 }
2385}
2386
2387fn render_tool_output_brief(output: &ToolOutput) -> String {
2388 match output {
2389 ToolOutput::Text(t) => t.clone(),
2390 ToolOutput::Structured(value) => value.to_string(),
2391 ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2392 ToolOutput::Files(files) => format!("[{} files]", files.len()),
2393 }
2394}
2395
2396fn interrupted_metadata(stage: &str) -> MetadataMap {
2397 let mut metadata = MetadataMap::new();
2398 metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2399 metadata.insert(
2400 INTERRUPT_REASON_METADATA_KEY.into(),
2401 USER_CANCELLED_REASON.into(),
2402 );
2403 metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2404 metadata
2405}
2406
2407fn interrupted_assistant_items() -> Vec<Item> {
2408 vec![Item {
2409 id: None,
2410 kind: ItemKind::Assistant,
2411 parts: vec![Part::Text(TextPart {
2412 text: "Previous assistant response was interrupted by the user before completion."
2413 .into(),
2414 metadata: interrupted_metadata("assistant"),
2415 })],
2416 metadata: interrupted_metadata("assistant"),
2417 usage: None,
2418 finish_reason: None,
2419 created_at: None,
2420 }]
2421}
2422
2423fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2424 let mut calls = Vec::new();
2425 for item in items {
2426 for part in &item.parts {
2427 if let Part::ToolCall(call) = part {
2428 calls.push(call.clone());
2429 }
2430 }
2431 }
2432 calls
2433}
2434
/// Errors produced by the agent loop driver.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The driver was asked to do something its current state does not allow.
    /// Examples: submitting input while an interrupt is pending, or using a
    /// missing model session or tool round.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The turn was cancelled.
    ///
    /// Inside `drive_turn`, this error from mutators, `begin_turn` or
    /// `next_event` is not propagated. It is turned into a turn finished with
    /// `FinishReason::Cancelled`.
    #[error("turn cancelled")]
    Cancelled,
    /// The model provider failed or misbehaved, e.g. a turn stream that ended
    /// without a `Finished` event.
    #[error("provider error: {0}")]
    Provider(String),
    /// A tool failure; created from [`ToolError`] via `?`. Task-manager errors
    /// are also wrapped here as `ToolError::Internal`.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// A transcript-mutation failure. `validate_transcript_invariants` uses it
    /// for transcript invariant violations; presumably mutators report their
    /// own failures with it too.
    #[error("mutator error: {0}")]
    Mutator(String),
    /// The requested operation is not supported.
    /// NOTE(review): not raised in this part of the file; confirm its callers.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2457
2458struct DriverEmitter<'a> {
2463 observers: &'a [Arc<dyn LoopObserver>],
2464}
2465
2466impl<'a> EventEmitter for DriverEmitter<'a> {
2467 fn emit(&self, event: AgentEvent) {
2468 for observer in self.observers {
2469 observer.handle_event(event.clone());
2470 }
2471 }
2472}
2473
2474fn validate_transcript_invariants(transcript: &[Item]) -> Result<(), LoopError> {
2479 let mut pending: HashSet<ToolCallId> = HashSet::new();
2480 let mut seen_calls: HashSet<ToolCallId> = HashSet::new();
2481 let mut seen_results: HashSet<ToolCallId> = HashSet::new();
2482 for item in transcript {
2483 for part in &item.parts {
2484 match part {
2485 Part::ToolCall(call) => {
2486 if !seen_calls.insert(call.id.clone()) {
2487 return Err(LoopError::Mutator(format!(
2488 "transcript invariant violation: duplicate tool_use: {}",
2489 call.id.0
2490 )));
2491 }
2492 pending.insert(call.id.clone());
2493 }
2494 Part::ToolResult(result) => {
2495 if !pending.remove(&result.call_id) {
2496 let kind = if seen_results.contains(&result.call_id) {
2497 "duplicate"
2498 } else {
2499 "orphaned"
2500 };
2501 return Err(LoopError::Mutator(format!(
2502 "transcript invariant violation: {kind} tool_result: {}",
2503 result.call_id.0
2504 )));
2505 }
2506 seen_results.insert(result.call_id.clone());
2507 }
2508 _ => {}
2509 }
2510 }
2511 }
2512 if !pending.is_empty() {
2513 let missing: Vec<String> = pending.into_iter().map(|id| id.0).collect();
2514 return Err(LoopError::Mutator(format!(
2515 "transcript invariant violation: tool_use(s) without matching tool_result: {}",
2516 missing.join(", ")
2517 )));
2518 }
2519 Ok(())
2520}
2521
2522#[cfg(test)]
2523mod tests {
2524 use std::collections::VecDeque;
2525 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2526 use std::sync::{Arc as StdArc, Mutex as StdMutex};
2527
2528 use agentkit_core::{
2529 CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolCallPart, ToolOutput,
2530 ToolResultPart,
2531 };
2532 use agentkit_task_manager::{
2533 AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2534 TaskRoutingPolicy,
2535 };
2536 use agentkit_tools_core::{
2537 FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2538 ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2539 ToolResult, ToolSpec,
2540 };
2541 use serde_json::{Value, json};
2542 use tokio::sync::Notify;
2543 use tokio::time::{Duration, timeout};
2544
2545 use super::*;
2546
2547 struct FakeAdapter;
2548 struct SlowAdapter;
2549 struct RecordingAdapter {
2550 seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2551 seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2552 }
2553 struct MultiToolAdapter;
2554 struct DualApprovalAdapter;
2555
2556 struct FakeSession;
2557 struct SlowSession;
2558 struct RecordingSession {
2559 seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2560 seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2561 }
2562 struct MultiToolSession;
2563 struct DualApprovalSession;
2564
2565 struct FakeTurn {
2566 events: VecDeque<ModelTurnEvent>,
2567 }
2568
2569 struct SlowTurn {
2570 emitted: bool,
2571 }
2572
2573 struct RecordingTurn {
2574 emitted: bool,
2575 }
2576 struct MultiToolTurn {
2577 events: VecDeque<ModelTurnEvent>,
2578 }
2579 struct DualApprovalTurn {
2580 events: VecDeque<ModelTurnEvent>,
2581 }
2582
2583 #[async_trait]
2584 impl ModelAdapter for FakeAdapter {
2585 type Session = FakeSession;
2586
2587 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2588 Ok(FakeSession)
2589 }
2590 }
2591
2592 #[async_trait]
2593 impl ModelAdapter for SlowAdapter {
2594 type Session = SlowSession;
2595
2596 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2597 Ok(SlowSession)
2598 }
2599 }
2600
2601 #[async_trait]
2602 impl ModelAdapter for RecordingAdapter {
2603 type Session = RecordingSession;
2604
2605 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2606 Ok(RecordingSession {
2607 seen_descriptions: self.seen_descriptions.clone(),
2608 seen_caches: self.seen_caches.clone(),
2609 })
2610 }
2611 }
2612
2613 #[async_trait]
2614 impl ModelAdapter for MultiToolAdapter {
2615 type Session = MultiToolSession;
2616
2617 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2618 Ok(MultiToolSession)
2619 }
2620 }
2621
2622 #[async_trait]
2623 impl ModelAdapter for DualApprovalAdapter {
2624 type Session = DualApprovalSession;
2625
2626 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2627 Ok(DualApprovalSession)
2628 }
2629 }
2630
2631 #[async_trait]
2632 impl ModelSession for FakeSession {
2633 type Turn = FakeTurn;
2634
2635 async fn begin_turn(
2636 &mut self,
2637 request: TurnRequest,
2638 _cancellation: Option<TurnCancellation>,
2639 ) -> Result<Self::Turn, LoopError> {
2640 let has_tool_result = request.transcript.iter().any(|item| {
2641 item.kind == ItemKind::Tool
2642 && item
2643 .parts
2644 .iter()
2645 .any(|part| matches!(part, Part::ToolResult(_)))
2646 });
2647 let tool_name = request
2648 .available_tools
2649 .first()
2650 .map(|tool| tool.name.0.clone())
2651 .unwrap_or_else(|| "echo".into());
2652
2653 let events = if has_tool_result {
2654 let result_text = request
2655 .transcript
2656 .iter()
2657 .rev()
2658 .find_map(|item| {
2659 item.parts.iter().find_map(|part| match part {
2660 Part::ToolResult(ToolResultPart {
2661 output: ToolOutput::Text(text),
2662 ..
2663 }) => Some(text.clone()),
2664 _ => None,
2665 })
2666 })
2667 .unwrap_or_else(|| "missing".into());
2668
2669 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2670 finish_reason: FinishReason::Completed,
2671 output_items: vec![Item {
2672 id: None,
2673 kind: ItemKind::Assistant,
2674 parts: vec![Part::Text(TextPart {
2675 text: format!("tool said: {result_text}"),
2676 metadata: MetadataMap::new(),
2677 })],
2678 metadata: MetadataMap::new(),
2679 usage: None,
2680 finish_reason: None,
2681 created_at: None,
2682 }],
2683 usage: None,
2684 metadata: MetadataMap::new(),
2685 })])
2686 } else {
2687 VecDeque::from([
2688 ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2689 id: ToolCallId::new("call-1"),
2690 name: tool_name.clone(),
2691 input: json!({ "value": "pong" }),
2692 metadata: MetadataMap::new(),
2693 }),
2694 ModelTurnEvent::Finished(ModelTurnResult {
2695 finish_reason: FinishReason::ToolCall,
2696 output_items: vec![Item {
2697 id: None,
2698 kind: ItemKind::Assistant,
2699 parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2700 id: ToolCallId::new("call-1"),
2701 name: tool_name,
2702 input: json!({ "value": "pong" }),
2703 metadata: MetadataMap::new(),
2704 })],
2705 metadata: MetadataMap::new(),
2706 usage: None,
2707 finish_reason: None,
2708 created_at: None,
2709 }],
2710 usage: None,
2711 metadata: MetadataMap::new(),
2712 }),
2713 ])
2714 };
2715
2716 Ok(FakeTurn { events })
2717 }
2718 }
2719
2720 #[async_trait]
2721 impl ModelSession for SlowSession {
2722 type Turn = SlowTurn;
2723
2724 async fn begin_turn(
2725 &mut self,
2726 request: TurnRequest,
2727 cancellation: Option<TurnCancellation>,
2728 ) -> Result<Self::Turn, LoopError> {
2729 let should_block = request
2730 .transcript
2731 .iter()
2732 .rev()
2733 .find(|item| item.kind == ItemKind::User)
2734 .is_some_and(|item| {
2735 item.parts.iter().any(|part| match part {
2736 Part::Text(text) => text.text == "do the long task",
2737 _ => false,
2738 })
2739 });
2740
2741 if should_block && let Some(cancellation) = cancellation {
2742 cancellation.cancelled().await;
2743 return Err(LoopError::Cancelled);
2744 }
2745
2746 Ok(SlowTurn { emitted: false })
2747 }
2748 }
2749
2750 #[async_trait]
2751 impl ModelSession for RecordingSession {
2752 type Turn = RecordingTurn;
2753
2754 async fn begin_turn(
2755 &mut self,
2756 request: TurnRequest,
2757 _cancellation: Option<TurnCancellation>,
2758 ) -> Result<Self::Turn, LoopError> {
2759 let descriptions = request
2760 .available_tools
2761 .iter()
2762 .map(|tool| tool.description.clone())
2763 .collect::<Vec<_>>();
2764 self.seen_descriptions.lock().unwrap().push(descriptions);
2765 self.seen_caches.lock().unwrap().push(request.cache.clone());
2766
2767 Ok(RecordingTurn { emitted: false })
2768 }
2769 }
2770
2771 #[async_trait]
2772 impl ModelSession for MultiToolSession {
2773 type Turn = MultiToolTurn;
2774
2775 async fn begin_turn(
2776 &mut self,
2777 request: TurnRequest,
2778 _cancellation: Option<TurnCancellation>,
2779 ) -> Result<Self::Turn, LoopError> {
2780 let has_tool_result = request.transcript.iter().any(|item| {
2781 item.kind == ItemKind::Tool
2782 && item
2783 .parts
2784 .iter()
2785 .any(|part| matches!(part, Part::ToolResult(_)))
2786 });
2787
2788 let events = if has_tool_result {
2789 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2790 finish_reason: FinishReason::Completed,
2791 output_items: vec![Item {
2792 id: None,
2793 kind: ItemKind::Assistant,
2794 parts: vec![Part::Text(TextPart {
2795 text: "mixed tools finished".into(),
2796 metadata: MetadataMap::new(),
2797 })],
2798 metadata: MetadataMap::new(),
2799 usage: None,
2800 finish_reason: None,
2801 created_at: None,
2802 }],
2803 usage: None,
2804 metadata: MetadataMap::new(),
2805 })])
2806 } else {
2807 let foreground = agentkit_core::ToolCallPart {
2808 id: ToolCallId::new("call-foreground"),
2809 name: "foreground-wait".into(),
2810 input: json!({}),
2811 metadata: MetadataMap::new(),
2812 };
2813 let background = agentkit_core::ToolCallPart {
2814 id: ToolCallId::new("call-background"),
2815 name: "background-wait".into(),
2816 input: json!({}),
2817 metadata: MetadataMap::new(),
2818 };
2819 VecDeque::from([
2820 ModelTurnEvent::ToolCall(foreground.clone()),
2821 ModelTurnEvent::ToolCall(background.clone()),
2822 ModelTurnEvent::Finished(ModelTurnResult {
2823 finish_reason: FinishReason::ToolCall,
2824 output_items: vec![Item {
2825 id: None,
2826 kind: ItemKind::Assistant,
2827 parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2828 metadata: MetadataMap::new(),
2829 usage: None,
2830 finish_reason: None,
2831 created_at: None,
2832 }],
2833 usage: None,
2834 metadata: MetadataMap::new(),
2835 }),
2836 ])
2837 };
2838
2839 Ok(MultiToolTurn { events })
2840 }
2841 }
2842
2843 #[async_trait]
2844 impl ModelSession for DualApprovalSession {
2845 type Turn = DualApprovalTurn;
2846
2847 async fn begin_turn(
2848 &mut self,
2849 request: TurnRequest,
2850 _cancellation: Option<TurnCancellation>,
2851 ) -> Result<Self::Turn, LoopError> {
2852 let tool_results = request
2853 .transcript
2854 .iter()
2855 .flat_map(|item| item.parts.iter())
2856 .filter(|part| matches!(part, Part::ToolResult(_)))
2857 .count();
2858
2859 let events = if tool_results >= 2 {
2860 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2861 finish_reason: FinishReason::Completed,
2862 output_items: vec![Item {
2863 id: None,
2864 kind: ItemKind::Assistant,
2865 parts: vec![Part::Text(TextPart {
2866 text: "both approvals finished".into(),
2867 metadata: MetadataMap::new(),
2868 })],
2869 metadata: MetadataMap::new(),
2870 usage: None,
2871 finish_reason: None,
2872 created_at: None,
2873 }],
2874 usage: None,
2875 metadata: MetadataMap::new(),
2876 })])
2877 } else {
2878 let first = agentkit_core::ToolCallPart {
2879 id: ToolCallId::new("call-1"),
2880 name: "echo".into(),
2881 input: json!({ "value": "first" }),
2882 metadata: MetadataMap::new(),
2883 };
2884 let second = agentkit_core::ToolCallPart {
2885 id: ToolCallId::new("call-2"),
2886 name: "echo".into(),
2887 input: json!({ "value": "second" }),
2888 metadata: MetadataMap::new(),
2889 };
2890 VecDeque::from([
2891 ModelTurnEvent::ToolCall(first.clone()),
2892 ModelTurnEvent::ToolCall(second.clone()),
2893 ModelTurnEvent::Finished(ModelTurnResult {
2894 finish_reason: FinishReason::ToolCall,
2895 output_items: vec![Item {
2896 id: None,
2897 kind: ItemKind::Assistant,
2898 parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2899 metadata: MetadataMap::new(),
2900 usage: None,
2901 finish_reason: None,
2902 created_at: None,
2903 }],
2904 usage: None,
2905 metadata: MetadataMap::new(),
2906 }),
2907 ])
2908 };
2909
2910 Ok(DualApprovalTurn { events })
2911 }
2912 }
2913
2914 #[async_trait]
2915 impl ModelTurn for FakeTurn {
2916 async fn next_event(
2917 &mut self,
2918 _cancellation: Option<TurnCancellation>,
2919 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2920 Ok(self.events.pop_front())
2921 }
2922 }
2923
2924 #[async_trait]
2925 impl ModelTurn for SlowTurn {
2926 async fn next_event(
2927 &mut self,
2928 cancellation: Option<TurnCancellation>,
2929 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2930 if let Some(cancellation) = cancellation
2931 && cancellation.is_cancelled()
2932 {
2933 return Err(LoopError::Cancelled);
2934 }
2935
2936 if self.emitted {
2937 Ok(None)
2938 } else {
2939 self.emitted = true;
2940 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2941 finish_reason: FinishReason::Completed,
2942 output_items: vec![Item {
2943 id: None,
2944 kind: ItemKind::Assistant,
2945 parts: vec![Part::Text(TextPart {
2946 text: "done".into(),
2947 metadata: MetadataMap::new(),
2948 })],
2949 metadata: MetadataMap::new(),
2950 usage: None,
2951 finish_reason: None,
2952 created_at: None,
2953 }],
2954 usage: None,
2955 metadata: MetadataMap::new(),
2956 })))
2957 }
2958 }
2959 }
2960
2961 #[async_trait]
2962 impl ModelTurn for RecordingTurn {
2963 async fn next_event(
2964 &mut self,
2965 _cancellation: Option<TurnCancellation>,
2966 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2967 if self.emitted {
2968 Ok(None)
2969 } else {
2970 self.emitted = true;
2971 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2972 finish_reason: FinishReason::Completed,
2973 output_items: vec![Item {
2974 id: None,
2975 kind: ItemKind::Assistant,
2976 parts: vec![Part::Text(TextPart {
2977 text: "done".into(),
2978 metadata: MetadataMap::new(),
2979 })],
2980 metadata: MetadataMap::new(),
2981 usage: None,
2982 finish_reason: None,
2983 created_at: None,
2984 }],
2985 usage: None,
2986 metadata: MetadataMap::new(),
2987 })))
2988 }
2989 }
2990 }
2991
2992 #[async_trait]
2993 impl ModelTurn for MultiToolTurn {
2994 async fn next_event(
2995 &mut self,
2996 _cancellation: Option<TurnCancellation>,
2997 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2998 Ok(self.events.pop_front())
2999 }
3000 }
3001
3002 #[async_trait]
3003 impl ModelTurn for DualApprovalTurn {
3004 async fn next_event(
3005 &mut self,
3006 _cancellation: Option<TurnCancellation>,
3007 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3008 Ok(self.events.pop_front())
3009 }
3010 }
3011
3012 #[derive(Clone)]
3013 struct EchoTool {
3014 spec: ToolSpec,
3015 }
3016
3017 impl Default for EchoTool {
3018 fn default() -> Self {
3019 Self {
3020 spec: ToolSpec {
3021 name: ToolName::new("echo"),
3022 description: "Echo back a value".into(),
3023 input_schema: json!({
3024 "type": "object",
3025 "properties": {
3026 "value": { "type": "string" }
3027 },
3028 "required": ["value"],
3029 "additionalProperties": false
3030 }),
3031 annotations: ToolAnnotations::default(),
3032 metadata: MetadataMap::new(),
3033 },
3034 }
3035 }
3036 }
3037
3038 #[derive(Clone)]
3039 struct DynamicSpecTool {
3040 spec: ToolSpec,
3041 version: StdArc<AtomicUsize>,
3042 }
3043
3044 impl DynamicSpecTool {
3045 fn new(version: StdArc<AtomicUsize>) -> Self {
3046 Self {
3047 spec: ToolSpec {
3048 name: ToolName::new("dynamic"),
3049 description: "dynamic version 0".into(),
3050 input_schema: json!({
3051 "type": "object",
3052 "properties": {},
3053 "additionalProperties": false
3054 }),
3055 annotations: ToolAnnotations::default(),
3056 metadata: MetadataMap::new(),
3057 },
3058 version,
3059 }
3060 }
3061 }
3062
3063 #[async_trait]
3064 impl Tool for EchoTool {
3065 fn spec(&self) -> &ToolSpec {
3066 &self.spec
3067 }
3068
3069 fn proposed_requests(
3070 &self,
3071 request: &agentkit_tools_core::ToolRequest,
3072 ) -> Result<
3073 Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
3074 agentkit_tools_core::ToolError,
3075 > {
3076 Ok(vec![Box::new(FileSystemPermissionRequest::Read {
3077 path: "/tmp/echo".into(),
3078 metadata: request.metadata.clone(),
3079 })])
3080 }
3081
3082 async fn invoke(
3083 &self,
3084 request: agentkit_tools_core::ToolRequest,
3085 _ctx: &mut ToolContext<'_>,
3086 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3087 let value = request
3088 .input
3089 .get("value")
3090 .and_then(Value::as_str)
3091 .ok_or_else(|| {
3092 agentkit_tools_core::ToolError::InvalidInput("missing value".into())
3093 })?;
3094
3095 Ok(ToolResult {
3096 result: ToolResultPart {
3097 call_id: request.call_id,
3098 output: ToolOutput::Text(value.into()),
3099 is_error: false,
3100 metadata: MetadataMap::new(),
3101 },
3102 duration: None,
3103 metadata: MetadataMap::new(),
3104 })
3105 }
3106 }
3107
3108 #[async_trait]
3109 impl Tool for DynamicSpecTool {
3110 fn spec(&self) -> &ToolSpec {
3111 &self.spec
3112 }
3113
3114 fn current_spec(&self) -> Option<ToolSpec> {
3115 let mut spec = self.spec.clone();
3116 spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
3117 Some(spec)
3118 }
3119
3120 async fn invoke(
3121 &self,
3122 request: agentkit_tools_core::ToolRequest,
3123 _ctx: &mut ToolContext<'_>,
3124 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3125 Ok(ToolResult {
3126 result: ToolResultPart {
3127 call_id: request.call_id,
3128 output: ToolOutput::Text("ok".into()),
3129 is_error: false,
3130 metadata: MetadataMap::new(),
3131 },
3132 duration: None,
3133 metadata: MetadataMap::new(),
3134 })
3135 }
3136 }
3137
3138 struct DenyFsReads;
3139
3140 impl PermissionChecker for DenyFsReads {
3141 fn evaluate(
3142 &self,
3143 request: &dyn agentkit_tools_core::PermissionRequest,
3144 ) -> PermissionDecision {
3145 if request.kind() == "filesystem.read" {
3146 return PermissionDecision::Deny(PermissionDenial {
3147 code: PermissionCode::PathNotAllowed,
3148 message: "reads denied in test".into(),
3149 metadata: MetadataMap::new(),
3150 });
3151 }
3152
3153 PermissionDecision::Allow
3154 }
3155 }
3156
3157 struct ApproveFsReads;
3158
3159 impl PermissionChecker for ApproveFsReads {
3160 fn evaluate(
3161 &self,
3162 request: &dyn agentkit_tools_core::PermissionRequest,
3163 ) -> PermissionDecision {
3164 if request.kind() == "filesystem.read" {
3165 return PermissionDecision::RequireApproval(ApprovalRequest {
3166 task_id: None,
3167 call_id: None,
3168 id: "approval:fs-read".into(),
3169 request_kind: request.kind().into(),
3170 reason: agentkit_tools_core::ApprovalReason::SensitivePath,
3171 summary: request.summary(),
3172 metadata: request.metadata().clone(),
3173 });
3174 }
3175
3176 PermissionDecision::Allow
3177 }
3178 }
3179
3180 struct KeepRecentMutator {
3181 keep: usize,
3182 }
3183
3184 #[async_trait]
3185 impl LoopMutator for KeepRecentMutator {
3186 async fn mutate(
3187 &self,
3188 cursor: &mut TranscriptCursor<'_>,
3189 ctx: LoopCtx<'_>,
3190 ) -> Result<(), LoopError> {
3191 if cursor.len() < 2 {
3192 return Ok(());
3193 }
3194 let drop = cursor.len().saturating_sub(self.keep);
3195 ctx.emitter.emit(AgentEvent::MutationStarted {
3196 session_id: ctx.session_id.clone(),
3197 turn_id: ctx.turn_id.cloned(),
3198 mutator: "keep-recent".into(),
3199 point: ctx.point,
3200 });
3201 cursor.drain(..drop);
3202 ctx.emitter.emit(AgentEvent::MutationFinished {
3203 session_id: ctx.session_id.clone(),
3204 turn_id: ctx.turn_id.cloned(),
3205 mutator: "keep-recent".into(),
3206 dirty: true,
3207 metadata: MetadataMap::new(),
3208 });
3209 Ok(())
3210 }
3211 }
3212
3213 struct RecordingObserver {
3214 events: StdArc<StdMutex<Vec<AgentEvent>>>,
3215 }
3216
3217 impl LoopObserver for RecordingObserver {
3218 fn handle_event(&self, event: AgentEvent) {
3219 self.events.lock().unwrap().push(event);
3220 }
3221 }
3222
3223 struct CatalogExecutor {
3224 version: AtomicUsize,
3225 events: StdMutex<Vec<ToolCatalogEvent>>,
3226 }
3227
3228 impl CatalogExecutor {
3229 fn new() -> Self {
3230 Self {
3231 version: AtomicUsize::new(0),
3232 events: StdMutex::new(Vec::new()),
3233 }
3234 }
3235
3236 fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3237 self.version.store(version, Ordering::SeqCst);
3238 self.events.lock().unwrap().push(event);
3239 }
3240 }
3241
3242 #[async_trait]
3243 impl ToolExecutor for CatalogExecutor {
3244 fn specs(&self) -> Vec<ToolSpec> {
3245 vec![ToolSpec {
3246 name: ToolName::new("dynamic"),
3247 description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3248 input_schema: json!({
3249 "type": "object",
3250 "properties": {},
3251 "additionalProperties": false
3252 }),
3253 annotations: ToolAnnotations::default(),
3254 metadata: MetadataMap::new(),
3255 }]
3256 }
3257
3258 fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3259 std::mem::take(&mut *self.events.lock().unwrap())
3260 }
3261
3262 async fn execute(
3263 &self,
3264 request: ToolRequest,
3265 _ctx: &mut ToolContext<'_>,
3266 ) -> ToolExecutionOutcome {
3267 ToolExecutionOutcome::Completed(ToolResult {
3268 result: ToolResultPart {
3269 call_id: request.call_id,
3270 output: ToolOutput::Text("dynamic-ok".into()),
3271 is_error: false,
3272 metadata: MetadataMap::new(),
3273 },
3274 duration: None,
3275 metadata: MetadataMap::new(),
3276 })
3277 }
3278 }
3279
3280 #[derive(Clone)]
3281 struct BlockingTool {
3282 spec: ToolSpec,
3283 entered: StdArc<AtomicBool>,
3284 release: StdArc<Notify>,
3285 output: &'static str,
3286 }
3287
3288 impl BlockingTool {
3289 fn new(
3290 name: &str,
3291 entered: StdArc<AtomicBool>,
3292 release: StdArc<Notify>,
3293 output: &'static str,
3294 ) -> Self {
3295 Self {
3296 spec: ToolSpec {
3297 name: ToolName::new(name),
3298 description: format!("blocking tool {name}"),
3299 input_schema: json!({
3300 "type": "object",
3301 "properties": {},
3302 "additionalProperties": false
3303 }),
3304 annotations: ToolAnnotations::default(),
3305 metadata: MetadataMap::new(),
3306 },
3307 entered,
3308 release,
3309 output,
3310 }
3311 }
3312 }
3313
3314 #[async_trait]
3315 impl Tool for BlockingTool {
3316 fn spec(&self) -> &ToolSpec {
3317 &self.spec
3318 }
3319
3320 async fn invoke(
3321 &self,
3322 request: agentkit_tools_core::ToolRequest,
3323 _ctx: &mut ToolContext<'_>,
3324 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3325 self.entered.store(true, Ordering::SeqCst);
3326 self.release.notified().await;
3327 Ok(ToolResult {
3328 result: ToolResultPart {
3329 call_id: request.call_id,
3330 output: ToolOutput::Text(self.output.into()),
3331 is_error: false,
3332 metadata: MetadataMap::new(),
3333 },
3334 duration: None,
3335 metadata: MetadataMap::new(),
3336 })
3337 }
3338 }
3339
3340 struct NameRoutingPolicy {
3341 routes: Vec<(String, RoutingDecision)>,
3342 }
3343
3344 impl NameRoutingPolicy {
3345 fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3346 Self {
3347 routes: routes
3348 .into_iter()
3349 .map(|(name, decision)| (name.into(), decision))
3350 .collect(),
3351 }
3352 }
3353 }
3354
3355 impl TaskRoutingPolicy for NameRoutingPolicy {
3356 fn route(&self, request: &ToolRequest) -> RoutingDecision {
3357 self.routes
3358 .iter()
3359 .find(|(name, _)| name == &request.tool_name.0)
3360 .map(|(_, decision)| *decision)
3361 .unwrap_or(RoutingDecision::Foreground)
3362 }
3363 }
3364
3365 async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3366 timeout(Duration::from_secs(1), handle.next_event())
3367 .await
3368 .expect("timed out waiting for task event")
3369 .expect("task event stream ended unexpectedly")
3370 }
3371
3372 async fn wait_until_entered(flag: &AtomicBool) {
3373 timeout(Duration::from_secs(1), async {
3374 while !flag.load(Ordering::SeqCst) {
3375 tokio::task::yield_now().await;
3376 }
3377 })
3378 .await
3379 .expect("task never entered execution");
3380 }
3381
3382 #[tokio::test]
3383 async fn loop_continues_after_completed_tool_call() {
3384 let tools = ToolRegistry::new().with(EchoTool::default());
3385 let agent = Agent::builder()
3386 .model(FakeAdapter)
3387 .add_tool_source(tools)
3388 .permissions(AllowAllPermissions)
3389 .build()
3390 .unwrap();
3391
3392 let mut driver = agent
3393 .start(SessionConfig {
3394 session_id: SessionId::new("session-1"),
3395 metadata: MetadataMap::new(),
3396 cache: None,
3397 })
3398 .await
3399 .unwrap();
3400
3401 driver
3402 .submit_input(vec![Item {
3403 id: None,
3404 kind: ItemKind::User,
3405 parts: vec![Part::Text(TextPart {
3406 text: "ping".into(),
3407 metadata: MetadataMap::new(),
3408 })],
3409 metadata: MetadataMap::new(),
3410 usage: None,
3411 finish_reason: None,
3412 created_at: None,
3413 }])
3414 .unwrap();
3415
3416 let result = run_until_finished(&mut driver).await;
3417
3418 match result {
3419 LoopStep::Finished(turn) => {
3420 assert_eq!(turn.finish_reason, FinishReason::Completed);
3421 assert_eq!(turn.items.len(), 1);
3422 match &turn.items[0].parts[0] {
3423 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3424 other => panic!("unexpected part: {other:?}"),
3425 }
3426 }
3427 other => panic!("unexpected loop step: {other:?}"),
3428 }
3429 }
3430
3431 async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3435 loop {
3436 match driver.next().await.unwrap() {
3437 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3438 step => return step,
3439 }
3440 }
3441 }
3442
3443 #[tokio::test]
3444 async fn loop_uses_injected_permission_checker() {
3445 let tools = ToolRegistry::new().with(EchoTool::default());
3446 let agent = Agent::builder()
3447 .model(FakeAdapter)
3448 .add_tool_source(tools)
3449 .permissions(DenyFsReads)
3450 .build()
3451 .unwrap();
3452
3453 let mut driver = agent
3454 .start(SessionConfig {
3455 session_id: SessionId::new("session-2"),
3456 metadata: MetadataMap::new(),
3457 cache: None,
3458 })
3459 .await
3460 .unwrap();
3461
3462 driver
3463 .submit_input(vec![Item {
3464 id: None,
3465 kind: ItemKind::User,
3466 parts: vec![Part::Text(TextPart {
3467 text: "ping".into(),
3468 metadata: MetadataMap::new(),
3469 })],
3470 metadata: MetadataMap::new(),
3471 usage: None,
3472 finish_reason: None,
3473 created_at: None,
3474 }])
3475 .unwrap();
3476
3477 let result = run_until_finished(&mut driver).await;
3478
3479 match result {
3480 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3481 Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3482 other => panic!("unexpected part: {other:?}"),
3483 },
3484 other => panic!("unexpected loop step: {other:?}"),
3485 }
3486 }
3487
3488 #[tokio::test]
3489 async fn async_task_manager_background_round_requires_explicit_continue() {
3490 let entered = StdArc::new(AtomicBool::new(false));
3491 let release = StdArc::new(Notify::new());
3492 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3493 "background-wait",
3494 RoutingDecision::Background,
3495 )]));
3496 let handle = task_manager.handle();
3497 let tools = ToolRegistry::new().with(BlockingTool::new(
3498 "background-wait",
3499 entered.clone(),
3500 release.clone(),
3501 "background-done",
3502 ));
3503 let agent = Agent::builder()
3504 .model(FakeAdapter)
3505 .add_tool_source(tools)
3506 .permissions(AllowAllPermissions)
3507 .task_manager(task_manager)
3508 .build()
3509 .unwrap();
3510
3511 let mut driver = agent
3512 .start(SessionConfig {
3513 session_id: SessionId::new("session-background"),
3514 metadata: MetadataMap::new(),
3515 cache: None,
3516 })
3517 .await
3518 .unwrap();
3519
3520 driver
3521 .submit_input(vec![Item {
3522 id: None,
3523 kind: ItemKind::User,
3524 parts: vec![Part::Text(TextPart {
3525 text: "ping".into(),
3526 metadata: MetadataMap::new(),
3527 })],
3528 metadata: MetadataMap::new(),
3529 usage: None,
3530 finish_reason: None,
3531 created_at: None,
3532 }])
3533 .unwrap();
3534
3535 let first = driver.next().await.unwrap();
3536 match first {
3537 LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3538 other => panic!("unexpected first loop step: {other:?}"),
3539 }
3540
3541 match wait_for_task_event(&handle).await {
3542 TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3543 other => panic!("unexpected task event: {other:?}"),
3544 }
3545 wait_until_entered(entered.as_ref()).await;
3546 release.notify_waiters();
3547
3548 match wait_for_task_event(&handle).await {
3549 TaskEvent::Completed(_, result) => {
3550 assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3551 }
3552 other => panic!("unexpected completion event: {other:?}"),
3553 }
3554
3555 let resumed = driver.next().await.unwrap();
3556 match resumed {
3557 LoopStep::Finished(turn) => {
3558 assert_eq!(turn.finish_reason, FinishReason::Completed);
3559 match &turn.items[0].parts[0] {
3560 Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3561 other => panic!("unexpected part after resume: {other:?}"),
3562 }
3563 }
3564 other => panic!("unexpected resumed step: {other:?}"),
3565 }
3566 }
3567
3568 #[tokio::test]
3569 async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3570 let controller = CancellationController::new();
3571 let agent = Agent::builder()
3572 .model(SlowAdapter)
3573 .cancellation(controller.handle())
3574 .build()
3575 .unwrap();
3576
3577 let mut driver = agent
3578 .start(SessionConfig {
3579 session_id: SessionId::new("session-cancel"),
3580 metadata: MetadataMap::new(),
3581 cache: None,
3582 })
3583 .await
3584 .unwrap();
3585
3586 driver
3587 .submit_input(vec![Item {
3588 id: None,
3589 kind: ItemKind::User,
3590 parts: vec![Part::Text(TextPart {
3591 text: "do the long task".into(),
3592 metadata: MetadataMap::new(),
3593 })],
3594 metadata: MetadataMap::new(),
3595 usage: None,
3596 finish_reason: None,
3597 created_at: None,
3598 }])
3599 .unwrap();
3600
3601 let cancelled = tokio::join!(async { driver.next().await }, async {
3602 tokio::task::yield_now().await;
3603 controller.interrupt();
3604 })
3605 .0
3606 .unwrap();
3607
3608 match cancelled {
3609 LoopStep::Finished(turn) => {
3610 assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3611 assert_eq!(turn.items.len(), 1);
3612 assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3613 assert_eq!(
3614 turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3615 Some(&Value::Bool(true))
3616 );
3617 }
3618 other => panic!("unexpected loop step: {other:?}"),
3619 }
3620
3621 driver
3622 .submit_input(vec![Item {
3623 id: None,
3624 kind: ItemKind::User,
3625 parts: vec![Part::Text(TextPart {
3626 text: "try again".into(),
3627 metadata: MetadataMap::new(),
3628 })],
3629 metadata: MetadataMap::new(),
3630 usage: None,
3631 finish_reason: None,
3632 created_at: None,
3633 }])
3634 .unwrap();
3635
3636 let result = driver.next().await.unwrap();
3637 match result {
3638 LoopStep::Finished(turn) => {
3639 assert_eq!(turn.finish_reason, FinishReason::Completed);
3640 }
3641 other => panic!("unexpected loop step after retry: {other:?}"),
3642 }
3643 }
3644
3645 #[tokio::test]
3646 async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3647 let controller = CancellationController::new();
3648 let fg_entered = StdArc::new(AtomicBool::new(false));
3649 let fg_release = StdArc::new(Notify::new());
3650 let bg_entered = StdArc::new(AtomicBool::new(false));
3651 let bg_release = StdArc::new(Notify::new());
3652 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3653 ("foreground-wait", RoutingDecision::Foreground),
3654 ("background-wait", RoutingDecision::Background),
3655 ]));
3656 let handle = task_manager.handle();
3657 let tools = ToolRegistry::new()
3658 .with(BlockingTool::new(
3659 "foreground-wait",
3660 fg_entered.clone(),
3661 fg_release,
3662 "foreground-done",
3663 ))
3664 .with(BlockingTool::new(
3665 "background-wait",
3666 bg_entered.clone(),
3667 bg_release.clone(),
3668 "background-done",
3669 ));
3670 let agent = Agent::builder()
3671 .model(MultiToolAdapter)
3672 .add_tool_source(tools)
3673 .permissions(AllowAllPermissions)
3674 .cancellation(controller.handle())
3675 .task_manager(task_manager)
3676 .build()
3677 .unwrap();
3678
3679 let mut driver = agent
3680 .start(SessionConfig {
3681 session_id: SessionId::new("session-mixed-cancel"),
3682 metadata: MetadataMap::new(),
3683 cache: None,
3684 })
3685 .await
3686 .unwrap();
3687
3688 driver
3689 .submit_input(vec![Item {
3690 id: None,
3691 kind: ItemKind::User,
3692 parts: vec![Part::Text(TextPart {
3693 text: "run both".into(),
3694 metadata: MetadataMap::new(),
3695 })],
3696 metadata: MetadataMap::new(),
3697 usage: None,
3698 finish_reason: None,
3699 created_at: None,
3700 }])
3701 .unwrap();
3702
3703 let cancelled = tokio::join!(async { driver.next().await }, async {
3704 let _ = wait_for_task_event(&handle).await;
3705 let _ = wait_for_task_event(&handle).await;
3706 wait_until_entered(fg_entered.as_ref()).await;
3707 wait_until_entered(bg_entered.as_ref()).await;
3708 controller.interrupt();
3709 })
3710 .0
3711 .unwrap();
3712
3713 match cancelled {
3714 LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3715 other => panic!("unexpected loop step after interrupt: {other:?}"),
3716 }
3717
3718 match wait_for_task_event(&handle).await {
3719 TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3720 other => panic!("unexpected post-interrupt event: {other:?}"),
3721 }
3722
3723 let running = handle.list_running().await;
3724 assert_eq!(running.len(), 1);
3725 assert_eq!(running[0].tool_name, "background-wait");
3726
3727 bg_release.notify_waiters();
3728 match wait_for_task_event(&handle).await {
3729 TaskEvent::Completed(snapshot, result) => {
3730 assert_eq!(snapshot.tool_name, "background-wait");
3731 assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3732 }
3733 other => panic!("unexpected background completion event: {other:?}"),
3734 }
3735 }
3736
3737 #[tokio::test]
3738 async fn loop_resumes_after_approved_tool_request() {
3739 let tools = ToolRegistry::new().with(EchoTool::default());
3740 let agent = Agent::builder()
3741 .model(FakeAdapter)
3742 .add_tool_source(tools)
3743 .permissions(ApproveFsReads)
3744 .build()
3745 .unwrap();
3746
3747 let mut driver = agent
3748 .start(SessionConfig {
3749 session_id: SessionId::new("session-approval"),
3750 metadata: MetadataMap::new(),
3751 cache: None,
3752 })
3753 .await
3754 .unwrap();
3755
3756 driver
3757 .submit_input(vec![Item {
3758 id: None,
3759 kind: ItemKind::User,
3760 parts: vec![Part::Text(TextPart {
3761 text: "ping".into(),
3762 metadata: MetadataMap::new(),
3763 })],
3764 metadata: MetadataMap::new(),
3765 usage: None,
3766 finish_reason: None,
3767 created_at: None,
3768 }])
3769 .unwrap();
3770
3771 let first = driver.next().await.unwrap();
3772 match first {
3773 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3774 assert!(pending.request.task_id.is_some());
3775 assert_eq!(pending.request.id.0, "approval:fs-read");
3776 pending.approve(&mut driver).unwrap();
3777 }
3778 other => panic!("unexpected loop step: {other:?}"),
3779 }
3780 let second = driver.next().await.unwrap();
3781 match second {
3782 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3783 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3784 other => panic!("unexpected part: {other:?}"),
3785 },
3786 other => panic!("unexpected loop step after approval: {other:?}"),
3787 }
3788 }
3789
3790 #[tokio::test]
3791 async fn loop_resumes_with_patched_input_on_approval() {
3792 let tools = ToolRegistry::new().with(EchoTool::default());
3793 let agent = Agent::builder()
3794 .model(FakeAdapter)
3795 .add_tool_source(tools)
3796 .permissions(ApproveFsReads)
3797 .build()
3798 .unwrap();
3799
3800 let mut driver = agent
3801 .start(SessionConfig {
3802 session_id: SessionId::new("session-approval-patched"),
3803 metadata: MetadataMap::new(),
3804 cache: None,
3805 })
3806 .await
3807 .unwrap();
3808
3809 driver
3810 .submit_input(vec![Item {
3811 id: None,
3812 kind: ItemKind::User,
3813 parts: vec![Part::Text(TextPart {
3814 text: "ping".into(),
3815 metadata: MetadataMap::new(),
3816 })],
3817 metadata: MetadataMap::new(),
3818 usage: None,
3819 finish_reason: None,
3820 created_at: None,
3821 }])
3822 .unwrap();
3823
3824 match driver.next().await.unwrap() {
3825 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3826 pending
3827 .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
3828 .unwrap();
3829 }
3830 other => panic!("unexpected loop step: {other:?}"),
3831 }
3832 match driver.next().await.unwrap() {
3833 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3834 Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
3835 other => panic!("unexpected part: {other:?}"),
3836 },
3837 other => panic!("unexpected loop step after approval: {other:?}"),
3838 }
3839 }
3840
3841 #[tokio::test]
3842 async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3843 let tools = ToolRegistry::new().with(EchoTool::default());
3844 let agent = Agent::builder()
3845 .model(DualApprovalAdapter)
3846 .add_tool_source(tools)
3847 .permissions(ApproveFsReads)
3848 .build()
3849 .unwrap();
3850
3851 let mut driver = agent
3852 .start(SessionConfig {
3853 session_id: SessionId::new("session-dual-approval"),
3854 metadata: MetadataMap::new(),
3855 cache: None,
3856 })
3857 .await
3858 .unwrap();
3859
3860 driver
3861 .submit_input(vec![Item {
3862 id: None,
3863 kind: ItemKind::User,
3864 parts: vec![Part::Text(TextPart {
3865 text: "run both approvals".into(),
3866 metadata: MetadataMap::new(),
3867 })],
3868 metadata: MetadataMap::new(),
3869 usage: None,
3870 finish_reason: None,
3871 created_at: None,
3872 }])
3873 .unwrap();
3874
3875 let pending_first = match driver.next().await.unwrap() {
3876 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3877 assert_eq!(
3878 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3879 Some("call-1")
3880 );
3881 pending
3882 }
3883 other => panic!("unexpected first loop step: {other:?}"),
3884 };
3885
3886 let pending_second = match driver.next().await.unwrap() {
3887 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3888 assert_eq!(
3889 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3890 Some("call-2")
3891 );
3892 pending
3893 }
3894 other => panic!("unexpected second loop step: {other:?}"),
3895 };
3896
3897 pending_second.approve(&mut driver).unwrap();
3898 match driver.next().await.unwrap() {
3899 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3900 assert_eq!(
3901 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3902 Some("call-1")
3903 );
3904 }
3905 other => panic!("unexpected step after approving second request: {other:?}"),
3906 }
3907
3908 pending_first.approve(&mut driver).unwrap();
3909 match driver.next().await.unwrap() {
3910 LoopStep::Finished(turn) => {
3911 assert_eq!(turn.finish_reason, FinishReason::Completed);
3912 match &turn.items[0].parts[0] {
3913 Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3914 other => panic!("unexpected final part: {other:?}"),
3915 }
3916 }
3917 other => panic!("unexpected final loop step: {other:?}"),
3918 }
3919 }
3920
3921 #[tokio::test]
3922 async fn loop_compacts_transcript_before_new_turns() {
3923 let events = StdArc::new(StdMutex::new(Vec::new()));
3924 let agent = Agent::builder()
3925 .model(FakeAdapter)
3926 .mutator(KeepRecentMutator { keep: 1 })
3927 .observer(RecordingObserver {
3928 events: events.clone(),
3929 })
3930 .build()
3931 .unwrap();
3932
3933 let mut driver = agent
3934 .start(SessionConfig {
3935 session_id: SessionId::new("session-4"),
3936 metadata: MetadataMap::new(),
3937 cache: None,
3938 })
3939 .await
3940 .unwrap();
3941
3942 for text in ["first", "second"] {
3943 driver
3944 .submit_input(vec![Item {
3945 id: None,
3946 kind: ItemKind::User,
3947 parts: vec![Part::Text(TextPart {
3948 text: text.into(),
3949 metadata: MetadataMap::new(),
3950 })],
3951 metadata: MetadataMap::new(),
3952 usage: None,
3953 finish_reason: None,
3954 created_at: None,
3955 }])
3956 .unwrap();
3957 let _ = driver.next().await.unwrap();
3958 }
3959
3960 let events = events.lock().unwrap();
3961 assert!(
3962 events
3963 .iter()
3964 .any(|event| matches!(event, AgentEvent::MutationFinished { dirty: true, .. }))
3965 );
3966 }
3967
#[test]
fn transcript_validation_rejects_orphaned_tool_result() {
    // A tool result answering a call id that no assistant item ever issued.
    let orphan = ToolResultPart {
        call_id: "call-1".into(),
        output: ToolOutput::Text("result".into()),
        is_error: false,
        metadata: MetadataMap::new(),
    };
    let transcript = vec![Item {
        id: None,
        kind: ItemKind::Tool,
        parts: vec![Part::ToolResult(orphan)],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    }];

    let message = validate_transcript_invariants(&transcript)
        .unwrap_err()
        .to_string();
    assert!(message.contains("orphaned tool_result"));
}
3988
#[test]
fn transcript_validation_rejects_duplicate_tool_result() {
    /// A tool item answering `call-1` with the given text output.
    fn result_for_call_1(output: &'static str) -> Item {
        Item {
            id: None,
            kind: ItemKind::Tool,
            parts: vec![Part::ToolResult(ToolResultPart {
                call_id: "call-1".into(),
                output: ToolOutput::Text(output.into()),
                is_error: false,
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }
    }

    let call = Item {
        id: None,
        kind: ItemKind::Assistant,
        parts: vec![Part::ToolCall(ToolCallPart {
            id: "call-1".into(),
            name: "lookup".into(),
            input: json!({}),
            metadata: MetadataMap::new(),
        })],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    };
    // One call, answered twice.
    let transcript = vec![call, result_for_call_1("result"), result_for_call_1("again")];

    let message = validate_transcript_invariants(&transcript)
        .unwrap_err()
        .to_string();
    assert!(message.contains("duplicate tool_result"));
}
4039
4040 #[tokio::test]
4041 async fn loop_refreshes_tool_specs_each_turn() {
4042 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4043 let version = StdArc::new(AtomicUsize::new(1));
4044 let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
4045 let agent = Agent::builder()
4046 .model(RecordingAdapter {
4047 seen_descriptions: seen_descriptions.clone(),
4048 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4049 })
4050 .add_tool_source(tools)
4051 .permissions(AllowAllPermissions)
4052 .build()
4053 .unwrap();
4054
4055 let mut driver = agent
4056 .start(SessionConfig {
4057 session_id: SessionId::new("session-dynamic-tools"),
4058 metadata: MetadataMap::new(),
4059 cache: None,
4060 })
4061 .await
4062 .unwrap();
4063
4064 for text in ["first", "second"] {
4065 driver
4066 .submit_input(vec![Item {
4067 id: None,
4068 kind: ItemKind::User,
4069 parts: vec![Part::Text(TextPart {
4070 text: text.into(),
4071 metadata: MetadataMap::new(),
4072 })],
4073 metadata: MetadataMap::new(),
4074 usage: None,
4075 finish_reason: None,
4076 created_at: None,
4077 }])
4078 .unwrap();
4079
4080 let _ = driver.next().await.unwrap();
4081 if text == "first" {
4082 version.store(2, Ordering::SeqCst);
4083 }
4084 }
4085
4086 let seen_descriptions = seen_descriptions.lock().unwrap();
4087 assert_eq!(seen_descriptions.len(), 2);
4088 assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
4089 assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
4090 }
4091
4092 #[tokio::test]
4093 async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
4094 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4095 let events = StdArc::new(StdMutex::new(Vec::new()));
4096 let executor = StdArc::new(CatalogExecutor::new());
4097 let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
4098 let agent = Agent::builder()
4099 .model(RecordingAdapter {
4100 seen_descriptions: seen_descriptions.clone(),
4101 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4102 })
4103 .tool_executor(executor_for_agent)
4104 .permissions(AllowAllPermissions)
4105 .observer(RecordingObserver {
4106 events: events.clone(),
4107 })
4108 .build()
4109 .unwrap();
4110
4111 let mut driver = agent
4112 .start(SessionConfig {
4113 session_id: SessionId::new("session-catalog-events"),
4114 metadata: MetadataMap::new(),
4115 cache: None,
4116 })
4117 .await
4118 .unwrap();
4119
4120 driver
4121 .submit_input(vec![Item::text(ItemKind::User, "first")])
4122 .unwrap();
4123 let _ = driver.next().await.unwrap();
4124
4125 executor.publish_change(
4126 1,
4127 ToolCatalogEvent {
4128 source: "mcp:mock".into(),
4129 added: vec!["dynamic".into()],
4130 removed: Vec::new(),
4131 changed: Vec::new(),
4132 },
4133 );
4134
4135 driver
4136 .submit_input(vec![Item::text(ItemKind::User, "second")])
4137 .unwrap();
4138 let _ = driver.next().await.unwrap();
4139
4140 let seen_descriptions = seen_descriptions.lock().unwrap();
4141 assert_eq!(seen_descriptions.len(), 2);
4142 assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
4143 assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
4144
4145 let events = events.lock().unwrap();
4146 assert!(events.iter().any(|event| matches!(
4147 event,
4148 AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
4149 source,
4150 added,
4151 removed,
4152 changed,
4153 }) if source == "mcp:mock"
4154 && added == &vec!["dynamic".to_string()]
4155 && removed.is_empty()
4156 && changed.is_empty()
4157 )));
4158 }
4159
4160 #[tokio::test]
4161 async fn loop_passes_session_default_and_next_turn_cache_requests() {
4162 let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
4163 let agent = Agent::builder()
4164 .model(RecordingAdapter {
4165 seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
4166 seen_caches: seen_caches.clone(),
4167 })
4168 .permissions(AllowAllPermissions)
4169 .build()
4170 .unwrap();
4171
4172 let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
4173 .with_retention(PromptCacheRetention::Short);
4174 let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
4175 breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
4176 });
4177
4178 let mut driver = agent
4179 .start(SessionConfig {
4180 session_id: SessionId::new("session-cache"),
4181 metadata: MetadataMap::new(),
4182 cache: Some(default_cache.clone()),
4183 })
4184 .await
4185 .unwrap();
4186
4187 driver
4188 .submit_input(vec![Item {
4189 id: None,
4190 kind: ItemKind::User,
4191 parts: vec![Part::Text(TextPart {
4192 text: "first".into(),
4193 metadata: MetadataMap::new(),
4194 })],
4195 metadata: MetadataMap::new(),
4196 usage: None,
4197 finish_reason: None,
4198 created_at: None,
4199 }])
4200 .unwrap();
4201 let _ = driver.next().await.unwrap();
4202
4203 driver
4204 .submit_input_with_cache(
4205 vec![Item {
4206 id: None,
4207 kind: ItemKind::User,
4208 parts: vec![Part::Text(TextPart {
4209 text: "second".into(),
4210 metadata: MetadataMap::new(),
4211 })],
4212 metadata: MetadataMap::new(),
4213 usage: None,
4214 finish_reason: None,
4215 created_at: None,
4216 }],
4217 override_cache.clone(),
4218 )
4219 .unwrap();
4220 let _ = driver.next().await.unwrap();
4221
4222 let seen = seen_caches.lock().unwrap();
4223 assert_eq!(seen.len(), 2);
4224 assert_eq!(seen[0], Some(default_cache));
4225 assert_eq!(seen[1], Some(override_cache));
4226 }
4227
4228 #[tokio::test]
4229 async fn loop_yields_after_tool_result_between_rounds() {
4230 let tools = ToolRegistry::new().with(EchoTool::default());
4231 let agent = Agent::builder()
4232 .model(FakeAdapter)
4233 .add_tool_source(tools)
4234 .permissions(AllowAllPermissions)
4235 .build()
4236 .unwrap();
4237
4238 let mut driver = agent
4239 .start(SessionConfig {
4240 session_id: SessionId::new("yield-session"),
4241 metadata: MetadataMap::new(),
4242 cache: None,
4243 })
4244 .await
4245 .unwrap();
4246
4247 driver
4248 .submit_input(vec![Item::text(ItemKind::User, "ping")])
4249 .unwrap();
4250
4251 let step = driver.next().await.unwrap();
4254 let info = match step {
4255 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
4256 other => panic!("expected AfterToolResult, got {other:?}"),
4257 };
4258 assert_eq!(info.session_id, SessionId::new("yield-session"));
4259 assert_eq!(info.transcript_len, 3);
4261
4262 let interrupt = LoopInterrupt::AfterToolResult(info.clone());
4264 assert!(!interrupt.is_blocking());
4265
4266 driver
4268 .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
4269 .unwrap();
4270
4271 let step = driver.next().await.unwrap();
4274 match step {
4275 LoopStep::Finished(turn) => {
4276 assert_eq!(turn.finish_reason, FinishReason::Completed);
4277 }
4278 other => panic!("expected Finished, got {other:?}"),
4279 }
4280
4281 let snapshot = driver.snapshot();
4283 let has_injected_message = snapshot.transcript.iter().any(|item| {
4284 item.kind == ItemKind::User
4285 && item.parts.iter().any(|part| match part {
4286 Part::Text(text) => text.text == "also: report back",
4287 _ => false,
4288 })
4289 });
4290 assert!(
4291 has_injected_message,
4292 "injected user message should be in transcript, got: {:?}",
4293 snapshot.transcript
4294 );
4295 }
4296
4297 struct RecordingTranscriptObserver {
4298 items: StdArc<StdMutex<Vec<Item>>>,
4299 }
4300
4301 impl TranscriptObserver for RecordingTranscriptObserver {
4302 fn on_item_appended(&self, item: &Item) {
4303 self.items.lock().unwrap().push(item.clone());
4304 }
4305 }
4306
4307 #[tokio::test]
4308 async fn observers_see_full_tool_round() {
4309 let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
4315 let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
4316 let agent = Agent::builder()
4317 .model(FakeAdapter)
4318 .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
4319 .permissions(AllowAllPermissions)
4320 .observer(RecordingObserver {
4321 events: events.clone(),
4322 })
4323 .transcript_observer(RecordingTranscriptObserver {
4324 items: items.clone(),
4325 })
4326 .build()
4327 .unwrap();
4328
4329 let mut driver = agent
4330 .start(SessionConfig {
4331 session_id: SessionId::new("observer-session"),
4332 metadata: MetadataMap::new(),
4333 cache: None,
4334 })
4335 .await
4336 .unwrap();
4337
4338 driver
4339 .submit_input(vec![Item {
4340 id: None,
4341 kind: ItemKind::User,
4342 parts: vec![Part::Text(TextPart {
4343 text: "ping".into(),
4344 metadata: MetadataMap::new(),
4345 })],
4346 metadata: MetadataMap::new(),
4347 usage: None,
4348 finish_reason: None,
4349 created_at: None,
4350 }])
4351 .unwrap();
4352
4353 let result = run_until_finished(&mut driver).await;
4354 assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4355
4356 let events = events.lock().unwrap().clone();
4359 let tool_call_id = events.iter().find_map(|e| match e {
4360 AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4361 _ => None,
4362 });
4363 let tool_results: Vec<_> = events
4364 .iter()
4365 .filter_map(|e| match e {
4366 AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4367 _ => None,
4368 })
4369 .collect();
4370 assert_eq!(tool_results.len(), 1, "events: {events:?}");
4371 assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4372 assert!(!tool_results[0].is_error);
4373
4374 let items = items.lock().unwrap().clone();
4378 assert_eq!(items.len(), 4, "items: {items:?}");
4379 assert_eq!(items[0].kind, ItemKind::User);
4380 assert_eq!(items[1].kind, ItemKind::Assistant);
4381 assert!(
4382 items[1]
4383 .parts
4384 .iter()
4385 .any(|p| matches!(p, Part::ToolCall(_)))
4386 );
4387 assert_eq!(items[2].kind, ItemKind::Tool);
4388 assert!(
4389 items[2]
4390 .parts
4391 .iter()
4392 .any(|p| matches!(p, Part::ToolResult(_)))
4393 );
4394 assert_eq!(items[3].kind, ItemKind::Assistant);
4395 }
4396
4397 #[test]
4398 fn convenience_cache_builders_construct_expected_defaults() {
4399 let cache = PromptCacheRequest::automatic()
4400 .with_retention(PromptCacheRetention::Short)
4401 .with_key("workspace:demo");
4402 let session = SessionConfig::new("demo").with_cache(cache.clone());
4403
4404 assert_eq!(session.session_id, SessionId::new("demo"));
4405 assert_eq!(session.cache, Some(cache));
4406
4407 let explicit = PromptCacheRequest::explicit([
4408 PromptCacheBreakpoint::tools_end(),
4409 PromptCacheBreakpoint::transcript_item_end(2),
4410 PromptCacheBreakpoint::transcript_part_end(3, 1),
4411 ]);
4412
4413 assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
4414 assert_eq!(
4415 explicit.strategy,
4416 PromptCacheStrategy::Explicit {
4417 breakpoints: vec![
4418 PromptCacheBreakpoint::ToolsEnd,
4419 PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
4420 PromptCacheBreakpoint::TranscriptPartEnd {
4421 item_index: 3,
4422 part_index: 1,
4423 },
4424 ],
4425 }
4426 );
4427 }
4428}