1use std::collections::{BTreeMap, VecDeque};
50use std::sync::Arc;
51
52use agentkit_compaction::{
53 CompactionConfig, CompactionContext, CompactionReason, CompactionResult,
54};
55use agentkit_core::{
56 CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
57 TextPart, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation, Usage,
58};
59use agentkit_task_manager::{
60 PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskAuth, TaskLaunchRequest, TaskManager,
61 TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
62};
63#[cfg(test)]
64use agentkit_tools_core::ToolContext;
65use agentkit_tools_core::{
66 ApprovalDecision, ApprovalRequest, AuthOperation, AuthRequest, AuthResolution,
67 BasicToolExecutor, OwnedToolContext, PermissionChecker, ToolError, ToolExecutor, ToolRegistry,
68 ToolRequest, ToolResources, ToolSpec,
69};
70use async_trait::async_trait;
71use serde::{Deserialize, Serialize};
72use thiserror::Error;
73
// Metadata keys and values describing interrupted work.
// NOTE(review): the code that reads/writes these is outside this view — confirm exact usage.

/// Metadata key flagging that something was interrupted.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
/// Metadata key holding the reason for an interruption.
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
/// Metadata key holding the stage at which an interruption happened.
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
/// Reason value for a user-initiated cancellation; presumably stored under
/// `INTERRUPT_REASON_METADATA_KEY` — TODO confirm.
const USER_CANCELLED_REASON: &str = "user_cancelled";
78
79#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct SessionConfig {
95 pub session_id: SessionId,
97 pub metadata: MetadataMap,
99 pub cache: Option<PromptCacheRequest>,
101}
102
103impl SessionConfig {
104 pub fn new(session_id: impl Into<SessionId>) -> Self {
106 Self {
107 session_id: session_id.into(),
108 metadata: MetadataMap::new(),
109 cache: None,
110 }
111 }
112
113 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
115 self.metadata = metadata;
116 self
117 }
118
119 pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
121 self.cache = Some(cache);
122 self
123 }
124
125 pub fn without_cache(mut self) -> Self {
127 self.cache = None;
128 self
129 }
130}
131
/// How strongly prompt caching is requested.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheMode {
    /// Caching is turned off; [`PromptCacheRequest::is_enabled`] returns `false`.
    Disabled,
    /// Default mode.
    /// NOTE(review): presumably the adapter may silently skip caching — confirm.
    #[default]
    BestEffort,
    /// NOTE(review): presumably the adapter must honor the request or fail — confirm.
    Required,
}
147
/// Requested retention class for cached prompt content.
/// NOTE(review): the concrete durations are provider-specific and not visible here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheRetention {
    /// Use the provider's default retention.
    Default,
    /// Request a short retention window.
    Short,
    /// Request an extended retention window.
    Extended,
}
162
163#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
165pub enum PromptCacheStrategy {
166 #[default]
168 Automatic,
169 Explicit {
171 breakpoints: Vec<PromptCacheBreakpoint>,
173 },
174}
175
176impl PromptCacheStrategy {
177 pub fn automatic() -> Self {
180 Self::Automatic
181 }
182
183 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
185 Self::Explicit {
186 breakpoints: breakpoints.into_iter().collect(),
187 }
188 }
189}
190
191#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum PromptCacheBreakpoint {
194 ToolsEnd,
196 TranscriptItemEnd { index: usize },
198 TranscriptPartEnd {
204 item_index: usize,
205 part_index: usize,
206 },
207}
208
209impl PromptCacheBreakpoint {
210 pub fn tools_end() -> Self {
212 Self::ToolsEnd
213 }
214
215 pub fn transcript_item_end(index: usize) -> Self {
217 Self::TranscriptItemEnd { index }
218 }
219
220 pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
222 Self::TranscriptPartEnd {
223 item_index,
224 part_index,
225 }
226 }
227}
228
229#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
231pub struct PromptCacheRequest {
232 pub mode: PromptCacheMode,
234 pub strategy: PromptCacheStrategy,
236 pub retention: Option<PromptCacheRetention>,
238 pub key: Option<String>,
240}
241
242impl PromptCacheRequest {
243 pub fn automatic() -> Self {
245 Self::best_effort(PromptCacheStrategy::automatic())
246 }
247
248 pub fn automatic_required() -> Self {
250 Self::required(PromptCacheStrategy::automatic())
251 }
252
253 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
255 Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
256 }
257
258 pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
260 Self::required(PromptCacheStrategy::explicit(breakpoints))
261 }
262
263 pub fn disabled() -> Self {
265 Self {
266 mode: PromptCacheMode::Disabled,
267 strategy: PromptCacheStrategy::Automatic,
268 retention: None,
269 key: None,
270 }
271 }
272
273 pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
275 Self {
276 mode: PromptCacheMode::BestEffort,
277 strategy,
278 retention: None,
279 key: None,
280 }
281 }
282
283 pub fn required(strategy: PromptCacheStrategy) -> Self {
285 Self {
286 mode: PromptCacheMode::Required,
287 strategy,
288 retention: None,
289 key: None,
290 }
291 }
292
293 pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
295 self.mode = mode;
296 self
297 }
298
299 pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
301 self.strategy = strategy;
302 self
303 }
304
305 pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
307 self.retention = Some(retention);
308 self
309 }
310
311 pub fn with_key(mut self, key: impl Into<String>) -> Self {
313 self.key = Some(key.into());
314 self
315 }
316
317 pub fn without_retention(mut self) -> Self {
319 self.retention = None;
320 self
321 }
322
323 pub fn without_key(mut self) -> Self {
325 self.key = None;
326 self
327 }
328
329 pub fn is_enabled(&self) -> bool {
331 !matches!(self.mode, PromptCacheMode::Disabled)
332 }
333}
334
/// Everything a [`ModelSession`] receives to begin one turn.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnRequest {
    /// Session the turn belongs to.
    pub session_id: SessionId,
    /// Identifier of this turn.
    pub turn_id: agentkit_core::TurnId,
    /// Copy of the driver's transcript at the time the turn starts.
    pub transcript: Vec<Item>,
    /// Tool specifications reported by the driver's tool executor.
    pub available_tools: Vec<ToolSpec>,
    /// Per-turn cache override if one was set, otherwise the session default.
    pub cache: Option<PromptCacheRequest>,
    /// Request metadata.
    pub metadata: MetadataMap,
}
355
/// Final payload of a model turn, carried by [`ModelTurnEvent::Finished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ModelTurnResult {
    /// Why the model stopped.
    pub finish_reason: FinishReason,
    /// Items produced by the model; the driver appends them to its transcript
    /// and scans them for tool calls.
    pub output_items: Vec<Item>,
    /// Usage for the turn, if reported.
    pub usage: Option<Usage>,
    /// Result metadata.
    pub metadata: MetadataMap,
}
371
/// Events yielded by [`ModelTurn::next_event`] while a turn is running.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ModelTurnEvent {
    /// Incremental content; forwarded to observers as [`AgentEvent::ContentDelta`].
    Delta(Delta),
    /// The model requested a tool call; forwarded as [`AgentEvent::ToolCallRequested`].
    ToolCall(ToolCallPart),
    /// Usage update; forwarded as [`AgentEvent::UsageUpdated`].
    Usage(Usage),
    /// Terminal event; the driver stops polling the turn once it sees this.
    Finished(ModelTurnResult),
}
388
/// Entry point to a model provider: opens sessions for [`Agent::start`].
#[async_trait]
pub trait ModelAdapter: Send + Sync {
    /// Session type produced by this adapter.
    type Session: ModelSession;

    /// Opens a session for `config`. [`Agent::start`] calls this once and
    /// propagates any error to its caller.
    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
}
437
/// A live model session that can run turns.
#[async_trait]
pub trait ModelSession: Send {
    /// Turn handle type produced by [`ModelSession::begin_turn`].
    type Turn: ModelTurn;

    /// Starts a turn for `request`.
    ///
    /// `cancellation` is the driver's cancellation checkpoint for this turn,
    /// if a cancellation handle was configured. The driver treats
    /// `Err(LoopError::Cancelled)` as a cancelled turn; other errors are
    /// returned to the caller.
    async fn begin_turn(
        &mut self,
        request: TurnRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Self::Turn, LoopError>;
}
467
/// A running model turn that is polled for events.
#[async_trait]
pub trait ModelTurn: Send {
    /// Returns the next event of the turn.
    ///
    /// The driver polls until it receives [`ModelTurnEvent::Finished`]. If
    /// `Ok(None)` arrives before that, the driver fails the turn with a
    /// provider error. `Err(LoopError::Cancelled)` is treated as a cancelled
    /// turn.
    async fn next_event(
        &mut self,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError>;
}
488
/// Receives [`AgentEvent`]s from an agent run.
///
/// Observers are registered with [`AgentBuilder::observer`].
pub trait LoopObserver: Send {
    /// Handles one event.
    /// NOTE(review): the dispatching `emit` is outside this view — confirm call order.
    fn handle_event(&mut self, event: AgentEvent);
}
512
/// Events emitted by the loop driver for [`LoopObserver`]s.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AgentEvent {
    /// Emitted by [`Agent::start`] once the model session has been created.
    RunStarted { session_id: SessionId },
    /// A turn is starting.
    TurnStarted {
        session_id: SessionId,
        turn_id: agentkit_core::TurnId,
    },
    /// Input items were accepted for the session.
    InputAccepted {
        session_id: SessionId,
        items: Vec<Item>,
    },
    /// Incremental model output.
    ContentDelta(Delta),
    /// The model requested a tool call.
    ToolCallRequested(ToolCallPart),
    /// A tool call needs approval before it can proceed.
    ApprovalRequired(ApprovalRequest),
    /// A tool call needs authentication before it can proceed.
    AuthRequired(AuthRequest),
    /// An approval request was resolved.
    ApprovalResolved { approved: bool },
    /// An auth request was resolved.
    AuthResolved { provided: bool },
    /// Transcript compaction is about to run.
    CompactionStarted {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        reason: CompactionReason,
    },
    /// Transcript compaction finished and the transcript was replaced.
    CompactionFinished {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        replaced_items: usize,
        transcript_len: usize,
        metadata: MetadataMap,
    },
    /// Usage update reported by the model.
    UsageUpdated(Usage),
    /// Non-fatal warning.
    Warning { message: String },
    /// The run failed.
    RunFailed { message: String },
    /// A turn finished.
    TurnFinished(TurnResult),
}
566
567#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
589pub struct PendingApproval {
590 pub request: ApprovalRequest,
592}
593
594impl std::ops::Deref for PendingApproval {
595 type Target = ApprovalRequest;
596 fn deref(&self) -> &ApprovalRequest {
597 &self.request
598 }
599}
600
601impl PendingApproval {
602 pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
604 let call_id = self
605 .request
606 .call_id
607 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
608 driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
609 }
610
611 pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
613 let call_id = self
614 .request
615 .call_id
616 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
617 driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
618 }
619
620 pub fn deny_with_reason<S: ModelSession>(
622 self,
623 driver: &mut LoopDriver<S>,
624 reason: impl Into<String>,
625 ) -> Result<(), LoopError> {
626 let call_id = self
627 .request
628 .call_id
629 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
630 driver.resolve_approval_for(
631 call_id,
632 ApprovalDecision::Deny {
633 reason: Some(reason.into()),
634 },
635 )
636 }
637}
638
639#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
660pub struct PendingAuth {
661 pub request: AuthRequest,
663}
664
665impl std::ops::Deref for PendingAuth {
666 type Target = AuthRequest;
667 fn deref(&self) -> &AuthRequest {
668 &self.request
669 }
670}
671
672impl PendingAuth {
673 pub fn provide<S: ModelSession>(
675 self,
676 driver: &mut LoopDriver<S>,
677 credentials: MetadataMap,
678 ) -> Result<(), LoopError> {
679 driver.resolve_auth(AuthResolution::Provided {
680 request: self.request,
681 credentials,
682 })
683 }
684
685 pub fn cancel<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
687 driver.resolve_auth(AuthResolution::Cancelled {
688 request: self.request,
689 })
690 }
691}
692
693#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
714pub struct InputRequest {
715 pub session_id: SessionId,
717 pub reason: String,
719}
720
721impl InputRequest {
722 pub fn submit<S: ModelSession>(
724 self,
725 driver: &mut LoopDriver<S>,
726 items: Vec<Item>,
727 ) -> Result<(), LoopError> {
728 driver.submit_input(items)
729 }
730}
731
/// Outcome of a completed turn, carried by [`LoopStep::Finished`] and
/// [`AgentEvent::TurnFinished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnResult {
    /// Identifier of the turn.
    pub turn_id: agentkit_core::TurnId,
    /// Why the turn ended.
    pub finish_reason: FinishReason,
    /// Items attributed to the turn.
    pub items: Vec<Item>,
    /// Usage for the turn, if reported.
    pub usage: Option<Usage>,
    /// Result metadata.
    pub metadata: MetadataMap,
}
749
750#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
790pub enum LoopInterrupt {
791 ApprovalRequest(PendingApproval),
793 AuthRequest(PendingAuth),
795 AwaitingInput(InputRequest),
797 AfterToolResult(ToolRoundInfo),
807}
808
809impl LoopInterrupt {
810 pub fn is_blocking(&self) -> bool {
816 matches!(
817 self,
818 LoopInterrupt::ApprovalRequest(_) | LoopInterrupt::AuthRequest(_)
819 )
820 }
821}
822
823#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
826pub struct ToolRoundInfo {
827 pub session_id: SessionId,
829 pub turn_id: agentkit_core::TurnId,
831 pub transcript_len: usize,
833}
834
835impl ToolRoundInfo {
836 pub fn submit<S: ModelSession>(
838 &self,
839 driver: &mut LoopDriver<S>,
840 items: Vec<Item>,
841 ) -> Result<(), LoopError> {
842 driver.submit_input(items)
843 }
844}
845
/// Result of advancing the loop by one step.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum LoopStep {
    /// The loop paused and needs the host to act.
    Interrupt(LoopInterrupt),
    /// A turn ran to completion.
    Finished(TurnResult),
}
880
/// Serializable snapshot of loop state.
/// NOTE(review): the producer of snapshots is outside this view.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LoopSnapshot {
    /// Session the snapshot was taken from.
    pub session_id: SessionId,
    /// Transcript items.
    pub transcript: Vec<Item>,
    /// Pending input items.
    pub pending_input: Vec<Item>,
}
895
/// Driver-side bookkeeping for a tool call that is waiting for approval.
#[derive(Clone, Debug)]
struct PendingApprovalToolCall {
    // Request shown to the host; its `call_id` is filled in when the entry is enqueued.
    request: ApprovalRequest,
    // `None` while the approval is unresolved.
    decision: Option<ApprovalDecision>,
    // Set once the request has been returned to the host as an interrupt.
    surfaced: bool,
    // Turn the tool call belongs to.
    turn_id: agentkit_core::TurnId,
    // Task-manager id of the suspended task.
    task_id: TaskId,
    // Tool call rebuilt from the tool request.
    call: ToolCallPart,
    // Original tool request.
    tool_request: ToolRequest,
}
906
/// Driver-side bookkeeping for a tool call that is waiting for authentication.
#[derive(Clone, Debug)]
struct PendingAuthToolCall {
    // Request shown to the host.
    request: AuthRequest,
    // `None` while the auth request is unresolved.
    resolution: Option<AuthResolution>,
    // Turn the tool call belongs to.
    turn_id: agentkit_core::TurnId,
    // Task-manager id of the suspended task.
    task_id: TaskId,
    // Tool call rebuilt from the tool request.
    call: ToolCallPart,
    // Original tool request.
    tool_request: ToolRequest,
}
916
/// State of the tool round currently being executed for a turn.
#[derive(Clone, Debug, Default)]
struct ActiveToolRound {
    // Turn whose tool calls are being executed.
    turn_id: agentkit_core::TurnId,
    // Calls that have not been started yet, in model order.
    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
    // Set when a task started as background work or detached into the background.
    background_pending: bool,
    // Set when a tool result (or a detached-task notice) reached the transcript.
    foreground_progressed: bool,
}
924
/// A fully configured agent, ready to be turned into a [`LoopDriver`] via
/// [`Agent::start`]. Built with [`AgentBuilder`].
pub struct Agent<M>
where
    M: ModelAdapter,
{
    // Adapter used to open the model session.
    model: M,
    // Tools handed to the tool executor.
    tools: ToolRegistry,
    // Manager that runs tool calls as tasks.
    task_manager: Arc<dyn TaskManager>,
    // Permission checker passed into tool contexts.
    permissions: Arc<dyn PermissionChecker>,
    // Shared resources passed into tool contexts.
    resources: Arc<dyn ToolResources>,
    // Optional handle used to derive per-turn cancellation checkpoints.
    cancellation: Option<CancellationHandle>,
    // Optional transcript compaction configuration.
    compaction: Option<CompactionConfig>,
    // Observers that receive `AgentEvent`s.
    observers: Vec<Box<dyn LoopObserver>>,
}
970
971impl<M> Agent<M>
972where
973 M: ModelAdapter,
974{
975 pub fn builder() -> AgentBuilder<M> {
977 AgentBuilder::default()
978 }
979
980 pub async fn start(self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
989 let session_id = config.session_id.clone();
990 let default_cache = config.cache.clone();
991 let session = self.model.start_session(config).await?;
992 let tool_executor = Arc::new(BasicToolExecutor::new(self.tools.clone()));
993 let mut driver = LoopDriver {
994 session_id: session_id.clone(),
995 default_cache,
996 next_turn_cache: None,
997 session: Some(session),
998 tool_executor,
999 task_manager: self.task_manager,
1000 permissions: self.permissions,
1001 resources: self.resources,
1002 cancellation: self.cancellation,
1003 compaction: self.compaction,
1004 observers: self.observers,
1005 transcript: Vec::new(),
1006 pending_input: Vec::new(),
1007 pending_approvals: BTreeMap::new(),
1008 pending_approval_order: VecDeque::new(),
1009 pending_auth: None,
1010 active_tool_round: None,
1011 pending_round_resume: None,
1012 next_turn_index: 1,
1013 };
1014 driver.emit(AgentEvent::RunStarted { session_id });
1015 Ok(driver)
1016 }
1017}
1018
/// Builder for [`Agent`]. Only the model adapter is mandatory.
pub struct AgentBuilder<M>
where
    M: ModelAdapter,
{
    // Required; `build` fails while this is `None`.
    model: Option<M>,
    // Tool registry; empty by default.
    tools: ToolRegistry,
    // Optional task manager; `build` substitutes a `SimpleTaskManager` when unset.
    task_manager: Option<Arc<dyn TaskManager>>,
    // Permission checker; `AllowAllPermissions` by default.
    permissions: Arc<dyn PermissionChecker>,
    // Tool resources; the unit value by default.
    resources: Arc<dyn ToolResources>,
    // Optional cancellation handle.
    cancellation: Option<CancellationHandle>,
    // Optional compaction configuration.
    compaction: Option<CompactionConfig>,
    // Registered observers, in registration order.
    observers: Vec<Box<dyn LoopObserver>>,
}
1037
1038impl<M> Default for AgentBuilder<M>
1039where
1040 M: ModelAdapter,
1041{
1042 fn default() -> Self {
1043 Self {
1044 model: None,
1045 tools: ToolRegistry::new(),
1046 task_manager: None,
1047 permissions: Arc::new(AllowAllPermissions),
1048 resources: Arc::new(()),
1049 cancellation: None,
1050 compaction: None,
1051 observers: Vec::new(),
1052 }
1053 }
1054}
1055
1056impl<M> AgentBuilder<M>
1057where
1058 M: ModelAdapter,
1059{
1060 pub fn model(mut self, model: M) -> Self {
1062 self.model = Some(model);
1063 self
1064 }
1065
1066 pub fn tools(mut self, tools: ToolRegistry) -> Self {
1068 self.tools = tools;
1069 self
1070 }
1071
1072 pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1077 self.task_manager = Some(Arc::new(manager));
1078 self
1079 }
1080
1081 pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1085 self.permissions = Arc::new(permissions);
1086 self
1087 }
1088
1089 pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1091 self.resources = Arc::new(resources);
1092 self
1093 }
1094
1095 pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1097 self.cancellation = Some(handle);
1098 self
1099 }
1100
1101 pub fn compaction(mut self, config: CompactionConfig) -> Self {
1106 self.compaction = Some(config);
1107 self
1108 }
1109
1110 pub fn observer(mut self, observer: impl LoopObserver + 'static) -> Self {
1114 self.observers.push(Box::new(observer));
1115 self
1116 }
1117
1118 pub fn build(self) -> Result<Agent<M>, LoopError> {
1124 let model = self
1125 .model
1126 .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1127 Ok(Agent {
1128 model,
1129 tools: self.tools,
1130 task_manager: self
1131 .task_manager
1132 .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1133 permissions: self.permissions,
1134 resources: self.resources,
1135 cancellation: self.cancellation,
1136 compaction: self.compaction,
1137 observers: self.observers,
1138 })
1139 }
1140}
1141
/// Drives an agent session: runs model turns, executes tool rounds and
/// surfaces interrupts to the host. Created by [`Agent::start`].
pub struct LoopDriver<S>
where
    S: ModelSession,
{
    // Session this driver belongs to.
    session_id: SessionId,
    // Cache request used for a turn when no per-turn override is set.
    default_cache: Option<PromptCacheRequest>,
    // One-shot cache override; taken (and thereby cleared) when a turn request is built.
    next_turn_cache: Option<PromptCacheRequest>,
    // Model session; turns fail with `InvalidState` when this is `None`.
    session: Option<S>,
    // Executor that provides tool specs and is handed to the task manager.
    tool_executor: Arc<dyn ToolExecutor>,
    // Manager that runs tool calls as tasks.
    task_manager: Arc<dyn TaskManager>,
    // Permission checker passed into tool contexts.
    permissions: Arc<dyn PermissionChecker>,
    // Shared resources passed into tool contexts.
    resources: Arc<dyn ToolResources>,
    // Source of per-turn cancellation checkpoints.
    cancellation: Option<CancellationHandle>,
    // Optional transcript compaction configuration.
    compaction: Option<CompactionConfig>,
    // Observers that receive `AgentEvent`s.
    observers: Vec<Box<dyn LoopObserver>>,
    // Conversation transcript accumulated so far.
    transcript: Vec<Item>,
    // Input items submitted but not yet consumed.
    pending_input: Vec<Item>,
    // Approval bookkeeping keyed by tool-call id.
    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
    // Order in which pending approvals were enqueued.
    pending_approval_order: VecDeque<ToolCallId>,
    // At most one outstanding auth request.
    pending_auth: Option<PendingAuthToolCall>,
    // Tool round currently being executed, if any.
    active_tool_round: Option<ActiveToolRound>,
    // Turn to resume after an `AfterToolResult` interrupt.
    pending_round_resume: Option<agentkit_core::TurnId>,
    // Counter for turn ids; starts at 1.
    next_turn_index: u64,
}
1196
1197impl<S> LoopDriver<S>
1198where
1199 S: ModelSession,
1200{
1201 fn start_task_via_manager(
1202 &self,
1203 task_id: Option<TaskId>,
1204 tool_request: ToolRequest,
1205 approved_request: Option<ApprovalRequest>,
1206 cancellation: Option<TurnCancellation>,
1207 ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1208 {
1209 let task_manager = self.task_manager.clone();
1210 let tool_executor = self.tool_executor.clone();
1211 let permissions = self.permissions.clone();
1212 let resources = self.resources.clone();
1213 let session_id = self.session_id.clone();
1214 let turn_id = tool_request.turn_id.clone();
1215 let metadata = tool_request.metadata.clone();
1216
1217 async move {
1218 task_manager
1219 .start_task(
1220 TaskLaunchRequest {
1221 task_id,
1222 request: tool_request.clone(),
1223 approved_request,
1224 },
1225 TaskStartContext {
1226 executor: tool_executor,
1227 tool_context: OwnedToolContext {
1228 session_id,
1229 turn_id,
1230 metadata,
1231 permissions,
1232 resources,
1233 cancellation,
1234 },
1235 },
1236 )
1237 .await
1238 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1239 }
1240 }
1241
1242 fn has_pending_interrupts(&self) -> bool {
1243 self.pending_auth.is_some() || !self.pending_approvals.is_empty()
1244 }
1245
1246 fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1247 let call_id = task.tool_request.call_id.clone();
1248 let call = ToolCallPart {
1249 id: call_id.clone(),
1250 name: task.tool_request.tool_name.to_string(),
1251 input: task.tool_request.input.clone(),
1252 metadata: task.tool_request.metadata.clone(),
1253 };
1254 let mut request = task.approval;
1255 request.call_id = Some(call_id.clone());
1256 let pending = PendingApprovalToolCall {
1257 request: request.clone(),
1258 decision: None,
1259 surfaced: false,
1260 turn_id: turn_id.clone(),
1261 task_id: task.task_id,
1262 call,
1263 tool_request: task.tool_request,
1264 };
1265 self.pending_approvals.insert(call_id.clone(), pending);
1266 if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1267 self.pending_approval_order.push_back(call_id);
1268 }
1269 self.emit(AgentEvent::ApprovalRequired(request));
1270 }
1271
1272 fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1273 for call_id in self.pending_approval_order.clone() {
1274 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1275 continue;
1276 };
1277 if pending.decision.is_none() && !pending.surfaced {
1278 pending.surfaced = true;
1279 return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1280 PendingApproval {
1281 request: pending.request.clone(),
1282 },
1283 )));
1284 }
1285 }
1286 None
1287 }
1288
1289 fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1290 self.pending_approval_order.iter().find_map(|call_id| {
1291 self.pending_approvals.get(call_id).and_then(|pending| {
1292 pending.decision.is_none().then(|| {
1293 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1294 request: pending.request.clone(),
1295 }))
1296 })
1297 })
1298 })
1299 }
1300
1301 fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1302 let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1303 self.pending_approvals
1304 .get(call_id)
1305 .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1306 })?;
1307 self.pending_approval_order.retain(|id| id != &call_id);
1308 self.pending_approvals.remove(&call_id)
1309 }
1310
1311 fn pending_auth_interrupt(&self) -> Option<LoopStep> {
1312 self.pending_auth.as_ref().and_then(|pending| {
1313 pending.resolution.is_none().then(|| {
1314 LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth {
1315 request: pending.request.clone(),
1316 }))
1317 })
1318 })
1319 }
1320
1321 fn queue_auth_interrupt(
1322 &mut self,
1323 turn_id: &agentkit_core::TurnId,
1324 task: TaskAuth,
1325 ) -> LoopStep {
1326 let call = ToolCallPart {
1327 id: task.tool_request.call_id.clone(),
1328 name: task.tool_request.tool_name.to_string(),
1329 input: task.tool_request.input.clone(),
1330 metadata: task.tool_request.metadata.clone(),
1331 };
1332 let request = upgrade_auth_request(task.auth, &task.tool_request, &call);
1333 self.pending_auth = Some(PendingAuthToolCall {
1334 request: request.clone(),
1335 resolution: None,
1336 turn_id: turn_id.clone(),
1337 task_id: task.task_id,
1338 call,
1339 tool_request: task.tool_request,
1340 });
1341 self.emit(AgentEvent::AuthRequired(request.clone()));
1342 LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth { request }))
1343 }
1344
1345 fn queue_resolution_interrupt(
1346 &mut self,
1347 turn_id: &agentkit_core::TurnId,
1348 resolution: TaskResolution,
1349 ) -> Option<LoopStep> {
1350 match resolution {
1351 TaskResolution::Item(item) => {
1352 self.transcript.push(item);
1353 None
1354 }
1355 TaskResolution::Approval(task) => {
1356 self.enqueue_pending_approval(turn_id, task);
1357 self.take_next_unsurfaced_approval_interrupt()
1358 }
1359 TaskResolution::Auth(task) => Some(self.queue_auth_interrupt(turn_id, task)),
1360 }
1361 }
1362
1363 async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1364 let PendingLoopUpdates { mut resolutions } = self
1365 .task_manager
1366 .take_pending_loop_updates()
1367 .await
1368 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1369 let mut saw_items = false;
1370 while let Some(resolution) = resolutions.pop_front() {
1371 match resolution {
1372 TaskResolution::Item(item) => {
1373 self.transcript.push(item);
1374 saw_items = true;
1375 }
1376 TaskResolution::Approval(task) => {
1377 self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1378 }
1379 TaskResolution::Auth(task) => {
1380 return Ok((
1381 saw_items,
1382 Some(self.queue_auth_interrupt(&task.tool_request.turn_id.clone(), task)),
1383 ));
1384 }
1385 }
1386 }
1387 Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1388 }
1389
1390 async fn maybe_compact(
1391 &mut self,
1392 turn_id: Option<&agentkit_core::TurnId>,
1393 cancellation: Option<TurnCancellation>,
1394 ) -> Result<(), LoopError> {
1395 let Some(compaction) = self.compaction.as_ref().cloned() else {
1396 return Ok(());
1397 };
1398 if cancellation
1399 .as_ref()
1400 .is_some_and(TurnCancellation::is_cancelled)
1401 {
1402 return Err(LoopError::Cancelled);
1403 }
1404 let Some(reason) =
1405 compaction
1406 .trigger
1407 .should_compact(&self.session_id, turn_id, &self.transcript)
1408 else {
1409 return Ok(());
1410 };
1411
1412 self.emit(AgentEvent::CompactionStarted {
1413 session_id: self.session_id.clone(),
1414 turn_id: turn_id.cloned(),
1415 reason: reason.clone(),
1416 });
1417
1418 let CompactionResult {
1419 transcript,
1420 replaced_items,
1421 metadata,
1422 } = compaction
1423 .strategy
1424 .apply(
1425 agentkit_compaction::CompactionRequest {
1426 session_id: self.session_id.clone(),
1427 turn_id: turn_id.cloned(),
1428 transcript: self.transcript.clone(),
1429 reason,
1430 metadata: compaction.metadata.clone(),
1431 },
1432 &mut CompactionContext {
1433 backend: compaction.backend.as_deref(),
1434 metadata: &compaction.metadata,
1435 cancellation,
1436 },
1437 )
1438 .await
1439 .map_err(|error| match error {
1440 agentkit_compaction::CompactionError::Cancelled => LoopError::Cancelled,
1441 other => LoopError::Compaction(other.to_string()),
1442 })?;
1443
1444 self.transcript = transcript;
1445 self.emit(AgentEvent::CompactionFinished {
1446 session_id: self.session_id.clone(),
1447 turn_id: turn_id.cloned(),
1448 replaced_items,
1449 transcript_len: self.transcript.len(),
1450 metadata,
1451 });
1452 Ok(())
1453 }
1454
1455 async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
1456 let Some(_) = self.active_tool_round.as_ref() else {
1457 return Ok(None);
1458 };
1459 loop {
1460 let cancellation = self
1461 .cancellation
1462 .as_ref()
1463 .map(CancellationHandle::checkpoint);
1464 let turn_id = self
1465 .active_tool_round
1466 .as_ref()
1467 .map(|active| active.turn_id.clone())
1468 .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;
1469
1470 if cancellation
1471 .as_ref()
1472 .is_some_and(TurnCancellation::is_cancelled)
1473 {
1474 self.task_manager
1475 .on_turn_interrupted(&turn_id)
1476 .await
1477 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1478 self.active_tool_round = None;
1479 return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1480 }
1481
1482 let next_call = self
1483 .active_tool_round
1484 .as_mut()
1485 .and_then(|active| active.pending_calls.pop_front());
1486 if let Some((_call, tool_request)) = next_call {
1487 match self
1488 .start_task_via_manager(None, tool_request.clone(), None, cancellation.clone())
1489 .await?
1490 {
1491 TaskStartOutcome::Ready(resolution) => {
1492 let resolution = *resolution;
1493 match resolution {
1494 TaskResolution::Item(item) => {
1495 if let Some(active) = self.active_tool_round.as_mut() {
1496 active.foreground_progressed = true;
1497 }
1498 self.transcript.push(item);
1499 }
1500 TaskResolution::Approval(task) => {
1501 self.enqueue_pending_approval(&turn_id, task);
1502 }
1503 TaskResolution::Auth(task) => {
1504 return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
1505 }
1506 }
1507 continue;
1508 }
1509 TaskStartOutcome::Pending { kind, .. } => {
1510 if kind == agentkit_task_manager::TaskKind::Background
1511 && let Some(active) = self.active_tool_round.as_mut()
1512 {
1513 active.background_pending = true;
1514 }
1515 continue;
1516 }
1517 }
1518 }
1519
1520 match self
1521 .task_manager
1522 .wait_for_turn(&turn_id, cancellation.clone())
1523 .await
1524 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
1525 {
1526 Some(TurnTaskUpdate::Resolution(resolution)) => {
1527 let resolution = *resolution;
1528 match resolution {
1529 TaskResolution::Item(item) => {
1530 if let Some(active) = self.active_tool_round.as_mut() {
1531 active.foreground_progressed = true;
1532 }
1533 self.transcript.push(item);
1534 }
1535 TaskResolution::Approval(task) => {
1536 self.enqueue_pending_approval(&turn_id, task);
1537 }
1538 TaskResolution::Auth(task) => {
1539 return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
1540 }
1541 }
1542 }
1543 Some(TurnTaskUpdate::Detached(snapshot)) => {
1544 self.transcript.push(Item {
1548 id: None,
1549 kind: ItemKind::Tool,
1550 parts: vec![Part::ToolResult(ToolResultPart {
1551 call_id: snapshot.call_id,
1552 output: ToolOutput::Text(format!(
1553 "Tool {} is now running in the background. \
1554 The result will be delivered when it completes.",
1555 snapshot.tool_name,
1556 )),
1557 is_error: false,
1558 metadata: MetadataMap::new(),
1559 })],
1560 metadata: MetadataMap::new(),
1561 });
1562 if let Some(active) = self.active_tool_round.as_mut() {
1563 active.background_pending = true;
1564 active.foreground_progressed = true;
1565 }
1566 }
1567 None => {
1568 if cancellation
1569 .as_ref()
1570 .is_some_and(TurnCancellation::is_cancelled)
1571 {
1572 self.task_manager
1573 .on_turn_interrupted(&turn_id)
1574 .await
1575 .map_err(|error| {
1576 LoopError::Tool(ToolError::Internal(error.to_string()))
1577 })?;
1578 self.active_tool_round = None;
1579 return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1580 }
1581 let active = self.active_tool_round.take().ok_or_else(|| {
1582 LoopError::InvalidState("missing active tool round".into())
1583 })?;
1584 if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1585 return Ok(Some(step));
1586 }
1587 if let Some(step) = self.pending_auth_interrupt() {
1588 return Ok(Some(step));
1589 }
1590 if let Some(step) = self.next_unresolved_approval_interrupt() {
1591 return Ok(Some(step));
1592 }
1593 if active.background_pending && !active.foreground_progressed {
1594 return Ok(None);
1595 }
1596 let info = ToolRoundInfo {
1603 session_id: self.session_id.clone(),
1604 turn_id: turn_id.clone(),
1605 transcript_len: self.transcript.len(),
1606 };
1607 self.pending_round_resume = Some(turn_id);
1608 return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
1609 info,
1610 ))));
1611 }
1612 }
1613 }
1614 }
1615
1616 async fn drive_turn(
1617 &mut self,
1618 turn_id: agentkit_core::TurnId,
1619 emit_started: bool,
1620 ) -> Result<LoopStep, LoopError> {
1621 let cancellation = self
1622 .cancellation
1623 .as_ref()
1624 .map(CancellationHandle::checkpoint);
1625 match self
1626 .maybe_compact(Some(&turn_id), cancellation.clone())
1627 .await
1628 {
1629 Ok(()) => {}
1630 Err(LoopError::Cancelled) => {
1631 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1632 }
1633 Err(error) => return Err(error),
1634 }
1635 if emit_started {
1636 self.emit(AgentEvent::TurnStarted {
1637 session_id: self.session_id.clone(),
1638 turn_id: turn_id.clone(),
1639 });
1640 }
1641 if cancellation
1642 .as_ref()
1643 .is_some_and(TurnCancellation::is_cancelled)
1644 {
1645 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1646 }
1647
1648 let request = TurnRequest {
1649 session_id: self.session_id.clone(),
1650 turn_id: turn_id.clone(),
1651 transcript: self.transcript.clone(),
1652 available_tools: self.tool_executor.specs(),
1653 cache: self
1654 .next_turn_cache
1655 .take()
1656 .or_else(|| self.default_cache.clone()),
1657 metadata: MetadataMap::new(),
1658 };
1659
1660 let session = self
1661 .session
1662 .as_mut()
1663 .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
1664 let mut turn = match session.begin_turn(request, cancellation.clone()).await {
1665 Ok(turn) => turn,
1666 Err(LoopError::Cancelled) => {
1667 self.task_manager
1668 .on_turn_interrupted(&turn_id)
1669 .await
1670 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1671 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1672 }
1673 Err(error) => return Err(error),
1674 };
1675 let mut saw_tool_call = false;
1676 let mut finished_result = None;
1677
1678 while let Some(event) = match turn.next_event(cancellation.clone()).await {
1679 Ok(event) => event,
1680 Err(LoopError::Cancelled) => {
1681 self.task_manager
1682 .on_turn_interrupted(&turn_id)
1683 .await
1684 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1685 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1686 }
1687 Err(error) => return Err(error),
1688 } {
1689 if cancellation
1690 .as_ref()
1691 .is_some_and(TurnCancellation::is_cancelled)
1692 {
1693 self.task_manager
1694 .on_turn_interrupted(&turn_id)
1695 .await
1696 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1697 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1698 }
1699 match event {
1700 ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
1701 ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
1702 ModelTurnEvent::ToolCall(call) => {
1703 saw_tool_call = true;
1704 self.emit(AgentEvent::ToolCallRequested(call.clone()));
1705 }
1706 ModelTurnEvent::Finished(result) => {
1707 finished_result = Some(result);
1708 break;
1709 }
1710 }
1711 }
1712
1713 let result = finished_result.ok_or_else(|| {
1714 LoopError::Provider("model turn ended without a Finished event".into())
1715 })?;
1716 self.transcript.extend(result.output_items.clone());
1717
1718 if saw_tool_call {
1719 let pending_calls = extract_tool_calls(&result.output_items)
1720 .into_iter()
1721 .map(|call| {
1722 let tool_request = ToolRequest {
1723 call_id: call.id.clone(),
1724 tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
1725 input: call.input.clone(),
1726 session_id: self.session_id.clone(),
1727 turn_id: turn_id.clone(),
1728 metadata: call.metadata.clone(),
1729 };
1730 (call, tool_request)
1731 })
1732 .collect();
1733 self.active_tool_round = Some(ActiveToolRound {
1734 turn_id: turn_id.clone(),
1735 pending_calls,
1736 background_pending: false,
1737 foreground_progressed: false,
1738 });
1739 if let Some(step) = self.continue_active_tool_round().await? {
1740 return Ok(step);
1741 }
1742 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
1743 InputRequest {
1744 session_id: self.session_id.clone(),
1745 reason: "driver is waiting for input".into(),
1746 },
1747 )));
1748 }
1749
1750 let turn_result = TurnResult {
1751 turn_id,
1752 finish_reason: result.finish_reason,
1753 items: result.output_items,
1754 usage: result.usage,
1755 metadata: result.metadata,
1756 };
1757 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1758 Ok(LoopStep::Finished(turn_result))
1759 }
1760
1761 async fn resume_after_auth(
1762 &mut self,
1763 pending: PendingAuthToolCall,
1764 ) -> Result<LoopStep, LoopError> {
1765 let resolution = pending
1766 .resolution
1767 .clone()
1768 .ok_or_else(|| LoopError::InvalidState("pending auth has no resolution".into()))?;
1769 match resolution {
1770 AuthResolution::Provided { .. } => match self
1771 .start_task_via_manager(
1772 Some(pending.task_id.clone()),
1773 pending.tool_request.clone(),
1774 None,
1775 self.cancellation
1776 .as_ref()
1777 .map(CancellationHandle::checkpoint),
1778 )
1779 .await?
1780 {
1781 TaskStartOutcome::Ready(resolution) => {
1782 let resolution = *resolution;
1783 if let Some(step) =
1784 self.queue_resolution_interrupt(&pending.turn_id, resolution)
1785 {
1786 return Ok(step);
1787 }
1788 }
1789 TaskStartOutcome::Pending { .. } => {}
1790 },
1791 AuthResolution::Cancelled { .. } => {
1792 self.transcript.push(Item {
1793 id: None,
1794 kind: ItemKind::Tool,
1795 parts: vec![Part::ToolResult(ToolResultPart {
1796 call_id: pending.call.id.clone(),
1797 output: ToolOutput::Text("auth cancelled".into()),
1798 is_error: true,
1799 metadata: pending.call.metadata.clone(),
1800 })],
1801 metadata: MetadataMap::new(),
1802 });
1803 }
1804 }
1805
1806 if let Some(step) = self.continue_active_tool_round().await? {
1807 Ok(step)
1808 } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1809 Ok(step)
1810 } else if let Some(step) = self.pending_auth_interrupt() {
1811 Ok(step)
1812 } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1813 Ok(step)
1814 } else {
1815 self.drive_turn(pending.turn_id, false).await
1816 }
1817 }
1818
1819 async fn resume_after_approval(
1820 &mut self,
1821 pending: PendingApprovalToolCall,
1822 ) -> Result<LoopStep, LoopError> {
1823 let decision = pending
1824 .decision
1825 .clone()
1826 .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
1827
1828 match decision {
1829 ApprovalDecision::Approve => match self
1830 .start_task_via_manager(
1831 Some(pending.task_id.clone()),
1832 pending.tool_request.clone(),
1833 Some(pending.request.clone()),
1834 self.cancellation
1835 .as_ref()
1836 .map(CancellationHandle::checkpoint),
1837 )
1838 .await?
1839 {
1840 TaskStartOutcome::Ready(resolution) => {
1841 let resolution = *resolution;
1842 if let Some(step) =
1843 self.queue_resolution_interrupt(&pending.turn_id, resolution)
1844 {
1845 return Ok(step);
1846 }
1847 }
1848 TaskStartOutcome::Pending { .. } => {}
1849 },
1850 ApprovalDecision::Deny { reason } => {
1851 self.transcript.push(Item {
1852 id: None,
1853 kind: ItemKind::Tool,
1854 parts: vec![Part::ToolResult(ToolResultPart {
1855 call_id: pending.call.id.clone(),
1856 output: ToolOutput::Text(
1857 reason.unwrap_or_else(|| "approval denied".into()),
1858 ),
1859 is_error: true,
1860 metadata: pending.call.metadata.clone(),
1861 })],
1862 metadata: MetadataMap::new(),
1863 });
1864 }
1865 }
1866
1867 if let Some(step) = self.continue_active_tool_round().await? {
1868 Ok(step)
1869 } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1870 Ok(step)
1871 } else if let Some(step) = self.pending_auth_interrupt() {
1872 Ok(step)
1873 } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1874 Ok(step)
1875 } else {
1876 self.drive_turn(pending.turn_id, false).await
1877 }
1878 }
1879
1880 fn finish_cancelled(
1881 &mut self,
1882 turn_id: agentkit_core::TurnId,
1883 items: Vec<Item>,
1884 ) -> Result<LoopStep, LoopError> {
1885 self.transcript.extend(items.clone());
1886 let turn_result = TurnResult {
1887 turn_id,
1888 finish_reason: FinishReason::Cancelled,
1889 items,
1890 usage: None,
1891 metadata: interrupted_metadata("turn"),
1892 };
1893 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1894 Ok(LoopStep::Finished(turn_result))
1895 }
1896
1897 pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
1906 if self.has_pending_interrupts() {
1907 return Err(LoopError::InvalidState(
1908 "cannot submit input while an interrupt is pending".into(),
1909 ));
1910 }
1911 self.emit(AgentEvent::InputAccepted {
1912 session_id: self.session_id.clone(),
1913 items: input.clone(),
1914 });
1915 self.pending_input.extend(input);
1916 Ok(())
1917 }
1918
1919 pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
1924 if self.has_pending_interrupts() {
1925 return Err(LoopError::InvalidState(
1926 "cannot update next-turn cache while an interrupt is pending".into(),
1927 ));
1928 }
1929 self.next_turn_cache = Some(cache);
1930 Ok(())
1931 }
1932
1933 pub fn submit_input_with_cache(
1936 &mut self,
1937 input: Vec<Item>,
1938 cache: PromptCacheRequest,
1939 ) -> Result<(), LoopError> {
1940 self.set_next_turn_cache(cache)?;
1941 self.submit_input(input)
1942 }
1943
1944 pub fn resolve_approval_for(
1954 &mut self,
1955 call_id: ToolCallId,
1956 decision: ApprovalDecision,
1957 ) -> Result<(), LoopError> {
1958 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1959 return Err(LoopError::InvalidState(format!(
1960 "no approval request is pending for call {}",
1961 call_id.0
1962 )));
1963 };
1964 pending.decision = Some(decision.clone());
1965 self.emit(AgentEvent::ApprovalResolved {
1966 approved: matches!(decision, ApprovalDecision::Approve),
1967 });
1968 Ok(())
1969 }
1970
1971 pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
1974 let mut unresolved = self
1975 .pending_approval_order
1976 .iter()
1977 .filter(|call_id| {
1978 self.pending_approvals
1979 .get(*call_id)
1980 .is_some_and(|pending| pending.decision.is_none())
1981 })
1982 .cloned();
1983 let Some(call_id) = unresolved.next() else {
1984 return Err(LoopError::InvalidState(
1985 "no approval request is pending".into(),
1986 ));
1987 };
1988 if unresolved.next().is_some() {
1989 return Err(LoopError::InvalidState(
1990 "multiple approvals are pending; use resolve_approval_for".into(),
1991 ));
1992 }
1993 self.resolve_approval_for(call_id, decision)
1994 }
1995
1996 pub fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), LoopError> {
2007 let Some(pending) = self.pending_auth.as_mut() else {
2008 return Err(LoopError::InvalidState("no auth request is pending".into()));
2009 };
2010 if pending.request.id != resolution.request().id {
2011 return Err(LoopError::InvalidState(
2012 "auth resolution does not match the pending request".into(),
2013 ));
2014 }
2015 pending.resolution = Some(resolution.clone());
2016 self.emit(AgentEvent::AuthResolved {
2017 provided: matches!(resolution, AuthResolution::Provided { .. }),
2018 });
2019 Ok(())
2020 }
2021
2022 pub fn snapshot(&self) -> LoopSnapshot {
2024 LoopSnapshot {
2025 session_id: self.session_id.clone(),
2026 transcript: self.transcript.clone(),
2027 pending_input: self.pending_input.clone(),
2028 }
2029 }
2030
2031 pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
2046 if self
2047 .pending_auth
2048 .as_ref()
2049 .is_some_and(|pending| pending.resolution.is_some())
2050 {
2051 let pending = self
2052 .pending_auth
2053 .take()
2054 .ok_or_else(|| LoopError::InvalidState("missing pending auth state".into()))?;
2055 return self.resume_after_auth(pending).await;
2056 }
2057
2058 if let Some(pending) = self.take_next_resolved_approval() {
2059 return self.resume_after_approval(pending).await;
2060 }
2061
2062 if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2063 return Ok(step);
2064 }
2065
2066 if let Some(step) = self.pending_auth_interrupt() {
2067 return Ok(step);
2068 }
2069
2070 if let Some(step) = self.next_unresolved_approval_interrupt() {
2071 return Ok(step);
2072 }
2073
2074 if let Some(step) = self.continue_active_tool_round().await? {
2075 return Ok(step);
2076 }
2077
2078 let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2079 if let Some(step) = loop_step {
2080 return Ok(step);
2081 }
2082
2083 if let Some(turn_id) = self.pending_round_resume.take() {
2088 self.transcript.append(&mut self.pending_input);
2089 return self.drive_turn(turn_id, false).await;
2090 }
2091
2092 if self.pending_input.is_empty() && !had_loop_updates {
2093 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2094 InputRequest {
2095 session_id: self.session_id.clone(),
2096 reason: "driver is waiting for input".into(),
2097 },
2098 )));
2099 }
2100
2101 let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2102 self.next_turn_index += 1;
2103 self.transcript.append(&mut self.pending_input);
2104 self.drive_turn(turn_id, true).await
2105 }
2106
2107 fn emit(&mut self, event: AgentEvent) {
2108 for observer in &mut self.observers {
2109 observer.handle_event(event.clone());
2110 }
2111 }
2112}
2113
2114fn interrupted_metadata(stage: &str) -> MetadataMap {
2115 let mut metadata = MetadataMap::new();
2116 metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2117 metadata.insert(
2118 INTERRUPT_REASON_METADATA_KEY.into(),
2119 USER_CANCELLED_REASON.into(),
2120 );
2121 metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2122 metadata
2123}
2124
2125fn interrupted_assistant_items() -> Vec<Item> {
2126 vec![Item {
2127 id: None,
2128 kind: ItemKind::Assistant,
2129 parts: vec![Part::Text(TextPart {
2130 text: "Previous assistant response was interrupted by the user before completion."
2131 .into(),
2132 metadata: interrupted_metadata("assistant"),
2133 })],
2134 metadata: interrupted_metadata("assistant"),
2135 }]
2136}
2137
2138fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2139 let mut calls = Vec::new();
2140 for item in items {
2141 for part in &item.parts {
2142 if let Part::ToolCall(call) = part {
2143 calls.push(call.clone());
2144 }
2145 }
2146 }
2147 calls
2148}
2149
2150fn upgrade_auth_request(
2151 mut request: AuthRequest,
2152 tool_request: &ToolRequest,
2153 _call: &ToolCallPart,
2154) -> AuthRequest {
2155 if matches!(request.operation, AuthOperation::ToolCall { .. }) {
2156 return request;
2157 }
2158
2159 let prior_server_id = request.challenge.get("server_id").cloned();
2160 let mut metadata = tool_request.metadata.clone();
2161 if let Some(server_id) = prior_server_id {
2162 metadata.entry("server_id".into()).or_insert(server_id);
2163 }
2164 request.operation = AuthOperation::ToolCall {
2165 tool_name: tool_request.tool_name.0.clone(),
2166 input: tool_request.input.clone(),
2167 call_id: Some(tool_request.call_id.clone()),
2168 session_id: Some(tool_request.session_id.clone()),
2169 turn_id: Some(tool_request.turn_id.clone()),
2170 metadata,
2171 };
2172 request
2173}
2174
/// Permission checker that allows every request without inspecting it.
struct AllowAllPermissions;

impl PermissionChecker for AllowAllPermissions {
    /// Always returns `PermissionDecision::Allow`; the request is ignored.
    fn evaluate(
        &self,
        _request: &dyn agentkit_tools_core::PermissionRequest,
    ) -> agentkit_tools_core::PermissionDecision {
        agentkit_tools_core::PermissionDecision::Allow
    }
}
2185
/// Errors returned by the loop driver.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The driver was used in a way its current state does not allow, for
    /// example submitting input while an interrupt is pending or resolving a
    /// request that is not pending. The payload describes the violation.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The turn was cancelled; the driver reacts to this by finishing the turn
    /// as cancelled where it handles it.
    #[error("turn cancelled")]
    Cancelled,
    /// The model provider misbehaved, e.g. a turn ended without a `Finished`
    /// event. The payload carries the message.
    #[error("provider error: {0}")]
    Provider(String),
    /// A tool-layer error; converted automatically from [`ToolError`].
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// A compaction failure, described by the payload.
    #[error("compaction error: {0}")]
    Compaction(String),
    /// The requested operation is not supported, described by the payload.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2208
2209#[cfg(test)]
2210mod tests {
2211 use std::collections::VecDeque;
2212 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2213 use std::sync::{Arc as StdArc, Mutex as StdMutex};
2214
2215 use agentkit_compaction::{CompactionPipeline, CompactionTrigger, KeepRecentStrategy};
2216 use agentkit_core::{
2217 CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolOutput, ToolResultPart,
2218 };
2219 use agentkit_task_manager::{
2220 AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2221 TaskRoutingPolicy,
2222 };
2223 use agentkit_tools_core::{
2224 FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2225 ToolAnnotations, ToolName, ToolResult, ToolSpec,
2226 };
2227 use serde_json::{Value, json};
2228 use tokio::sync::Notify;
2229 use tokio::time::{Duration, timeout};
2230
2231 use super::*;
2232
    // Test doubles for the adapter / session / turn traits. Each adapter
    // starts the session of the same family, and each session produces the
    // turn of the same family.
    struct FakeAdapter;
    struct SlowAdapter;
    // Shares its recording buffers with the sessions it starts.
    struct RecordingAdapter {
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    struct MultiToolAdapter;
    struct DualApprovalAdapter;

    struct FakeSession;
    struct SlowSession;
    // Records, per turn request, the tool descriptions and cache request seen.
    struct RecordingSession {
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    struct MultiToolSession;
    struct DualApprovalSession;

    // Replays a scripted queue of events.
    struct FakeTurn {
        events: VecDeque<ModelTurnEvent>,
    }

    // `emitted` flips to true once the single `Finished` event was returned.
    struct SlowTurn {
        emitted: bool,
    }

    // `emitted` flips to true once the single `Finished` event was returned.
    struct RecordingTurn {
        emitted: bool,
    }
    // Replays a scripted queue of events.
    struct MultiToolTurn {
        events: VecDeque<ModelTurnEvent>,
    }
    // Replays a scripted queue of events.
    struct DualApprovalTurn {
        events: VecDeque<ModelTurnEvent>,
    }
2268
    #[async_trait]
    impl ModelAdapter for FakeAdapter {
        type Session = FakeSession;

        /// Always succeeds with a `FakeSession`; the config is ignored.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(FakeSession)
        }
    }
2277
    #[async_trait]
    impl ModelAdapter for SlowAdapter {
        type Session = SlowSession;

        /// Always succeeds with a `SlowSession`; the config is ignored.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(SlowSession)
        }
    }
2286
    #[async_trait]
    impl ModelAdapter for RecordingAdapter {
        type Session = RecordingSession;

        /// Starts a `RecordingSession` that shares this adapter's recording
        /// buffers; the config is ignored.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(RecordingSession {
                seen_descriptions: self.seen_descriptions.clone(),
                seen_caches: self.seen_caches.clone(),
            })
        }
    }
2298
    #[async_trait]
    impl ModelAdapter for MultiToolAdapter {
        type Session = MultiToolSession;

        /// Always succeeds with a `MultiToolSession`; the config is ignored.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(MultiToolSession)
        }
    }
2307
    #[async_trait]
    impl ModelAdapter for DualApprovalAdapter {
        type Session = DualApprovalSession;

        /// Always succeeds with a `DualApprovalSession`; the config is ignored.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(DualApprovalSession)
        }
    }
2316
2317 #[async_trait]
2318 impl ModelSession for FakeSession {
2319 type Turn = FakeTurn;
2320
2321 async fn begin_turn(
2322 &mut self,
2323 request: TurnRequest,
2324 _cancellation: Option<TurnCancellation>,
2325 ) -> Result<Self::Turn, LoopError> {
2326 let has_tool_result = request.transcript.iter().any(|item| {
2327 item.kind == ItemKind::Tool
2328 && item
2329 .parts
2330 .iter()
2331 .any(|part| matches!(part, Part::ToolResult(_)))
2332 });
2333 let tool_name = request
2334 .available_tools
2335 .first()
2336 .map(|tool| tool.name.0.clone())
2337 .unwrap_or_else(|| "echo".into());
2338
2339 let events = if has_tool_result {
2340 let result_text = request
2341 .transcript
2342 .iter()
2343 .rev()
2344 .find_map(|item| {
2345 item.parts.iter().find_map(|part| match part {
2346 Part::ToolResult(ToolResultPart {
2347 output: ToolOutput::Text(text),
2348 ..
2349 }) => Some(text.clone()),
2350 _ => None,
2351 })
2352 })
2353 .unwrap_or_else(|| "missing".into());
2354
2355 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2356 finish_reason: FinishReason::Completed,
2357 output_items: vec![Item {
2358 id: None,
2359 kind: ItemKind::Assistant,
2360 parts: vec![Part::Text(TextPart {
2361 text: format!("tool said: {result_text}"),
2362 metadata: MetadataMap::new(),
2363 })],
2364 metadata: MetadataMap::new(),
2365 }],
2366 usage: None,
2367 metadata: MetadataMap::new(),
2368 })])
2369 } else {
2370 VecDeque::from([
2371 ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2372 id: ToolCallId::new("call-1"),
2373 name: tool_name.clone(),
2374 input: json!({ "value": "pong" }),
2375 metadata: MetadataMap::new(),
2376 }),
2377 ModelTurnEvent::Finished(ModelTurnResult {
2378 finish_reason: FinishReason::ToolCall,
2379 output_items: vec![Item {
2380 id: None,
2381 kind: ItemKind::Assistant,
2382 parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2383 id: ToolCallId::new("call-1"),
2384 name: tool_name,
2385 input: json!({ "value": "pong" }),
2386 metadata: MetadataMap::new(),
2387 })],
2388 metadata: MetadataMap::new(),
2389 }],
2390 usage: None,
2391 metadata: MetadataMap::new(),
2392 }),
2393 ])
2394 };
2395
2396 Ok(FakeTurn { events })
2397 }
2398 }
2399
2400 #[async_trait]
2401 impl ModelSession for SlowSession {
2402 type Turn = SlowTurn;
2403
2404 async fn begin_turn(
2405 &mut self,
2406 request: TurnRequest,
2407 cancellation: Option<TurnCancellation>,
2408 ) -> Result<Self::Turn, LoopError> {
2409 let should_block = request
2410 .transcript
2411 .iter()
2412 .rev()
2413 .find(|item| item.kind == ItemKind::User)
2414 .is_some_and(|item| {
2415 item.parts.iter().any(|part| match part {
2416 Part::Text(text) => text.text == "do the long task",
2417 _ => false,
2418 })
2419 });
2420
2421 if should_block && let Some(cancellation) = cancellation {
2422 cancellation.cancelled().await;
2423 return Err(LoopError::Cancelled);
2424 }
2425
2426 Ok(SlowTurn { emitted: false })
2427 }
2428 }
2429
2430 #[async_trait]
2431 impl ModelSession for RecordingSession {
2432 type Turn = RecordingTurn;
2433
2434 async fn begin_turn(
2435 &mut self,
2436 request: TurnRequest,
2437 _cancellation: Option<TurnCancellation>,
2438 ) -> Result<Self::Turn, LoopError> {
2439 let descriptions = request
2440 .available_tools
2441 .iter()
2442 .map(|tool| tool.description.clone())
2443 .collect::<Vec<_>>();
2444 self.seen_descriptions.lock().unwrap().push(descriptions);
2445 self.seen_caches.lock().unwrap().push(request.cache.clone());
2446
2447 Ok(RecordingTurn { emitted: false })
2448 }
2449 }
2450
2451 #[async_trait]
2452 impl ModelSession for MultiToolSession {
2453 type Turn = MultiToolTurn;
2454
2455 async fn begin_turn(
2456 &mut self,
2457 request: TurnRequest,
2458 _cancellation: Option<TurnCancellation>,
2459 ) -> Result<Self::Turn, LoopError> {
2460 let has_tool_result = request.transcript.iter().any(|item| {
2461 item.kind == ItemKind::Tool
2462 && item
2463 .parts
2464 .iter()
2465 .any(|part| matches!(part, Part::ToolResult(_)))
2466 });
2467
2468 let events = if has_tool_result {
2469 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2470 finish_reason: FinishReason::Completed,
2471 output_items: vec![Item {
2472 id: None,
2473 kind: ItemKind::Assistant,
2474 parts: vec![Part::Text(TextPart {
2475 text: "mixed tools finished".into(),
2476 metadata: MetadataMap::new(),
2477 })],
2478 metadata: MetadataMap::new(),
2479 }],
2480 usage: None,
2481 metadata: MetadataMap::new(),
2482 })])
2483 } else {
2484 let foreground = agentkit_core::ToolCallPart {
2485 id: ToolCallId::new("call-foreground"),
2486 name: "foreground-wait".into(),
2487 input: json!({}),
2488 metadata: MetadataMap::new(),
2489 };
2490 let background = agentkit_core::ToolCallPart {
2491 id: ToolCallId::new("call-background"),
2492 name: "background-wait".into(),
2493 input: json!({}),
2494 metadata: MetadataMap::new(),
2495 };
2496 VecDeque::from([
2497 ModelTurnEvent::ToolCall(foreground.clone()),
2498 ModelTurnEvent::ToolCall(background.clone()),
2499 ModelTurnEvent::Finished(ModelTurnResult {
2500 finish_reason: FinishReason::ToolCall,
2501 output_items: vec![Item {
2502 id: None,
2503 kind: ItemKind::Assistant,
2504 parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2505 metadata: MetadataMap::new(),
2506 }],
2507 usage: None,
2508 metadata: MetadataMap::new(),
2509 }),
2510 ])
2511 };
2512
2513 Ok(MultiToolTurn { events })
2514 }
2515 }
2516
2517 #[async_trait]
2518 impl ModelSession for DualApprovalSession {
2519 type Turn = DualApprovalTurn;
2520
2521 async fn begin_turn(
2522 &mut self,
2523 request: TurnRequest,
2524 _cancellation: Option<TurnCancellation>,
2525 ) -> Result<Self::Turn, LoopError> {
2526 let tool_results = request
2527 .transcript
2528 .iter()
2529 .flat_map(|item| item.parts.iter())
2530 .filter(|part| matches!(part, Part::ToolResult(_)))
2531 .count();
2532
2533 let events = if tool_results >= 2 {
2534 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2535 finish_reason: FinishReason::Completed,
2536 output_items: vec![Item {
2537 id: None,
2538 kind: ItemKind::Assistant,
2539 parts: vec![Part::Text(TextPart {
2540 text: "both approvals finished".into(),
2541 metadata: MetadataMap::new(),
2542 })],
2543 metadata: MetadataMap::new(),
2544 }],
2545 usage: None,
2546 metadata: MetadataMap::new(),
2547 })])
2548 } else {
2549 let first = agentkit_core::ToolCallPart {
2550 id: ToolCallId::new("call-1"),
2551 name: "echo".into(),
2552 input: json!({ "value": "first" }),
2553 metadata: MetadataMap::new(),
2554 };
2555 let second = agentkit_core::ToolCallPart {
2556 id: ToolCallId::new("call-2"),
2557 name: "echo".into(),
2558 input: json!({ "value": "second" }),
2559 metadata: MetadataMap::new(),
2560 };
2561 VecDeque::from([
2562 ModelTurnEvent::ToolCall(first.clone()),
2563 ModelTurnEvent::ToolCall(second.clone()),
2564 ModelTurnEvent::Finished(ModelTurnResult {
2565 finish_reason: FinishReason::ToolCall,
2566 output_items: vec![Item {
2567 id: None,
2568 kind: ItemKind::Assistant,
2569 parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2570 metadata: MetadataMap::new(),
2571 }],
2572 usage: None,
2573 metadata: MetadataMap::new(),
2574 }),
2575 ])
2576 };
2577
2578 Ok(DualApprovalTurn { events })
2579 }
2580 }
2581
    #[async_trait]
    impl ModelTurn for FakeTurn {
        /// Returns the next scripted event, or `None` once the queue is empty.
        /// Cancellation is ignored.
        async fn next_event(
            &mut self,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<Option<ModelTurnEvent>, LoopError> {
            Ok(self.events.pop_front())
        }
    }
2591
2592 #[async_trait]
2593 impl ModelTurn for SlowTurn {
2594 async fn next_event(
2595 &mut self,
2596 cancellation: Option<TurnCancellation>,
2597 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2598 if let Some(cancellation) = cancellation
2599 && cancellation.is_cancelled()
2600 {
2601 return Err(LoopError::Cancelled);
2602 }
2603
2604 if self.emitted {
2605 Ok(None)
2606 } else {
2607 self.emitted = true;
2608 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2609 finish_reason: FinishReason::Completed,
2610 output_items: vec![Item {
2611 id: None,
2612 kind: ItemKind::Assistant,
2613 parts: vec![Part::Text(TextPart {
2614 text: "done".into(),
2615 metadata: MetadataMap::new(),
2616 })],
2617 metadata: MetadataMap::new(),
2618 }],
2619 usage: None,
2620 metadata: MetadataMap::new(),
2621 })))
2622 }
2623 }
2624 }
2625
2626 #[async_trait]
2627 impl ModelTurn for RecordingTurn {
2628 async fn next_event(
2629 &mut self,
2630 _cancellation: Option<TurnCancellation>,
2631 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2632 if self.emitted {
2633 Ok(None)
2634 } else {
2635 self.emitted = true;
2636 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2637 finish_reason: FinishReason::Completed,
2638 output_items: vec![Item {
2639 id: None,
2640 kind: ItemKind::Assistant,
2641 parts: vec![Part::Text(TextPart {
2642 text: "done".into(),
2643 metadata: MetadataMap::new(),
2644 })],
2645 metadata: MetadataMap::new(),
2646 }],
2647 usage: None,
2648 metadata: MetadataMap::new(),
2649 })))
2650 }
2651 }
2652 }
2653
    #[async_trait]
    impl ModelTurn for MultiToolTurn {
        /// Returns the next scripted event, or `None` once the queue is empty.
        /// Cancellation is ignored.
        async fn next_event(
            &mut self,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<Option<ModelTurnEvent>, LoopError> {
            Ok(self.events.pop_front())
        }
    }
2663
    #[async_trait]
    impl ModelTurn for DualApprovalTurn {
        /// Returns the next scripted event, or `None` once the queue is empty.
        /// Cancellation is ignored.
        async fn next_event(
            &mut self,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<Option<ModelTurnEvent>, LoopError> {
            Ok(self.events.pop_front())
        }
    }
2673
2674 #[derive(Clone)]
2675 struct EchoTool {
2676 spec: ToolSpec,
2677 }
2678
2679 impl Default for EchoTool {
2680 fn default() -> Self {
2681 Self {
2682 spec: ToolSpec {
2683 name: ToolName::new("echo"),
2684 description: "Echo back a value".into(),
2685 input_schema: json!({
2686 "type": "object",
2687 "properties": {
2688 "value": { "type": "string" }
2689 },
2690 "required": ["value"],
2691 "additionalProperties": false
2692 }),
2693 annotations: ToolAnnotations::default(),
2694 metadata: MetadataMap::new(),
2695 },
2696 }
2697 }
2698 }
2699
2700 #[derive(Clone)]
2701 struct DynamicSpecTool {
2702 spec: ToolSpec,
2703 version: StdArc<AtomicUsize>,
2704 }
2705
2706 impl DynamicSpecTool {
2707 fn new(version: StdArc<AtomicUsize>) -> Self {
2708 Self {
2709 spec: ToolSpec {
2710 name: ToolName::new("dynamic"),
2711 description: "dynamic version 0".into(),
2712 input_schema: json!({
2713 "type": "object",
2714 "properties": {},
2715 "additionalProperties": false
2716 }),
2717 annotations: ToolAnnotations::default(),
2718 metadata: MetadataMap::new(),
2719 },
2720 version,
2721 }
2722 }
2723 }
2724
2725 #[async_trait]
2726 impl Tool for EchoTool {
2727 fn spec(&self) -> &ToolSpec {
2728 &self.spec
2729 }
2730
2731 fn proposed_requests(
2732 &self,
2733 request: &agentkit_tools_core::ToolRequest,
2734 ) -> Result<
2735 Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
2736 agentkit_tools_core::ToolError,
2737 > {
2738 Ok(vec![Box::new(FileSystemPermissionRequest::Read {
2739 path: "/tmp/echo".into(),
2740 metadata: request.metadata.clone(),
2741 })])
2742 }
2743
2744 async fn invoke(
2745 &self,
2746 request: agentkit_tools_core::ToolRequest,
2747 _ctx: &mut ToolContext<'_>,
2748 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2749 let value = request
2750 .input
2751 .get("value")
2752 .and_then(Value::as_str)
2753 .ok_or_else(|| {
2754 agentkit_tools_core::ToolError::InvalidInput("missing value".into())
2755 })?;
2756
2757 Ok(ToolResult {
2758 result: ToolResultPart {
2759 call_id: request.call_id,
2760 output: ToolOutput::Text(value.into()),
2761 is_error: false,
2762 metadata: MetadataMap::new(),
2763 },
2764 duration: None,
2765 metadata: MetadataMap::new(),
2766 })
2767 }
2768 }
2769
2770 #[async_trait]
2771 impl Tool for DynamicSpecTool {
2772 fn spec(&self) -> &ToolSpec {
2773 &self.spec
2774 }
2775
2776 fn current_spec(&self) -> Option<ToolSpec> {
2777 let mut spec = self.spec.clone();
2778 spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
2779 Some(spec)
2780 }
2781
2782 async fn invoke(
2783 &self,
2784 request: agentkit_tools_core::ToolRequest,
2785 _ctx: &mut ToolContext<'_>,
2786 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2787 Ok(ToolResult {
2788 result: ToolResultPart {
2789 call_id: request.call_id,
2790 output: ToolOutput::Text("ok".into()),
2791 is_error: false,
2792 metadata: MetadataMap::new(),
2793 },
2794 duration: None,
2795 metadata: MetadataMap::new(),
2796 })
2797 }
2798 }
2799
2800 struct DenyFsReads;
2801
2802 impl PermissionChecker for DenyFsReads {
2803 fn evaluate(
2804 &self,
2805 request: &dyn agentkit_tools_core::PermissionRequest,
2806 ) -> PermissionDecision {
2807 if request.kind() == "filesystem.read" {
2808 return PermissionDecision::Deny(PermissionDenial {
2809 code: PermissionCode::PathNotAllowed,
2810 message: "reads denied in test".into(),
2811 metadata: MetadataMap::new(),
2812 });
2813 }
2814
2815 PermissionDecision::Allow
2816 }
2817 }
2818
2819 struct ApproveFsReads;
2820
2821 impl PermissionChecker for ApproveFsReads {
2822 fn evaluate(
2823 &self,
2824 request: &dyn agentkit_tools_core::PermissionRequest,
2825 ) -> PermissionDecision {
2826 if request.kind() == "filesystem.read" {
2827 return PermissionDecision::RequireApproval(ApprovalRequest {
2828 task_id: None,
2829 call_id: None,
2830 id: "approval:fs-read".into(),
2831 request_kind: request.kind().into(),
2832 reason: agentkit_tools_core::ApprovalReason::SensitivePath,
2833 summary: request.summary(),
2834 metadata: request.metadata().clone(),
2835 });
2836 }
2837
2838 PermissionDecision::Allow
2839 }
2840 }
2841
2842 struct CountTrigger;
2843
2844 impl CompactionTrigger for CountTrigger {
2845 fn should_compact(
2846 &self,
2847 _session_id: &SessionId,
2848 _turn_id: Option<&agentkit_core::TurnId>,
2849 transcript: &[Item],
2850 ) -> Option<agentkit_compaction::CompactionReason> {
2851 (transcript.len() >= 2)
2852 .then_some(agentkit_compaction::CompactionReason::TranscriptTooLong)
2853 }
2854 }
2855
2856 struct RecordingObserver {
2857 events: StdArc<StdMutex<Vec<AgentEvent>>>,
2858 }
2859
2860 impl LoopObserver for RecordingObserver {
2861 fn handle_event(&mut self, event: AgentEvent) {
2862 self.events.lock().unwrap().push(event);
2863 }
2864 }
2865
2866 #[derive(Clone)]
2867 struct AuthTool {
2868 spec: ToolSpec,
2869 }
2870
2871 impl Default for AuthTool {
2872 fn default() -> Self {
2873 Self {
2874 spec: ToolSpec {
2875 name: ToolName::new("auth-tool"),
2876 description: "Always requires auth".into(),
2877 input_schema: json!({
2878 "type": "object",
2879 "properties": {},
2880 "additionalProperties": false
2881 }),
2882 annotations: ToolAnnotations::default(),
2883 metadata: MetadataMap::new(),
2884 },
2885 }
2886 }
2887 }
2888
2889 #[async_trait]
2890 impl Tool for AuthTool {
2891 fn spec(&self) -> &ToolSpec {
2892 &self.spec
2893 }
2894
2895 async fn invoke(
2896 &self,
2897 request: agentkit_tools_core::ToolRequest,
2898 _ctx: &mut ToolContext<'_>,
2899 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2900 let mut challenge = MetadataMap::new();
2901 challenge.insert("server_id".into(), json!("mock"));
2902 challenge.insert("scope".into(), json!("secret.read"));
2903
2904 Err(agentkit_tools_core::ToolError::AuthRequired(Box::new(
2905 AuthRequest {
2906 task_id: None,
2907 id: "auth-1".into(),
2908 provider: "mcp.mock".into(),
2909 operation: AuthOperation::ToolCall {
2910 tool_name: request.tool_name.0,
2911 input: request.input,
2912 call_id: Some(request.call_id),
2913 session_id: Some(request.session_id),
2914 turn_id: Some(request.turn_id),
2915 metadata: request.metadata,
2916 },
2917 challenge,
2918 },
2919 )))
2920 }
2921 }
2922
2923 #[derive(Clone)]
2924 struct BlockingTool {
2925 spec: ToolSpec,
2926 entered: StdArc<AtomicBool>,
2927 release: StdArc<Notify>,
2928 output: &'static str,
2929 }
2930
2931 impl BlockingTool {
2932 fn new(
2933 name: &str,
2934 entered: StdArc<AtomicBool>,
2935 release: StdArc<Notify>,
2936 output: &'static str,
2937 ) -> Self {
2938 Self {
2939 spec: ToolSpec {
2940 name: ToolName::new(name),
2941 description: format!("blocking tool {name}"),
2942 input_schema: json!({
2943 "type": "object",
2944 "properties": {},
2945 "additionalProperties": false
2946 }),
2947 annotations: ToolAnnotations::default(),
2948 metadata: MetadataMap::new(),
2949 },
2950 entered,
2951 release,
2952 output,
2953 }
2954 }
2955 }
2956
2957 #[async_trait]
2958 impl Tool for BlockingTool {
2959 fn spec(&self) -> &ToolSpec {
2960 &self.spec
2961 }
2962
2963 async fn invoke(
2964 &self,
2965 request: agentkit_tools_core::ToolRequest,
2966 _ctx: &mut ToolContext<'_>,
2967 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2968 self.entered.store(true, Ordering::SeqCst);
2969 self.release.notified().await;
2970 Ok(ToolResult {
2971 result: ToolResultPart {
2972 call_id: request.call_id,
2973 output: ToolOutput::Text(self.output.into()),
2974 is_error: false,
2975 metadata: MetadataMap::new(),
2976 },
2977 duration: None,
2978 metadata: MetadataMap::new(),
2979 })
2980 }
2981 }
2982
2983 struct NameRoutingPolicy {
2984 routes: Vec<(String, RoutingDecision)>,
2985 }
2986
2987 impl NameRoutingPolicy {
2988 fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
2989 Self {
2990 routes: routes
2991 .into_iter()
2992 .map(|(name, decision)| (name.into(), decision))
2993 .collect(),
2994 }
2995 }
2996 }
2997
2998 impl TaskRoutingPolicy for NameRoutingPolicy {
2999 fn route(&self, request: &ToolRequest) -> RoutingDecision {
3000 self.routes
3001 .iter()
3002 .find(|(name, _)| name == &request.tool_name.0)
3003 .map(|(_, decision)| *decision)
3004 .unwrap_or(RoutingDecision::Foreground)
3005 }
3006 }
3007
3008 async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3009 timeout(Duration::from_secs(1), handle.next_event())
3010 .await
3011 .expect("timed out waiting for task event")
3012 .expect("task event stream ended unexpectedly")
3013 }
3014
3015 async fn wait_until_entered(flag: &AtomicBool) {
3016 timeout(Duration::from_secs(1), async {
3017 while !flag.load(Ordering::SeqCst) {
3018 tokio::task::yield_now().await;
3019 }
3020 })
3021 .await
3022 .expect("task never entered execution");
3023 }
3024
3025 #[tokio::test]
3026 async fn loop_continues_after_completed_tool_call() {
3027 let tools = ToolRegistry::new().with(EchoTool::default());
3028 let agent = Agent::builder()
3029 .model(FakeAdapter)
3030 .tools(tools)
3031 .permissions(AllowAllPermissions)
3032 .build()
3033 .unwrap();
3034
3035 let mut driver = agent
3036 .start(SessionConfig {
3037 session_id: SessionId::new("session-1"),
3038 metadata: MetadataMap::new(),
3039 cache: None,
3040 })
3041 .await
3042 .unwrap();
3043
3044 driver
3045 .submit_input(vec![Item {
3046 id: None,
3047 kind: ItemKind::User,
3048 parts: vec![Part::Text(TextPart {
3049 text: "ping".into(),
3050 metadata: MetadataMap::new(),
3051 })],
3052 metadata: MetadataMap::new(),
3053 }])
3054 .unwrap();
3055
3056 let result = run_until_finished(&mut driver).await;
3057
3058 match result {
3059 LoopStep::Finished(turn) => {
3060 assert_eq!(turn.finish_reason, FinishReason::Completed);
3061 assert_eq!(turn.items.len(), 1);
3062 match &turn.items[0].parts[0] {
3063 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3064 other => panic!("unexpected part: {other:?}"),
3065 }
3066 }
3067 other => panic!("unexpected loop step: {other:?}"),
3068 }
3069 }
3070
3071 async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3075 loop {
3076 match driver.next().await.unwrap() {
3077 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3078 step => return step,
3079 }
3080 }
3081 }
3082
3083 #[tokio::test]
3084 async fn loop_uses_injected_permission_checker() {
3085 let tools = ToolRegistry::new().with(EchoTool::default());
3086 let agent = Agent::builder()
3087 .model(FakeAdapter)
3088 .tools(tools)
3089 .permissions(DenyFsReads)
3090 .build()
3091 .unwrap();
3092
3093 let mut driver = agent
3094 .start(SessionConfig {
3095 session_id: SessionId::new("session-2"),
3096 metadata: MetadataMap::new(),
3097 cache: None,
3098 })
3099 .await
3100 .unwrap();
3101
3102 driver
3103 .submit_input(vec![Item {
3104 id: None,
3105 kind: ItemKind::User,
3106 parts: vec![Part::Text(TextPart {
3107 text: "ping".into(),
3108 metadata: MetadataMap::new(),
3109 })],
3110 metadata: MetadataMap::new(),
3111 }])
3112 .unwrap();
3113
3114 let result = run_until_finished(&mut driver).await;
3115
3116 match result {
3117 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3118 Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3119 other => panic!("unexpected part: {other:?}"),
3120 },
3121 other => panic!("unexpected loop step: {other:?}"),
3122 }
3123 }
3124
3125 #[tokio::test]
3126 async fn loop_surfaces_auth_interruptions_from_tools() {
3127 let tools = ToolRegistry::new().with(AuthTool::default());
3128 let agent = Agent::builder()
3129 .model(FakeAdapter)
3130 .tools(tools)
3131 .permissions(AllowAllPermissions)
3132 .build()
3133 .unwrap();
3134
3135 let mut driver = agent
3136 .start(SessionConfig {
3137 session_id: SessionId::new("session-3"),
3138 metadata: MetadataMap::new(),
3139 cache: None,
3140 })
3141 .await
3142 .unwrap();
3143
3144 driver
3145 .submit_input(vec![Item {
3146 id: None,
3147 kind: ItemKind::User,
3148 parts: vec![Part::Text(TextPart {
3149 text: "ping".into(),
3150 metadata: MetadataMap::new(),
3151 })],
3152 metadata: MetadataMap::new(),
3153 }])
3154 .unwrap();
3155
3156 let result = driver.next().await.unwrap();
3157
3158 match result {
3159 LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
3160 let request = &pending.request;
3161 assert!(request.task_id.is_some());
3162 assert_eq!(request.provider, "mcp.mock");
3163 assert_eq!(request.challenge.get("scope"), Some(&json!("secret.read")));
3164 match &request.operation {
3165 AuthOperation::ToolCall { tool_name, .. } => {
3166 assert_eq!(tool_name, "auth-tool");
3167 }
3168 other => panic!("unexpected auth operation: {other:?}"),
3169 }
3170 }
3171 other => panic!("unexpected loop step: {other:?}"),
3172 }
3173 }
3174
3175 #[tokio::test]
3176 async fn async_task_manager_background_round_requires_explicit_continue() {
3177 let entered = StdArc::new(AtomicBool::new(false));
3178 let release = StdArc::new(Notify::new());
3179 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3180 "background-wait",
3181 RoutingDecision::Background,
3182 )]));
3183 let handle = task_manager.handle();
3184 let tools = ToolRegistry::new().with(BlockingTool::new(
3185 "background-wait",
3186 entered.clone(),
3187 release.clone(),
3188 "background-done",
3189 ));
3190 let agent = Agent::builder()
3191 .model(FakeAdapter)
3192 .tools(tools)
3193 .permissions(AllowAllPermissions)
3194 .task_manager(task_manager)
3195 .build()
3196 .unwrap();
3197
3198 let mut driver = agent
3199 .start(SessionConfig {
3200 session_id: SessionId::new("session-background"),
3201 metadata: MetadataMap::new(),
3202 cache: None,
3203 })
3204 .await
3205 .unwrap();
3206
3207 driver
3208 .submit_input(vec![Item {
3209 id: None,
3210 kind: ItemKind::User,
3211 parts: vec![Part::Text(TextPart {
3212 text: "ping".into(),
3213 metadata: MetadataMap::new(),
3214 })],
3215 metadata: MetadataMap::new(),
3216 }])
3217 .unwrap();
3218
3219 let first = driver.next().await.unwrap();
3220 match first {
3221 LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3222 other => panic!("unexpected first loop step: {other:?}"),
3223 }
3224
3225 match wait_for_task_event(&handle).await {
3226 TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3227 other => panic!("unexpected task event: {other:?}"),
3228 }
3229 wait_until_entered(entered.as_ref()).await;
3230 release.notify_waiters();
3231
3232 match wait_for_task_event(&handle).await {
3233 TaskEvent::Completed(_, result) => {
3234 assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3235 }
3236 other => panic!("unexpected completion event: {other:?}"),
3237 }
3238
3239 let resumed = driver.next().await.unwrap();
3240 match resumed {
3241 LoopStep::Finished(turn) => {
3242 assert_eq!(turn.finish_reason, FinishReason::Completed);
3243 match &turn.items[0].parts[0] {
3244 Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3245 other => panic!("unexpected part after resume: {other:?}"),
3246 }
3247 }
3248 other => panic!("unexpected resumed step: {other:?}"),
3249 }
3250 }
3251
3252 #[tokio::test]
3253 async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3254 let controller = CancellationController::new();
3255 let agent = Agent::builder()
3256 .model(SlowAdapter)
3257 .cancellation(controller.handle())
3258 .build()
3259 .unwrap();
3260
3261 let mut driver = agent
3262 .start(SessionConfig {
3263 session_id: SessionId::new("session-cancel"),
3264 metadata: MetadataMap::new(),
3265 cache: None,
3266 })
3267 .await
3268 .unwrap();
3269
3270 driver
3271 .submit_input(vec![Item {
3272 id: None,
3273 kind: ItemKind::User,
3274 parts: vec![Part::Text(TextPart {
3275 text: "do the long task".into(),
3276 metadata: MetadataMap::new(),
3277 })],
3278 metadata: MetadataMap::new(),
3279 }])
3280 .unwrap();
3281
3282 let cancelled = tokio::join!(async { driver.next().await }, async {
3283 tokio::task::yield_now().await;
3284 controller.interrupt();
3285 })
3286 .0
3287 .unwrap();
3288
3289 match cancelled {
3290 LoopStep::Finished(turn) => {
3291 assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3292 assert_eq!(turn.items.len(), 1);
3293 assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3294 assert_eq!(
3295 turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3296 Some(&Value::Bool(true))
3297 );
3298 }
3299 other => panic!("unexpected loop step: {other:?}"),
3300 }
3301
3302 driver
3303 .submit_input(vec![Item {
3304 id: None,
3305 kind: ItemKind::User,
3306 parts: vec![Part::Text(TextPart {
3307 text: "try again".into(),
3308 metadata: MetadataMap::new(),
3309 })],
3310 metadata: MetadataMap::new(),
3311 }])
3312 .unwrap();
3313
3314 let result = driver.next().await.unwrap();
3315 match result {
3316 LoopStep::Finished(turn) => {
3317 assert_eq!(turn.finish_reason, FinishReason::Completed);
3318 }
3319 other => panic!("unexpected loop step after retry: {other:?}"),
3320 }
3321 }
3322
3323 #[tokio::test]
3324 async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3325 let controller = CancellationController::new();
3326 let fg_entered = StdArc::new(AtomicBool::new(false));
3327 let fg_release = StdArc::new(Notify::new());
3328 let bg_entered = StdArc::new(AtomicBool::new(false));
3329 let bg_release = StdArc::new(Notify::new());
3330 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3331 ("foreground-wait", RoutingDecision::Foreground),
3332 ("background-wait", RoutingDecision::Background),
3333 ]));
3334 let handle = task_manager.handle();
3335 let tools = ToolRegistry::new()
3336 .with(BlockingTool::new(
3337 "foreground-wait",
3338 fg_entered.clone(),
3339 fg_release,
3340 "foreground-done",
3341 ))
3342 .with(BlockingTool::new(
3343 "background-wait",
3344 bg_entered.clone(),
3345 bg_release.clone(),
3346 "background-done",
3347 ));
3348 let agent = Agent::builder()
3349 .model(MultiToolAdapter)
3350 .tools(tools)
3351 .permissions(AllowAllPermissions)
3352 .cancellation(controller.handle())
3353 .task_manager(task_manager)
3354 .build()
3355 .unwrap();
3356
3357 let mut driver = agent
3358 .start(SessionConfig {
3359 session_id: SessionId::new("session-mixed-cancel"),
3360 metadata: MetadataMap::new(),
3361 cache: None,
3362 })
3363 .await
3364 .unwrap();
3365
3366 driver
3367 .submit_input(vec![Item {
3368 id: None,
3369 kind: ItemKind::User,
3370 parts: vec![Part::Text(TextPart {
3371 text: "run both".into(),
3372 metadata: MetadataMap::new(),
3373 })],
3374 metadata: MetadataMap::new(),
3375 }])
3376 .unwrap();
3377
3378 let cancelled = tokio::join!(async { driver.next().await }, async {
3379 let _ = wait_for_task_event(&handle).await;
3380 let _ = wait_for_task_event(&handle).await;
3381 wait_until_entered(fg_entered.as_ref()).await;
3382 wait_until_entered(bg_entered.as_ref()).await;
3383 controller.interrupt();
3384 })
3385 .0
3386 .unwrap();
3387
3388 match cancelled {
3389 LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3390 other => panic!("unexpected loop step after interrupt: {other:?}"),
3391 }
3392
3393 match wait_for_task_event(&handle).await {
3394 TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3395 other => panic!("unexpected post-interrupt event: {other:?}"),
3396 }
3397
3398 let running = handle.list_running().await;
3399 assert_eq!(running.len(), 1);
3400 assert_eq!(running[0].tool_name, "background-wait");
3401
3402 bg_release.notify_waiters();
3403 match wait_for_task_event(&handle).await {
3404 TaskEvent::Completed(snapshot, result) => {
3405 assert_eq!(snapshot.tool_name, "background-wait");
3406 assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3407 }
3408 other => panic!("unexpected background completion event: {other:?}"),
3409 }
3410 }
3411
3412 #[tokio::test]
3413 async fn loop_resumes_after_approved_tool_request() {
3414 let tools = ToolRegistry::new().with(EchoTool::default());
3415 let agent = Agent::builder()
3416 .model(FakeAdapter)
3417 .tools(tools)
3418 .permissions(ApproveFsReads)
3419 .build()
3420 .unwrap();
3421
3422 let mut driver = agent
3423 .start(SessionConfig {
3424 session_id: SessionId::new("session-approval"),
3425 metadata: MetadataMap::new(),
3426 cache: None,
3427 })
3428 .await
3429 .unwrap();
3430
3431 driver
3432 .submit_input(vec![Item {
3433 id: None,
3434 kind: ItemKind::User,
3435 parts: vec![Part::Text(TextPart {
3436 text: "ping".into(),
3437 metadata: MetadataMap::new(),
3438 })],
3439 metadata: MetadataMap::new(),
3440 }])
3441 .unwrap();
3442
3443 let first = driver.next().await.unwrap();
3444 match first {
3445 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3446 assert!(pending.request.task_id.is_some());
3447 assert_eq!(pending.request.id.0, "approval:fs-read");
3448 pending.approve(&mut driver).unwrap();
3449 }
3450 other => panic!("unexpected loop step: {other:?}"),
3451 }
3452 let second = driver.next().await.unwrap();
3453 match second {
3454 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3455 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3456 other => panic!("unexpected part: {other:?}"),
3457 },
3458 other => panic!("unexpected loop step after approval: {other:?}"),
3459 }
3460 }
3461
3462 #[tokio::test]
3463 async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3464 let tools = ToolRegistry::new().with(EchoTool::default());
3465 let agent = Agent::builder()
3466 .model(DualApprovalAdapter)
3467 .tools(tools)
3468 .permissions(ApproveFsReads)
3469 .build()
3470 .unwrap();
3471
3472 let mut driver = agent
3473 .start(SessionConfig {
3474 session_id: SessionId::new("session-dual-approval"),
3475 metadata: MetadataMap::new(),
3476 cache: None,
3477 })
3478 .await
3479 .unwrap();
3480
3481 driver
3482 .submit_input(vec![Item {
3483 id: None,
3484 kind: ItemKind::User,
3485 parts: vec![Part::Text(TextPart {
3486 text: "run both approvals".into(),
3487 metadata: MetadataMap::new(),
3488 })],
3489 metadata: MetadataMap::new(),
3490 }])
3491 .unwrap();
3492
3493 let pending_first = match driver.next().await.unwrap() {
3494 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3495 assert_eq!(
3496 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3497 Some("call-1")
3498 );
3499 pending
3500 }
3501 other => panic!("unexpected first loop step: {other:?}"),
3502 };
3503
3504 let pending_second = match driver.next().await.unwrap() {
3505 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3506 assert_eq!(
3507 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3508 Some("call-2")
3509 );
3510 pending
3511 }
3512 other => panic!("unexpected second loop step: {other:?}"),
3513 };
3514
3515 pending_second.approve(&mut driver).unwrap();
3516 match driver.next().await.unwrap() {
3517 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3518 assert_eq!(
3519 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3520 Some("call-1")
3521 );
3522 }
3523 other => panic!("unexpected step after approving second request: {other:?}"),
3524 }
3525
3526 pending_first.approve(&mut driver).unwrap();
3527 match driver.next().await.unwrap() {
3528 LoopStep::Finished(turn) => {
3529 assert_eq!(turn.finish_reason, FinishReason::Completed);
3530 match &turn.items[0].parts[0] {
3531 Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3532 other => panic!("unexpected final part: {other:?}"),
3533 }
3534 }
3535 other => panic!("unexpected final loop step: {other:?}"),
3536 }
3537 }
3538
3539 #[tokio::test]
3540 async fn loop_compacts_transcript_before_new_turns() {
3541 let events = StdArc::new(StdMutex::new(Vec::new()));
3542 let agent = Agent::builder()
3543 .model(FakeAdapter)
3544 .compaction(CompactionConfig::new(
3545 CountTrigger,
3546 CompactionPipeline::new().with_strategy(KeepRecentStrategy::new(1)),
3547 ))
3548 .observer(RecordingObserver {
3549 events: events.clone(),
3550 })
3551 .build()
3552 .unwrap();
3553
3554 let mut driver = agent
3555 .start(SessionConfig {
3556 session_id: SessionId::new("session-4"),
3557 metadata: MetadataMap::new(),
3558 cache: None,
3559 })
3560 .await
3561 .unwrap();
3562
3563 for text in ["first", "second"] {
3564 driver
3565 .submit_input(vec![Item {
3566 id: None,
3567 kind: ItemKind::User,
3568 parts: vec![Part::Text(TextPart {
3569 text: text.into(),
3570 metadata: MetadataMap::new(),
3571 })],
3572 metadata: MetadataMap::new(),
3573 }])
3574 .unwrap();
3575 let _ = driver.next().await.unwrap();
3576 }
3577
3578 let events = events.lock().unwrap();
3579 assert!(events.iter().any(|event| matches!(
3580 event,
3581 AgentEvent::CompactionFinished {
3582 replaced_items,
3583 ..
3584 } if *replaced_items > 0
3585 )));
3586 }
3587
3588 #[tokio::test]
3589 async fn loop_refreshes_tool_specs_each_turn() {
3590 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3591 let version = StdArc::new(AtomicUsize::new(1));
3592 let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
3593 let agent = Agent::builder()
3594 .model(RecordingAdapter {
3595 seen_descriptions: seen_descriptions.clone(),
3596 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3597 })
3598 .tools(tools)
3599 .permissions(AllowAllPermissions)
3600 .build()
3601 .unwrap();
3602
3603 let mut driver = agent
3604 .start(SessionConfig {
3605 session_id: SessionId::new("session-dynamic-tools"),
3606 metadata: MetadataMap::new(),
3607 cache: None,
3608 })
3609 .await
3610 .unwrap();
3611
3612 for text in ["first", "second"] {
3613 driver
3614 .submit_input(vec![Item {
3615 id: None,
3616 kind: ItemKind::User,
3617 parts: vec![Part::Text(TextPart {
3618 text: text.into(),
3619 metadata: MetadataMap::new(),
3620 })],
3621 metadata: MetadataMap::new(),
3622 }])
3623 .unwrap();
3624
3625 let _ = driver.next().await.unwrap();
3626 if text == "first" {
3627 version.store(2, Ordering::SeqCst);
3628 }
3629 }
3630
3631 let seen_descriptions = seen_descriptions.lock().unwrap();
3632 assert_eq!(seen_descriptions.len(), 2);
3633 assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
3634 assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
3635 }
3636
3637 #[tokio::test]
3638 async fn loop_passes_session_default_and_next_turn_cache_requests() {
3639 let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
3640 let agent = Agent::builder()
3641 .model(RecordingAdapter {
3642 seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
3643 seen_caches: seen_caches.clone(),
3644 })
3645 .permissions(AllowAllPermissions)
3646 .build()
3647 .unwrap();
3648
3649 let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
3650 .with_retention(PromptCacheRetention::Short);
3651 let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
3652 breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
3653 });
3654
3655 let mut driver = agent
3656 .start(SessionConfig {
3657 session_id: SessionId::new("session-cache"),
3658 metadata: MetadataMap::new(),
3659 cache: Some(default_cache.clone()),
3660 })
3661 .await
3662 .unwrap();
3663
3664 driver
3665 .submit_input(vec![Item {
3666 id: None,
3667 kind: ItemKind::User,
3668 parts: vec![Part::Text(TextPart {
3669 text: "first".into(),
3670 metadata: MetadataMap::new(),
3671 })],
3672 metadata: MetadataMap::new(),
3673 }])
3674 .unwrap();
3675 let _ = driver.next().await.unwrap();
3676
3677 driver
3678 .submit_input_with_cache(
3679 vec![Item {
3680 id: None,
3681 kind: ItemKind::User,
3682 parts: vec![Part::Text(TextPart {
3683 text: "second".into(),
3684 metadata: MetadataMap::new(),
3685 })],
3686 metadata: MetadataMap::new(),
3687 }],
3688 override_cache.clone(),
3689 )
3690 .unwrap();
3691 let _ = driver.next().await.unwrap();
3692
3693 let seen = seen_caches.lock().unwrap();
3694 assert_eq!(seen.len(), 2);
3695 assert_eq!(seen[0], Some(default_cache));
3696 assert_eq!(seen[1], Some(override_cache));
3697 }
3698
3699 #[tokio::test]
3700 async fn loop_yields_after_tool_result_between_rounds() {
3701 let tools = ToolRegistry::new().with(EchoTool::default());
3702 let agent = Agent::builder()
3703 .model(FakeAdapter)
3704 .tools(tools)
3705 .permissions(AllowAllPermissions)
3706 .build()
3707 .unwrap();
3708
3709 let mut driver = agent
3710 .start(SessionConfig {
3711 session_id: SessionId::new("yield-session"),
3712 metadata: MetadataMap::new(),
3713 cache: None,
3714 })
3715 .await
3716 .unwrap();
3717
3718 driver
3719 .submit_input(vec![Item::text(ItemKind::User, "ping")])
3720 .unwrap();
3721
3722 let step = driver.next().await.unwrap();
3725 let info = match step {
3726 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
3727 other => panic!("expected AfterToolResult, got {other:?}"),
3728 };
3729 assert_eq!(info.session_id, SessionId::new("yield-session"));
3730 assert_eq!(info.transcript_len, 3);
3732
3733 let interrupt = LoopInterrupt::AfterToolResult(info.clone());
3735 assert!(!interrupt.is_blocking());
3736
3737 driver
3739 .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
3740 .unwrap();
3741
3742 let step = driver.next().await.unwrap();
3745 match step {
3746 LoopStep::Finished(turn) => {
3747 assert_eq!(turn.finish_reason, FinishReason::Completed);
3748 }
3749 other => panic!("expected Finished, got {other:?}"),
3750 }
3751
3752 let snapshot = driver.snapshot();
3754 let has_injected_message = snapshot.transcript.iter().any(|item| {
3755 item.kind == ItemKind::User
3756 && item.parts.iter().any(|part| match part {
3757 Part::Text(text) => text.text == "also: report back",
3758 _ => false,
3759 })
3760 });
3761 assert!(
3762 has_injected_message,
3763 "injected user message should be in transcript, got: {:?}",
3764 snapshot.transcript
3765 );
3766 }
3767
/// The convenience constructors on `PromptCacheRequest`,
/// `PromptCacheBreakpoint` and `SessionConfig` produce the same values as
/// spelling out the corresponding variants and fields by hand.
#[test]
fn convenience_cache_builders_construct_expected_defaults() {
    // Session-level builder: the id converts from a string, the cache is stored as given.
    let cache = PromptCacheRequest::automatic()
        .with_retention(PromptCacheRetention::Short)
        .with_key("workspace:demo");
    let session = SessionConfig::new("demo").with_cache(cache.clone());

    assert_eq!(session.session_id, SessionId::new("demo"));
    assert_eq!(session.cache, Some(cache));

    // Explicit-breakpoint builder: mode is best-effort and each helper maps
    // to the matching enum variant.
    let explicit = PromptCacheRequest::explicit([
        PromptCacheBreakpoint::tools_end(),
        PromptCacheBreakpoint::transcript_item_end(2),
        PromptCacheBreakpoint::transcript_part_end(3, 1),
    ]);

    let expected_breakpoints = vec![
        PromptCacheBreakpoint::ToolsEnd,
        PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
        PromptCacheBreakpoint::TranscriptPartEnd {
            item_index: 3,
            part_index: 1,
        },
    ];
    let expected_strategy = PromptCacheStrategy::Explicit {
        breakpoints: expected_breakpoints,
    };

    assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
    assert_eq!(explicit.strategy, expected_strategy);
}
3799}