1use std::collections::{BTreeMap, VecDeque};
50use std::sync::Arc;
51
52use agentkit_compaction::{
53 CompactionConfig, CompactionContext, CompactionReason, CompactionResult,
54};
55use agentkit_core::{
56 CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
57 TextPart, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation, Usage,
58};
59use agentkit_task_manager::{
60 PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskAuth, TaskLaunchRequest, TaskManager,
61 TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
62};
63#[cfg(test)]
64use agentkit_tools_core::ToolContext;
65use agentkit_tools_core::{
66 ApprovalDecision, ApprovalRequest, AuthOperation, AuthRequest, AuthResolution,
67 BasicToolExecutor, OwnedToolContext, PermissionChecker, ToolError, ToolExecutor, ToolRegistry,
68 ToolRequest, ToolResources, ToolSpec,
69};
70use async_trait::async_trait;
71use serde::{Deserialize, Serialize};
72use thiserror::Error;
73
// Metadata keys/values in the `agentkit.*` namespace related to interruption.
// NOTE(review): the code that reads or writes these is outside this view —
// the per-constant descriptions below are inferred from the names; confirm.

/// Metadata key `agentkit.interrupted` (presumably flags interrupted output).
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
/// Metadata key `agentkit.interrupt_reason` (presumably why the interruption happened).
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
/// Metadata key `agentkit.interrupt_stage` (presumably where in the turn it happened).
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
/// Reason value `user_cancelled` (presumably stored under the interrupt-reason key).
const USER_CANCELLED_REASON: &str = "user_cancelled";
78
79#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct SessionConfig {
95 pub session_id: SessionId,
97 pub metadata: MetadataMap,
99 pub cache: Option<PromptCacheRequest>,
101}
102
103impl SessionConfig {
104 pub fn new(session_id: impl Into<SessionId>) -> Self {
106 Self {
107 session_id: session_id.into(),
108 metadata: MetadataMap::new(),
109 cache: None,
110 }
111 }
112
113 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
115 self.metadata = metadata;
116 self
117 }
118
119 pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
121 self.cache = Some(cache);
122 self
123 }
124
125 pub fn without_cache(mut self) -> Self {
127 self.cache = None;
128 self
129 }
130}
131
/// How a [`PromptCacheRequest`] wants caching to be treated.
///
/// [`PromptCacheRequest::is_enabled`] reports `false` only for `Disabled`.
/// NOTE(review): the difference between `BestEffort` and `Required` is enforced
/// by model adapters outside this view — confirm the exact contract there.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheMode {
    /// Caching is turned off.
    Disabled,
    /// The default mode.
    #[default]
    BestEffort,
    /// Presumably: the provider must honour the cache request — confirm.
    Required,
}
147
/// Retention preference carried by a [`PromptCacheRequest`].
///
/// NOTE(review): concrete durations are provider-specific and not visible in
/// this file — confirm against the adapter that consumes this value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheRetention {
    /// Presumably the provider's default retention.
    Default,
    /// Presumably a shorter-than-default retention.
    Short,
    /// Presumably a longer-than-default retention.
    Extended,
}
162
163#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
165pub enum PromptCacheStrategy {
166 #[default]
168 Automatic,
169 Explicit {
171 breakpoints: Vec<PromptCacheBreakpoint>,
173 },
174}
175
176impl PromptCacheStrategy {
177 pub fn automatic() -> Self {
180 Self::Automatic
181 }
182
183 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
185 Self::Explicit {
186 breakpoints: breakpoints.into_iter().collect(),
187 }
188 }
189}
190
191#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum PromptCacheBreakpoint {
194 ToolsEnd,
196 TranscriptItemEnd { index: usize },
198 TranscriptPartEnd {
204 item_index: usize,
205 part_index: usize,
206 },
207}
208
209impl PromptCacheBreakpoint {
210 pub fn tools_end() -> Self {
212 Self::ToolsEnd
213 }
214
215 pub fn transcript_item_end(index: usize) -> Self {
217 Self::TranscriptItemEnd { index }
218 }
219
220 pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
222 Self::TranscriptPartEnd {
223 item_index,
224 part_index,
225 }
226 }
227}
228
229#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
231pub struct PromptCacheRequest {
232 pub mode: PromptCacheMode,
234 pub strategy: PromptCacheStrategy,
236 pub retention: Option<PromptCacheRetention>,
238 pub key: Option<String>,
240}
241
242impl PromptCacheRequest {
243 pub fn automatic() -> Self {
245 Self::best_effort(PromptCacheStrategy::automatic())
246 }
247
248 pub fn automatic_required() -> Self {
250 Self::required(PromptCacheStrategy::automatic())
251 }
252
253 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
255 Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
256 }
257
258 pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
260 Self::required(PromptCacheStrategy::explicit(breakpoints))
261 }
262
263 pub fn disabled() -> Self {
265 Self {
266 mode: PromptCacheMode::Disabled,
267 strategy: PromptCacheStrategy::Automatic,
268 retention: None,
269 key: None,
270 }
271 }
272
273 pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
275 Self {
276 mode: PromptCacheMode::BestEffort,
277 strategy,
278 retention: None,
279 key: None,
280 }
281 }
282
283 pub fn required(strategy: PromptCacheStrategy) -> Self {
285 Self {
286 mode: PromptCacheMode::Required,
287 strategy,
288 retention: None,
289 key: None,
290 }
291 }
292
293 pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
295 self.mode = mode;
296 self
297 }
298
299 pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
301 self.strategy = strategy;
302 self
303 }
304
305 pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
307 self.retention = Some(retention);
308 self
309 }
310
311 pub fn with_key(mut self, key: impl Into<String>) -> Self {
313 self.key = Some(key.into());
314 self
315 }
316
317 pub fn without_retention(mut self) -> Self {
319 self.retention = None;
320 self
321 }
322
323 pub fn without_key(mut self) -> Self {
325 self.key = None;
326 self
327 }
328
329 pub fn is_enabled(&self) -> bool {
331 !matches!(self.mode, PromptCacheMode::Disabled)
332 }
333}
334
/// Everything a [`ModelSession`] receives when the driver begins a turn.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnRequest {
    /// Session the turn belongs to.
    pub session_id: SessionId,
    /// Identifier of the turn being started.
    pub turn_id: agentkit_core::TurnId,
    /// Copy of the driver's transcript at the time the turn starts.
    pub transcript: Vec<Item>,
    /// Tool specs reported by the driver's tool executor.
    pub available_tools: Vec<ToolSpec>,
    /// Cache request for this turn: the one-shot per-turn override if one was
    /// set, otherwise the session default.
    pub cache: Option<PromptCacheRequest>,
    /// Request metadata (the driver currently sends an empty map).
    pub metadata: MetadataMap,
}
355
/// Final outcome of a model turn, delivered via [`ModelTurnEvent::Finished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ModelTurnResult {
    /// Why the model stopped.
    pub finish_reason: FinishReason,
    /// Items produced by the model; the driver appends them to its transcript
    /// and extracts tool calls from them.
    pub output_items: Vec<Item>,
    /// Token usage, if reported.
    pub usage: Option<Usage>,
    /// Metadata copied into the resulting [`TurnResult`].
    pub metadata: MetadataMap,
}
371
/// Events produced by a [`ModelTurn`] while it runs.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ModelTurnEvent {
    /// Incremental content; forwarded to observers as [`AgentEvent::ContentDelta`].
    Delta(Delta),
    /// A tool call; forwarded as [`AgentEvent::ToolCallRequested`] and makes
    /// the driver run a tool round once the turn finishes.
    ToolCall(ToolCallPart),
    /// Usage update; forwarded as [`AgentEvent::UsageUpdated`].
    Usage(Usage),
    /// Terminal event; the driver stops polling the turn after receiving it.
    Finished(ModelTurnResult),
}
388
/// Factory for model sessions; the entry point a provider integration implements.
#[async_trait]
pub trait ModelAdapter: Send + Sync {
    /// Session type produced by this adapter.
    type Session: ModelSession;

    /// Opens a session for `config`. Called once by [`Agent::start`]; an
    /// error aborts the start.
    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
}
437
/// A live model session from which turns are started.
#[async_trait]
pub trait ModelSession: Send {
    /// Turn type produced by this session.
    type Turn: ModelTurn;

    /// Starts a turn for `request`.
    ///
    /// `cancellation` is the checkpoint taken by the driver for this turn, if
    /// a cancellation handle was configured. Returning
    /// `Err(LoopError::Cancelled)` makes the driver finish the turn as
    /// cancelled; any other error is propagated to the caller.
    async fn begin_turn(
        &mut self,
        request: TurnRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Self::Turn, LoopError>;
}
467
/// A running model turn that is polled for events.
#[async_trait]
pub trait ModelTurn: Send {
    /// Returns the next event, or `Ok(None)` when the stream has ended.
    ///
    /// The driver expects a [`ModelTurnEvent::Finished`] before the stream
    /// ends; otherwise it fails the turn with a provider error. Returning
    /// `Err(LoopError::Cancelled)` makes the driver finish the turn as
    /// cancelled.
    async fn next_event(
        &mut self,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError>;
}
488
/// Receiver for [`AgentEvent`]s, registered through [`AgentBuilder::observer`].
///
/// NOTE(review): the dispatch code (`emit`) is outside this view — presumably
/// every registered observer is called for every event; confirm.
pub trait LoopObserver: Send {
    /// Handles a single event.
    fn handle_event(&mut self, event: AgentEvent);
}
512
/// Events emitted by the agent loop for [`LoopObserver`]s.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AgentEvent {
    /// Emitted by [`Agent::start`] once the driver has been created.
    RunStarted { session_id: SessionId },
    /// Emitted when the driver starts a turn (skipped when a turn is being
    /// continued after a tool round).
    TurnStarted {
        session_id: SessionId,
        turn_id: agentkit_core::TurnId,
    },
    /// Input items were accepted for the session.
    InputAccepted {
        session_id: SessionId,
        items: Vec<Item>,
    },
    /// Forwarded model content delta.
    ContentDelta(Delta),
    /// The model requested a tool call.
    ToolCallRequested(ToolCallPart),
    /// A tool call was queued pending approval.
    ApprovalRequired(ApprovalRequest),
    /// A tool call was parked pending authentication.
    AuthRequired(AuthRequest),
    /// An approval request was resolved.
    ApprovalResolved { approved: bool },
    /// An auth request was resolved.
    AuthResolved { provided: bool },
    /// Transcript compaction is about to run for the given reason.
    CompactionStarted {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        reason: CompactionReason,
    },
    /// Transcript compaction finished and the transcript was replaced.
    CompactionFinished {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        /// Count reported by the compaction strategy.
        replaced_items: usize,
        /// Length of the transcript after compaction.
        transcript_len: usize,
        /// Metadata reported by the compaction strategy.
        metadata: MetadataMap,
    },
    /// Forwarded model usage update.
    UsageUpdated(Usage),
    /// Non-fatal warning.
    Warning { message: String },
    /// The run failed.
    RunFailed { message: String },
    /// A turn finished with the given result.
    TurnFinished(TurnResult),
}
566
567#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
589pub struct PendingApproval {
590 pub request: ApprovalRequest,
592}
593
594impl std::ops::Deref for PendingApproval {
595 type Target = ApprovalRequest;
596 fn deref(&self) -> &ApprovalRequest {
597 &self.request
598 }
599}
600
601impl PendingApproval {
602 pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
604 let call_id = self
605 .request
606 .call_id
607 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
608 driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
609 }
610
611 pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
613 let call_id = self
614 .request
615 .call_id
616 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
617 driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
618 }
619
620 pub fn deny_with_reason<S: ModelSession>(
622 self,
623 driver: &mut LoopDriver<S>,
624 reason: impl Into<String>,
625 ) -> Result<(), LoopError> {
626 let call_id = self
627 .request
628 .call_id
629 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
630 driver.resolve_approval_for(
631 call_id,
632 ApprovalDecision::Deny {
633 reason: Some(reason.into()),
634 },
635 )
636 }
637}
638
639#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
660pub struct PendingAuth {
661 pub request: AuthRequest,
663}
664
665impl std::ops::Deref for PendingAuth {
666 type Target = AuthRequest;
667 fn deref(&self) -> &AuthRequest {
668 &self.request
669 }
670}
671
672impl PendingAuth {
673 pub fn provide<S: ModelSession>(
675 self,
676 driver: &mut LoopDriver<S>,
677 credentials: MetadataMap,
678 ) -> Result<(), LoopError> {
679 driver.resolve_auth(AuthResolution::Provided {
680 request: self.request,
681 credentials,
682 })
683 }
684
685 pub fn cancel<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
687 driver.resolve_auth(AuthResolution::Cancelled {
688 request: self.request,
689 })
690 }
691}
692
693#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
714pub struct InputRequest {
715 pub session_id: SessionId,
717 pub reason: String,
719}
720
721impl InputRequest {
722 pub fn submit<S: ModelSession>(
724 self,
725 driver: &mut LoopDriver<S>,
726 items: Vec<Item>,
727 ) -> Result<(), LoopError> {
728 driver.submit_input(items)
729 }
730}
731
/// Result of a finished turn, returned in [`LoopStep::Finished`] and emitted
/// as [`AgentEvent::TurnFinished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnResult {
    /// Identifier of the finished turn.
    pub turn_id: agentkit_core::TurnId,
    /// Why the turn ended.
    pub finish_reason: FinishReason,
    /// Items produced by the turn.
    pub items: Vec<Item>,
    /// Token usage, if reported.
    pub usage: Option<Usage>,
    /// Metadata attached to the result.
    pub metadata: MetadataMap,
}
749
/// Reasons the loop hands control back to the caller before a turn finishes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LoopInterrupt {
    /// A tool call needs an approval decision.
    ApprovalRequest(PendingApproval),
    /// A tool call needs authentication.
    AuthRequest(PendingAuth),
    /// The driver is waiting for input.
    AwaitingInput(InputRequest),
}
792
/// Outcome of advancing the loop: either an interrupt for the caller to
/// handle or a finished turn.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum LoopStep {
    /// The loop paused and needs the caller to act.
    Interrupt(LoopInterrupt),
    /// The turn completed.
    Finished(TurnResult),
}
827
/// Serializable snapshot of loop state.
///
/// NOTE(review): the code that produces snapshots is outside this view;
/// field descriptions follow the matching `LoopDriver` fields — confirm.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LoopSnapshot {
    /// Session the snapshot belongs to.
    pub session_id: SessionId,
    /// Transcript items.
    pub transcript: Vec<Item>,
    /// Input items not yet consumed.
    pub pending_input: Vec<Item>,
}
842
/// Driver-side bookkeeping for a tool call that is waiting on approval.
#[derive(Clone, Debug)]
struct PendingApprovalToolCall {
    /// The approval request, with its call id filled in.
    request: ApprovalRequest,
    /// Decision once one has been recorded; `None` while unresolved.
    decision: Option<ApprovalDecision>,
    /// Whether this request has already been returned to the caller as an
    /// interrupt.
    surfaced: bool,
    /// Turn the tool call belongs to.
    turn_id: agentkit_core::TurnId,
    /// Task-manager id of the paused task.
    task_id: TaskId,
    /// The tool call as rebuilt from the tool request.
    call: ToolCallPart,
    /// The original tool request.
    tool_request: ToolRequest,
}
853
/// Driver-side bookkeeping for a tool call that is waiting on authentication.
#[derive(Clone, Debug)]
struct PendingAuthToolCall {
    /// The auth request surfaced to the caller.
    request: AuthRequest,
    /// Resolution once one has been recorded; `None` while unresolved.
    resolution: Option<AuthResolution>,
    /// Turn the tool call belongs to.
    turn_id: agentkit_core::TurnId,
    /// Task-manager id of the paused task.
    task_id: TaskId,
    /// The tool call as rebuilt from the tool request.
    call: ToolCallPart,
    /// The original tool request.
    tool_request: ToolRequest,
}
863
/// State of the tool round currently being executed for a turn.
#[derive(Clone, Debug, Default)]
struct ActiveToolRound {
    /// Turn whose tool calls are being executed.
    turn_id: agentkit_core::TurnId,
    /// Tool calls not yet handed to the task manager, in model order.
    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
    /// Set when a task started as background work or was detached.
    background_pending: bool,
    /// Set when at least one tool result (or detach notice) was added to the
    /// transcript during this round.
    foreground_progressed: bool,
}
871
872pub struct Agent<M>
905where
906 M: ModelAdapter,
907{
908 model: M,
909 tools: ToolRegistry,
910 task_manager: Arc<dyn TaskManager>,
911 permissions: Arc<dyn PermissionChecker>,
912 resources: Arc<dyn ToolResources>,
913 cancellation: Option<CancellationHandle>,
914 compaction: Option<CompactionConfig>,
915 observers: Vec<Box<dyn LoopObserver>>,
916}
917
918impl<M> Agent<M>
919where
920 M: ModelAdapter,
921{
922 pub fn builder() -> AgentBuilder<M> {
924 AgentBuilder::default()
925 }
926
927 pub async fn start(self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
936 let session_id = config.session_id.clone();
937 let default_cache = config.cache.clone();
938 let session = self.model.start_session(config).await?;
939 let tool_executor = Arc::new(BasicToolExecutor::new(self.tools.clone()));
940 let mut driver = LoopDriver {
941 session_id: session_id.clone(),
942 default_cache,
943 next_turn_cache: None,
944 session: Some(session),
945 tool_executor,
946 task_manager: self.task_manager,
947 permissions: self.permissions,
948 resources: self.resources,
949 cancellation: self.cancellation,
950 compaction: self.compaction,
951 observers: self.observers,
952 transcript: Vec::new(),
953 pending_input: Vec::new(),
954 pending_approvals: BTreeMap::new(),
955 pending_approval_order: VecDeque::new(),
956 pending_auth: None,
957 active_tool_round: None,
958 next_turn_index: 1,
959 };
960 driver.emit(AgentEvent::RunStarted { session_id });
961 Ok(driver)
962 }
963}
964
965pub struct AgentBuilder<M>
971where
972 M: ModelAdapter,
973{
974 model: Option<M>,
975 tools: ToolRegistry,
976 task_manager: Option<Arc<dyn TaskManager>>,
977 permissions: Arc<dyn PermissionChecker>,
978 resources: Arc<dyn ToolResources>,
979 cancellation: Option<CancellationHandle>,
980 compaction: Option<CompactionConfig>,
981 observers: Vec<Box<dyn LoopObserver>>,
982}
983
984impl<M> Default for AgentBuilder<M>
985where
986 M: ModelAdapter,
987{
988 fn default() -> Self {
989 Self {
990 model: None,
991 tools: ToolRegistry::new(),
992 task_manager: None,
993 permissions: Arc::new(AllowAllPermissions),
994 resources: Arc::new(()),
995 cancellation: None,
996 compaction: None,
997 observers: Vec::new(),
998 }
999 }
1000}
1001
1002impl<M> AgentBuilder<M>
1003where
1004 M: ModelAdapter,
1005{
1006 pub fn model(mut self, model: M) -> Self {
1008 self.model = Some(model);
1009 self
1010 }
1011
1012 pub fn tools(mut self, tools: ToolRegistry) -> Self {
1014 self.tools = tools;
1015 self
1016 }
1017
1018 pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1023 self.task_manager = Some(Arc::new(manager));
1024 self
1025 }
1026
1027 pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1031 self.permissions = Arc::new(permissions);
1032 self
1033 }
1034
1035 pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1037 self.resources = Arc::new(resources);
1038 self
1039 }
1040
1041 pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1043 self.cancellation = Some(handle);
1044 self
1045 }
1046
1047 pub fn compaction(mut self, config: CompactionConfig) -> Self {
1052 self.compaction = Some(config);
1053 self
1054 }
1055
1056 pub fn observer(mut self, observer: impl LoopObserver + 'static) -> Self {
1060 self.observers.push(Box::new(observer));
1061 self
1062 }
1063
1064 pub fn build(self) -> Result<Agent<M>, LoopError> {
1070 let model = self
1071 .model
1072 .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1073 Ok(Agent {
1074 model,
1075 tools: self.tools,
1076 task_manager: self
1077 .task_manager
1078 .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1079 permissions: self.permissions,
1080 resources: self.resources,
1081 cancellation: self.cancellation,
1082 compaction: self.compaction,
1083 observers: self.observers,
1084 })
1085 }
1086}
1087
/// Drives the agent loop for one model session: runs turns, executes tool
/// rounds through the task manager, and surfaces interrupts to the caller.
pub struct LoopDriver<S>
where
    S: ModelSession,
{
    /// Session this driver belongs to.
    session_id: SessionId,
    /// Cache request used for turns that have no per-turn override.
    default_cache: Option<PromptCacheRequest>,
    /// One-shot cache override; consumed by the next turn request.
    next_turn_cache: Option<PromptCacheRequest>,
    /// The model session; turns fail with an invalid-state error when absent.
    session: Option<S>,
    /// Executor handed to the task manager for running tools; also the source
    /// of the tool specs sent to the model.
    tool_executor: Arc<dyn ToolExecutor>,
    /// Manager that starts and tracks tool tasks.
    task_manager: Arc<dyn TaskManager>,
    /// Permission checker passed into each tool context.
    permissions: Arc<dyn PermissionChecker>,
    /// Shared resources passed into each tool context.
    resources: Arc<dyn ToolResources>,
    /// Handle from which cancellation checkpoints are taken.
    cancellation: Option<CancellationHandle>,
    /// Compaction configuration; compaction is skipped when `None`.
    compaction: Option<CompactionConfig>,
    /// Observers notified of events.
    observers: Vec<Box<dyn LoopObserver>>,
    /// Conversation transcript sent to the model on each turn.
    transcript: Vec<Item>,
    /// Input items not yet consumed by a turn.
    pending_input: Vec<Item>,
    /// Tool calls awaiting approval, keyed by call id.
    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
    /// Order in which the pending approvals were queued.
    pending_approval_order: VecDeque<ToolCallId>,
    /// Tool call awaiting authentication, if any (at most one at a time).
    pending_auth: Option<PendingAuthToolCall>,
    /// Tool round in progress, if any.
    active_tool_round: Option<ActiveToolRound>,
    /// Counter for turn numbering (starts at 1).
    next_turn_index: u64,
}
1141
1142impl<S> LoopDriver<S>
1143where
1144 S: ModelSession,
1145{
1146 fn start_task_via_manager(
1147 &self,
1148 task_id: Option<TaskId>,
1149 tool_request: ToolRequest,
1150 approved_request: Option<ApprovalRequest>,
1151 cancellation: Option<TurnCancellation>,
1152 ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1153 {
1154 let task_manager = self.task_manager.clone();
1155 let tool_executor = self.tool_executor.clone();
1156 let permissions = self.permissions.clone();
1157 let resources = self.resources.clone();
1158 let session_id = self.session_id.clone();
1159 let turn_id = tool_request.turn_id.clone();
1160 let metadata = tool_request.metadata.clone();
1161
1162 async move {
1163 task_manager
1164 .start_task(
1165 TaskLaunchRequest {
1166 task_id,
1167 request: tool_request.clone(),
1168 approved_request,
1169 },
1170 TaskStartContext {
1171 executor: tool_executor,
1172 tool_context: OwnedToolContext {
1173 session_id,
1174 turn_id,
1175 metadata,
1176 permissions,
1177 resources,
1178 cancellation,
1179 },
1180 },
1181 )
1182 .await
1183 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1184 }
1185 }
1186
1187 fn has_pending_interrupts(&self) -> bool {
1188 self.pending_auth.is_some() || !self.pending_approvals.is_empty()
1189 }
1190
1191 fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1192 let call_id = task.tool_request.call_id.clone();
1193 let call = ToolCallPart {
1194 id: call_id.clone(),
1195 name: task.tool_request.tool_name.to_string(),
1196 input: task.tool_request.input.clone(),
1197 metadata: task.tool_request.metadata.clone(),
1198 };
1199 let mut request = task.approval;
1200 request.call_id = Some(call_id.clone());
1201 let pending = PendingApprovalToolCall {
1202 request: request.clone(),
1203 decision: None,
1204 surfaced: false,
1205 turn_id: turn_id.clone(),
1206 task_id: task.task_id,
1207 call,
1208 tool_request: task.tool_request,
1209 };
1210 self.pending_approvals.insert(call_id.clone(), pending);
1211 if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1212 self.pending_approval_order.push_back(call_id);
1213 }
1214 self.emit(AgentEvent::ApprovalRequired(request));
1215 }
1216
1217 fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1218 for call_id in self.pending_approval_order.clone() {
1219 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1220 continue;
1221 };
1222 if pending.decision.is_none() && !pending.surfaced {
1223 pending.surfaced = true;
1224 return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1225 PendingApproval {
1226 request: pending.request.clone(),
1227 },
1228 )));
1229 }
1230 }
1231 None
1232 }
1233
1234 fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1235 self.pending_approval_order.iter().find_map(|call_id| {
1236 self.pending_approvals.get(call_id).and_then(|pending| {
1237 pending.decision.is_none().then(|| {
1238 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1239 request: pending.request.clone(),
1240 }))
1241 })
1242 })
1243 })
1244 }
1245
1246 fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1247 let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1248 self.pending_approvals
1249 .get(call_id)
1250 .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1251 })?;
1252 self.pending_approval_order.retain(|id| id != &call_id);
1253 self.pending_approvals.remove(&call_id)
1254 }
1255
1256 fn pending_auth_interrupt(&self) -> Option<LoopStep> {
1257 self.pending_auth.as_ref().and_then(|pending| {
1258 pending.resolution.is_none().then(|| {
1259 LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth {
1260 request: pending.request.clone(),
1261 }))
1262 })
1263 })
1264 }
1265
1266 fn queue_auth_interrupt(
1267 &mut self,
1268 turn_id: &agentkit_core::TurnId,
1269 task: TaskAuth,
1270 ) -> LoopStep {
1271 let call = ToolCallPart {
1272 id: task.tool_request.call_id.clone(),
1273 name: task.tool_request.tool_name.to_string(),
1274 input: task.tool_request.input.clone(),
1275 metadata: task.tool_request.metadata.clone(),
1276 };
1277 let request = upgrade_auth_request(task.auth, &task.tool_request, &call);
1278 self.pending_auth = Some(PendingAuthToolCall {
1279 request: request.clone(),
1280 resolution: None,
1281 turn_id: turn_id.clone(),
1282 task_id: task.task_id,
1283 call,
1284 tool_request: task.tool_request,
1285 });
1286 self.emit(AgentEvent::AuthRequired(request.clone()));
1287 LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth { request }))
1288 }
1289
1290 fn queue_resolution_interrupt(
1291 &mut self,
1292 turn_id: &agentkit_core::TurnId,
1293 resolution: TaskResolution,
1294 ) -> Option<LoopStep> {
1295 match resolution {
1296 TaskResolution::Item(item) => {
1297 self.transcript.push(item);
1298 None
1299 }
1300 TaskResolution::Approval(task) => {
1301 self.enqueue_pending_approval(turn_id, task);
1302 self.take_next_unsurfaced_approval_interrupt()
1303 }
1304 TaskResolution::Auth(task) => Some(self.queue_auth_interrupt(turn_id, task)),
1305 }
1306 }
1307
/// Pulls resolutions that the task manager has queued for the loop and
/// applies them to driver state.
///
/// Returns `(saw_items, interrupt)`: `saw_items` is `true` when at least one
/// item was appended to the transcript; `interrupt` is an auth interrupt if
/// one was encountered, otherwise the next unsurfaced approval interrupt, if
/// any.
///
/// # Errors
/// Task-manager failures are reported as
/// `LoopError::Tool(ToolError::Internal(..))`.
async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
    let PendingLoopUpdates { mut resolutions } = self
        .task_manager
        .take_pending_loop_updates()
        .await
        .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
    let mut saw_items = false;
    while let Some(resolution) = resolutions.pop_front() {
        match resolution {
            TaskResolution::Item(item) => {
                self.transcript.push(item);
                saw_items = true;
            }
            TaskResolution::Approval(task) => {
                // The turn id is cloned first because `task` is moved into the call.
                self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
            }
            TaskResolution::Auth(task) => {
                // NOTE(review): returning here drops whatever is still left in
                // `resolutions`. If the task manager can queue further updates
                // behind an auth request in the same batch, they are lost —
                // confirm against the `TaskManager` contract.
                return Ok((
                    saw_items,
                    Some(self.queue_auth_interrupt(&task.tool_request.turn_id.clone(), task)),
                ));
            }
        }
    }
    Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
}
1334
1335 async fn maybe_compact(
1336 &mut self,
1337 turn_id: Option<&agentkit_core::TurnId>,
1338 cancellation: Option<TurnCancellation>,
1339 ) -> Result<(), LoopError> {
1340 let Some(compaction) = self.compaction.as_ref().cloned() else {
1341 return Ok(());
1342 };
1343 if cancellation
1344 .as_ref()
1345 .is_some_and(TurnCancellation::is_cancelled)
1346 {
1347 return Err(LoopError::Cancelled);
1348 }
1349 let Some(reason) =
1350 compaction
1351 .trigger
1352 .should_compact(&self.session_id, turn_id, &self.transcript)
1353 else {
1354 return Ok(());
1355 };
1356
1357 self.emit(AgentEvent::CompactionStarted {
1358 session_id: self.session_id.clone(),
1359 turn_id: turn_id.cloned(),
1360 reason: reason.clone(),
1361 });
1362
1363 let CompactionResult {
1364 transcript,
1365 replaced_items,
1366 metadata,
1367 } = compaction
1368 .strategy
1369 .apply(
1370 agentkit_compaction::CompactionRequest {
1371 session_id: self.session_id.clone(),
1372 turn_id: turn_id.cloned(),
1373 transcript: self.transcript.clone(),
1374 reason,
1375 metadata: compaction.metadata.clone(),
1376 },
1377 &mut CompactionContext {
1378 backend: compaction.backend.as_deref(),
1379 metadata: &compaction.metadata,
1380 cancellation,
1381 },
1382 )
1383 .await
1384 .map_err(|error| match error {
1385 agentkit_compaction::CompactionError::Cancelled => LoopError::Cancelled,
1386 other => LoopError::Compaction(other.to_string()),
1387 })?;
1388
1389 self.transcript = transcript;
1390 self.emit(AgentEvent::CompactionFinished {
1391 session_id: self.session_id.clone(),
1392 turn_id: turn_id.cloned(),
1393 replaced_items,
1394 transcript_len: self.transcript.len(),
1395 metadata,
1396 });
1397 Ok(())
1398 }
1399
/// Advances the active tool round until it yields a loop step or completes.
///
/// Returns `Ok(None)` when there is no active round, or when the round ended
/// with background work pending and no foreground progress. Otherwise returns
/// the interrupt that stopped the round, the cancelled-turn step, or the step
/// produced by continuing the model turn.
///
/// # Errors
/// Task-manager failures are reported as
/// `LoopError::Tool(ToolError::Internal(..))`; errors from starting tasks and
/// from the follow-up model turn are propagated.
async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
    let Some(_) = self.active_tool_round.as_ref() else {
        return Ok(None);
    };
    loop {
        // A fresh checkpoint is taken on every iteration.
        let cancellation = self
            .cancellation
            .as_ref()
            .map(CancellationHandle::checkpoint);
        let turn_id = self
            .active_tool_round
            .as_ref()
            .map(|active| active.turn_id.clone())
            .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

        // Cancelled before doing more work: notify the task manager, drop the
        // round, and finish the turn as cancelled.
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            self.active_tool_round = None;
            return self.finish_cancelled(turn_id, Vec::new()).map(Some);
        }

        // Phase 1: hand the next not-yet-started call to the task manager.
        let next_call = self
            .active_tool_round
            .as_mut()
            .and_then(|active| active.pending_calls.pop_front());
        if let Some((_call, tool_request)) = next_call {
            match self
                .start_task_via_manager(None, tool_request.clone(), None, cancellation.clone())
                .await?
            {
                // The task resolved immediately.
                TaskStartOutcome::Ready(resolution) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.transcript.push(item);
                        }
                        TaskResolution::Approval(task) => {
                            // Queued only; approvals are surfaced once the round drains.
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                        TaskResolution::Auth(task) => {
                            // Auth interrupts immediately; the round (and any
                            // unstarted calls) stays in place.
                            return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
                        }
                    }
                    continue;
                }
                // The task is still running; remember if it is background work.
                TaskStartOutcome::Pending { kind, .. } => {
                    if kind == agentkit_task_manager::TaskKind::Background
                        && let Some(active) = self.active_tool_round.as_mut()
                    {
                        active.background_pending = true;
                    }
                    continue;
                }
            }
        }

        // Phase 2: all calls have been started; wait for the next update for
        // this turn.
        match self
            .task_manager
            .wait_for_turn(&turn_id, cancellation.clone())
            .await
            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
        {
            Some(TurnTaskUpdate::Resolution(resolution)) => {
                let resolution = *resolution;
                match resolution {
                    TaskResolution::Item(item) => {
                        if let Some(active) = self.active_tool_round.as_mut() {
                            active.foreground_progressed = true;
                        }
                        self.transcript.push(item);
                    }
                    TaskResolution::Approval(task) => {
                        self.enqueue_pending_approval(&turn_id, task);
                    }
                    TaskResolution::Auth(task) => {
                        return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
                    }
                }
            }
            Some(TurnTaskUpdate::Detached(snapshot)) => {
                // The task was detached: record a placeholder tool result for
                // its call id so the transcript has an answer for it.
                self.transcript.push(Item {
                    id: None,
                    kind: ItemKind::Tool,
                    parts: vec![Part::ToolResult(ToolResultPart {
                        call_id: snapshot.call_id,
                        output: ToolOutput::Text(format!(
                            "Tool {} is now running in the background. \
                             The result will be delivered when it completes.",
                            snapshot.tool_name,
                        )),
                        is_error: false,
                        metadata: MetadataMap::new(),
                    })],
                    metadata: MetadataMap::new(),
                });
                if let Some(active) = self.active_tool_round.as_mut() {
                    active.background_pending = true;
                    active.foreground_progressed = true;
                }
            }
            // No further updates for this turn.
            None => {
                if cancellation
                    .as_ref()
                    .is_some_and(TurnCancellation::is_cancelled)
                {
                    self.task_manager
                        .on_turn_interrupted(&turn_id)
                        .await
                        .map_err(|error| {
                            LoopError::Tool(ToolError::Internal(error.to_string()))
                        })?;
                    self.active_tool_round = None;
                    return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                }
                // The round is over: take it out of the driver before deciding
                // what to return.
                let active = self.active_tool_round.take().ok_or_else(|| {
                    LoopError::InvalidState("missing active tool round".into())
                })?;
                // Interrupt priority: new approvals, then unresolved auth,
                // then approvals that were surfaced but are still undecided.
                if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                    return Ok(Some(step));
                }
                if let Some(step) = self.pending_auth_interrupt() {
                    return Ok(Some(step));
                }
                if let Some(step) = self.next_unresolved_approval_interrupt() {
                    return Ok(Some(step));
                }
                // Only background work is outstanding and nothing new reached
                // the transcript: no step to report.
                if active.background_pending && !active.foreground_progressed {
                    return Ok(None);
                }
                // Continue the same turn with the model (no second TurnStarted
                // event). Boxed because this call is recursive through
                // `drive_turn`.
                return Ok(Some(Box::pin(self.drive_turn(turn_id, false)).await?));
            }
        }
    }
}
1546
/// Runs one model turn for `turn_id` and returns the resulting loop step.
///
/// Sequence: optional compaction, optional `TurnStarted` event (when
/// `emit_started` is true), model request, event streaming, then either a
/// tool round (when the model emitted tool calls) or a finished turn.
///
/// Cancellation observed at any stage finishes the turn through
/// `finish_cancelled`.
///
/// # Errors
/// * `LoopError::InvalidState` – no model session is available.
/// * `LoopError::Provider` – the event stream ended without a `Finished` event.
/// * `LoopError::Tool(ToolError::Internal(..))` – the task manager failed while
///   being told about an interruption.
/// * Any other error from compaction, the session, the turn, or the tool
///   round is propagated.
async fn drive_turn(
    &mut self,
    turn_id: agentkit_core::TurnId,
    emit_started: bool,
) -> Result<LoopStep, LoopError> {
    // One checkpoint is taken for the whole turn.
    let cancellation = self
        .cancellation
        .as_ref()
        .map(CancellationHandle::checkpoint);
    match self
        .maybe_compact(Some(&turn_id), cancellation.clone())
        .await
    {
        Ok(()) => {}
        Err(LoopError::Cancelled) => {
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        Err(error) => return Err(error),
    }
    if emit_started {
        self.emit(AgentEvent::TurnStarted {
            session_id: self.session_id.clone(),
            turn_id: turn_id.clone(),
        });
    }
    if cancellation
        .as_ref()
        .is_some_and(TurnCancellation::is_cancelled)
    {
        return self.finish_cancelled(turn_id, interrupted_assistant_items());
    }

    let request = TurnRequest {
        session_id: self.session_id.clone(),
        turn_id: turn_id.clone(),
        transcript: self.transcript.clone(),
        available_tools: self.tool_executor.specs(),
        // A per-turn override is consumed here; otherwise the session default applies.
        cache: self
            .next_turn_cache
            .take()
            .or_else(|| self.default_cache.clone()),
        metadata: MetadataMap::new(),
    };

    let session = self
        .session
        .as_mut()
        .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
    let mut turn = match session.begin_turn(request, cancellation.clone()).await {
        Ok(turn) => turn,
        Err(LoopError::Cancelled) => {
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        Err(error) => return Err(error),
    };
    let mut saw_tool_call = false;
    let mut finished_result = None;

    // Stream events until the turn reports `Finished` or the stream ends.
    while let Some(event) = match turn.next_event(cancellation.clone()).await {
        Ok(event) => event,
        Err(LoopError::Cancelled) => {
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        Err(error) => return Err(error),
    } {
        // Cancellation is re-checked after every event; the event that was
        // just received is discarded in that case.
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        match event {
            ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
            ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
            ModelTurnEvent::ToolCall(call) => {
                saw_tool_call = true;
                self.emit(AgentEvent::ToolCallRequested(call.clone()));
            }
            ModelTurnEvent::Finished(result) => {
                finished_result = Some(result);
                break;
            }
        }
    }

    let result = finished_result.ok_or_else(|| {
        LoopError::Provider("model turn ended without a Finished event".into())
    })?;
    self.transcript.extend(result.output_items.clone());

    // A tool round is started only when a `ToolCall` event was seen; the
    // calls themselves are taken from the finished result's output items.
    if saw_tool_call {
        let pending_calls = extract_tool_calls(&result.output_items)
            .into_iter()
            .map(|call| {
                let tool_request = ToolRequest {
                    call_id: call.id.clone(),
                    tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
                    input: call.input.clone(),
                    session_id: self.session_id.clone(),
                    turn_id: turn_id.clone(),
                    metadata: call.metadata.clone(),
                };
                (call, tool_request)
            })
            .collect();
        self.active_tool_round = Some(ActiveToolRound {
            turn_id: turn_id.clone(),
            pending_calls,
            background_pending: false,
            foreground_progressed: false,
        });
        if let Some(step) = self.continue_active_tool_round().await? {
            return Ok(step);
        }
        // The tool round produced no step (e.g. only background work is
        // outstanding): hand control back to the caller for more input.
        return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
            InputRequest {
                session_id: self.session_id.clone(),
                reason: "driver is waiting for input".into(),
            },
        )));
    }

    // No tool calls: the turn is complete.
    let turn_result = TurnResult {
        turn_id,
        finish_reason: result.finish_reason,
        items: result.output_items,
        usage: result.usage,
        metadata: result.metadata,
    };
    self.emit(AgentEvent::TurnFinished(turn_result.clone()));
    Ok(LoopStep::Finished(turn_result))
}
1691
1692 async fn resume_after_auth(
1693 &mut self,
1694 pending: PendingAuthToolCall,
1695 ) -> Result<LoopStep, LoopError> {
1696 let resolution = pending
1697 .resolution
1698 .clone()
1699 .ok_or_else(|| LoopError::InvalidState("pending auth has no resolution".into()))?;
1700 match resolution {
1701 AuthResolution::Provided { .. } => match self
1702 .start_task_via_manager(
1703 Some(pending.task_id.clone()),
1704 pending.tool_request.clone(),
1705 None,
1706 self.cancellation
1707 .as_ref()
1708 .map(CancellationHandle::checkpoint),
1709 )
1710 .await?
1711 {
1712 TaskStartOutcome::Ready(resolution) => {
1713 let resolution = *resolution;
1714 if let Some(step) =
1715 self.queue_resolution_interrupt(&pending.turn_id, resolution)
1716 {
1717 return Ok(step);
1718 }
1719 }
1720 TaskStartOutcome::Pending { .. } => {}
1721 },
1722 AuthResolution::Cancelled { .. } => {
1723 self.transcript.push(Item {
1724 id: None,
1725 kind: ItemKind::Tool,
1726 parts: vec![Part::ToolResult(ToolResultPart {
1727 call_id: pending.call.id.clone(),
1728 output: ToolOutput::Text("auth cancelled".into()),
1729 is_error: true,
1730 metadata: pending.call.metadata.clone(),
1731 })],
1732 metadata: MetadataMap::new(),
1733 });
1734 }
1735 }
1736
1737 if let Some(step) = self.continue_active_tool_round().await? {
1738 Ok(step)
1739 } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1740 Ok(step)
1741 } else if let Some(step) = self.pending_auth_interrupt() {
1742 Ok(step)
1743 } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1744 Ok(step)
1745 } else {
1746 self.drive_turn(pending.turn_id, false).await
1747 }
1748 }
1749
1750 async fn resume_after_approval(
1751 &mut self,
1752 pending: PendingApprovalToolCall,
1753 ) -> Result<LoopStep, LoopError> {
1754 let decision = pending
1755 .decision
1756 .clone()
1757 .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
1758
1759 match decision {
1760 ApprovalDecision::Approve => match self
1761 .start_task_via_manager(
1762 Some(pending.task_id.clone()),
1763 pending.tool_request.clone(),
1764 Some(pending.request.clone()),
1765 self.cancellation
1766 .as_ref()
1767 .map(CancellationHandle::checkpoint),
1768 )
1769 .await?
1770 {
1771 TaskStartOutcome::Ready(resolution) => {
1772 let resolution = *resolution;
1773 if let Some(step) =
1774 self.queue_resolution_interrupt(&pending.turn_id, resolution)
1775 {
1776 return Ok(step);
1777 }
1778 }
1779 TaskStartOutcome::Pending { .. } => {}
1780 },
1781 ApprovalDecision::Deny { reason } => {
1782 self.transcript.push(Item {
1783 id: None,
1784 kind: ItemKind::Tool,
1785 parts: vec![Part::ToolResult(ToolResultPart {
1786 call_id: pending.call.id.clone(),
1787 output: ToolOutput::Text(
1788 reason.unwrap_or_else(|| "approval denied".into()),
1789 ),
1790 is_error: true,
1791 metadata: pending.call.metadata.clone(),
1792 })],
1793 metadata: MetadataMap::new(),
1794 });
1795 }
1796 }
1797
1798 if let Some(step) = self.continue_active_tool_round().await? {
1799 Ok(step)
1800 } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1801 Ok(step)
1802 } else if let Some(step) = self.pending_auth_interrupt() {
1803 Ok(step)
1804 } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1805 Ok(step)
1806 } else {
1807 self.drive_turn(pending.turn_id, false).await
1808 }
1809 }
1810
1811 fn finish_cancelled(
1812 &mut self,
1813 turn_id: agentkit_core::TurnId,
1814 items: Vec<Item>,
1815 ) -> Result<LoopStep, LoopError> {
1816 self.transcript.extend(items.clone());
1817 let turn_result = TurnResult {
1818 turn_id,
1819 finish_reason: FinishReason::Cancelled,
1820 items,
1821 usage: None,
1822 metadata: interrupted_metadata("turn"),
1823 };
1824 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1825 Ok(LoopStep::Finished(turn_result))
1826 }
1827
1828 pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
1837 if self.has_pending_interrupts() {
1838 return Err(LoopError::InvalidState(
1839 "cannot submit input while an interrupt is pending".into(),
1840 ));
1841 }
1842 self.emit(AgentEvent::InputAccepted {
1843 session_id: self.session_id.clone(),
1844 items: input.clone(),
1845 });
1846 self.pending_input.extend(input);
1847 Ok(())
1848 }
1849
1850 pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
1855 if self.has_pending_interrupts() {
1856 return Err(LoopError::InvalidState(
1857 "cannot update next-turn cache while an interrupt is pending".into(),
1858 ));
1859 }
1860 self.next_turn_cache = Some(cache);
1861 Ok(())
1862 }
1863
1864 pub fn submit_input_with_cache(
1867 &mut self,
1868 input: Vec<Item>,
1869 cache: PromptCacheRequest,
1870 ) -> Result<(), LoopError> {
1871 self.set_next_turn_cache(cache)?;
1872 self.submit_input(input)
1873 }
1874
1875 pub fn resolve_approval_for(
1885 &mut self,
1886 call_id: ToolCallId,
1887 decision: ApprovalDecision,
1888 ) -> Result<(), LoopError> {
1889 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1890 return Err(LoopError::InvalidState(format!(
1891 "no approval request is pending for call {}",
1892 call_id.0
1893 )));
1894 };
1895 pending.decision = Some(decision.clone());
1896 self.emit(AgentEvent::ApprovalResolved {
1897 approved: matches!(decision, ApprovalDecision::Approve),
1898 });
1899 Ok(())
1900 }
1901
1902 pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
1905 let mut unresolved = self
1906 .pending_approval_order
1907 .iter()
1908 .filter(|call_id| {
1909 self.pending_approvals
1910 .get(*call_id)
1911 .is_some_and(|pending| pending.decision.is_none())
1912 })
1913 .cloned();
1914 let Some(call_id) = unresolved.next() else {
1915 return Err(LoopError::InvalidState(
1916 "no approval request is pending".into(),
1917 ));
1918 };
1919 if unresolved.next().is_some() {
1920 return Err(LoopError::InvalidState(
1921 "multiple approvals are pending; use resolve_approval_for".into(),
1922 ));
1923 }
1924 self.resolve_approval_for(call_id, decision)
1925 }
1926
1927 pub fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), LoopError> {
1938 let Some(pending) = self.pending_auth.as_mut() else {
1939 return Err(LoopError::InvalidState("no auth request is pending".into()));
1940 };
1941 if pending.request.id != resolution.request().id {
1942 return Err(LoopError::InvalidState(
1943 "auth resolution does not match the pending request".into(),
1944 ));
1945 }
1946 pending.resolution = Some(resolution.clone());
1947 self.emit(AgentEvent::AuthResolved {
1948 provided: matches!(resolution, AuthResolution::Provided { .. }),
1949 });
1950 Ok(())
1951 }
1952
1953 pub fn snapshot(&self) -> LoopSnapshot {
1955 LoopSnapshot {
1956 session_id: self.session_id.clone(),
1957 transcript: self.transcript.clone(),
1958 pending_input: self.pending_input.clone(),
1959 }
1960 }
1961
1962 pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
1977 if self
1978 .pending_auth
1979 .as_ref()
1980 .is_some_and(|pending| pending.resolution.is_some())
1981 {
1982 let pending = self
1983 .pending_auth
1984 .take()
1985 .ok_or_else(|| LoopError::InvalidState("missing pending auth state".into()))?;
1986 return self.resume_after_auth(pending).await;
1987 }
1988
1989 if let Some(pending) = self.take_next_resolved_approval() {
1990 return self.resume_after_approval(pending).await;
1991 }
1992
1993 if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1994 return Ok(step);
1995 }
1996
1997 if let Some(step) = self.pending_auth_interrupt() {
1998 return Ok(step);
1999 }
2000
2001 if let Some(step) = self.next_unresolved_approval_interrupt() {
2002 return Ok(step);
2003 }
2004
2005 if let Some(step) = self.continue_active_tool_round().await? {
2006 return Ok(step);
2007 }
2008
2009 let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2010 if let Some(step) = loop_step {
2011 return Ok(step);
2012 }
2013
2014 if self.pending_input.is_empty() && !had_loop_updates {
2015 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2016 InputRequest {
2017 session_id: self.session_id.clone(),
2018 reason: "driver is waiting for input".into(),
2019 },
2020 )));
2021 }
2022
2023 let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2024 self.next_turn_index += 1;
2025 self.transcript.append(&mut self.pending_input);
2026 self.drive_turn(turn_id, true).await
2027 }
2028
2029 fn emit(&mut self, event: AgentEvent) {
2030 for observer in &mut self.observers {
2031 observer.handle_event(event.clone());
2032 }
2033 }
2034}
2035
2036fn interrupted_metadata(stage: &str) -> MetadataMap {
2037 let mut metadata = MetadataMap::new();
2038 metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2039 metadata.insert(
2040 INTERRUPT_REASON_METADATA_KEY.into(),
2041 USER_CANCELLED_REASON.into(),
2042 );
2043 metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2044 metadata
2045}
2046
2047fn interrupted_assistant_items() -> Vec<Item> {
2048 vec![Item {
2049 id: None,
2050 kind: ItemKind::Assistant,
2051 parts: vec![Part::Text(TextPart {
2052 text: "Previous assistant response was interrupted by the user before completion."
2053 .into(),
2054 metadata: interrupted_metadata("assistant"),
2055 })],
2056 metadata: interrupted_metadata("assistant"),
2057 }]
2058}
2059
2060fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2061 let mut calls = Vec::new();
2062 for item in items {
2063 for part in &item.parts {
2064 if let Part::ToolCall(call) = part {
2065 calls.push(call.clone());
2066 }
2067 }
2068 }
2069 calls
2070}
2071
2072fn upgrade_auth_request(
2073 mut request: AuthRequest,
2074 tool_request: &ToolRequest,
2075 _call: &ToolCallPart,
2076) -> AuthRequest {
2077 if matches!(request.operation, AuthOperation::ToolCall { .. }) {
2078 return request;
2079 }
2080
2081 let prior_server_id = request.challenge.get("server_id").cloned();
2082 let mut metadata = tool_request.metadata.clone();
2083 if let Some(server_id) = prior_server_id {
2084 metadata.entry("server_id".into()).or_insert(server_id);
2085 }
2086 request.operation = AuthOperation::ToolCall {
2087 tool_name: tool_request.tool_name.0.clone(),
2088 input: tool_request.input.clone(),
2089 call_id: Some(tool_request.call_id.clone()),
2090 session_id: Some(tool_request.session_id.clone()),
2091 turn_id: Some(tool_request.turn_id.clone()),
2092 metadata,
2093 };
2094 request
2095}
2096
2097struct AllowAllPermissions;
2098
2099impl PermissionChecker for AllowAllPermissions {
2100 fn evaluate(
2101 &self,
2102 _request: &dyn agentkit_tools_core::PermissionRequest,
2103 ) -> agentkit_tools_core::PermissionDecision {
2104 agentkit_tools_core::PermissionDecision::Allow
2105 }
2106}
2107
/// Errors reported by the loop driver and by the model adapter traits it
/// drives.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The driver was asked to do something its current state does not allow,
    /// for example resolving an approval when none is pending. The payload is
    /// a human-readable explanation.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The turn was cancelled. The driver treats this value from
    /// `begin_turn`/`next_event` as an interruption rather than a failure.
    #[error("turn cancelled")]
    Cancelled,
    /// The model provider failed or misbehaved, e.g. a turn ended without a
    /// `Finished` event. The payload describes the problem.
    #[error("provider error: {0}")]
    Provider(String),
    /// A tool-layer failure; converts automatically from `ToolError` via `?`.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// A compaction failure, with a descriptive message.
    /// NOTE(review): not raised in the part of the file reviewed here.
    #[error("compaction error: {0}")]
    Compaction(String),
    /// The requested operation is not supported, with a descriptive message.
    /// NOTE(review): not raised in the part of the file reviewed here.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2130
2131#[cfg(test)]
2132mod tests {
2133 use std::collections::VecDeque;
2134 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2135 use std::sync::{Arc as StdArc, Mutex as StdMutex};
2136
2137 use agentkit_compaction::{CompactionPipeline, CompactionTrigger, KeepRecentStrategy};
2138 use agentkit_core::{
2139 CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolOutput, ToolResultPart,
2140 };
2141 use agentkit_task_manager::{
2142 AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2143 TaskRoutingPolicy,
2144 };
2145 use agentkit_tools_core::{
2146 FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2147 ToolAnnotations, ToolName, ToolResult, ToolSpec,
2148 };
2149 use serde_json::{Value, json};
2150 use tokio::sync::Notify;
2151 use tokio::time::{Duration, timeout};
2152
2153 use super::*;
2154
// --- Model adapters: each one only hands out its matching session type. ---
struct FakeAdapter;
struct SlowAdapter;
// Shares its recording buffers with every session it starts.
struct RecordingAdapter {
    // Tool descriptions seen by `begin_turn`, one inner Vec per turn.
    seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
    // Cache request seen by `begin_turn`, one entry per turn.
    seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
}
struct MultiToolAdapter;
struct DualApprovalAdapter;

// --- Sessions: scripted `begin_turn` behaviour for each scenario. ---
struct FakeSession;
struct SlowSession;
// Appends to the buffers cloned from `RecordingAdapter`.
struct RecordingSession {
    seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
    seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
}
struct MultiToolSession;
struct DualApprovalSession;

// --- Turns ---
// Replays a prepared queue of events, front to back.
struct FakeTurn {
    events: VecDeque<ModelTurnEvent>,
}

// Emits a single `Finished` event; `emitted` records whether it was sent.
struct SlowTurn {
    emitted: bool,
}

// Emits a single `Finished` event; `emitted` records whether it was sent.
struct RecordingTurn {
    emitted: bool,
}
// Replays a prepared queue of events, front to back.
struct MultiToolTurn {
    events: VecDeque<ModelTurnEvent>,
}
// Replays a prepared queue of events, front to back.
struct DualApprovalTurn {
    events: VecDeque<ModelTurnEvent>,
}
2190
// Test adapter: every session it starts is a `FakeSession`; the session
// config is ignored.
#[async_trait]
impl ModelAdapter for FakeAdapter {
    type Session = FakeSession;

    async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
        Ok(FakeSession)
    }
}
2199
// Test adapter: every session it starts is a `SlowSession`; the session
// config is ignored.
#[async_trait]
impl ModelAdapter for SlowAdapter {
    type Session = SlowSession;

    async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
        Ok(SlowSession)
    }
}
2208
2209 #[async_trait]
2210 impl ModelAdapter for RecordingAdapter {
2211 type Session = RecordingSession;
2212
2213 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2214 Ok(RecordingSession {
2215 seen_descriptions: self.seen_descriptions.clone(),
2216 seen_caches: self.seen_caches.clone(),
2217 })
2218 }
2219 }
2220
// Test adapter: every session it starts is a `MultiToolSession`; the session
// config is ignored.
#[async_trait]
impl ModelAdapter for MultiToolAdapter {
    type Session = MultiToolSession;

    async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
        Ok(MultiToolSession)
    }
}
2229
// Test adapter: every session it starts is a `DualApprovalSession`; the
// session config is ignored.
#[async_trait]
impl ModelAdapter for DualApprovalAdapter {
    type Session = DualApprovalSession;

    async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
        Ok(DualApprovalSession)
    }
}
2238
2239 #[async_trait]
2240 impl ModelSession for FakeSession {
2241 type Turn = FakeTurn;
2242
2243 async fn begin_turn(
2244 &mut self,
2245 request: TurnRequest,
2246 _cancellation: Option<TurnCancellation>,
2247 ) -> Result<Self::Turn, LoopError> {
2248 let has_tool_result = request.transcript.iter().any(|item| {
2249 item.kind == ItemKind::Tool
2250 && item
2251 .parts
2252 .iter()
2253 .any(|part| matches!(part, Part::ToolResult(_)))
2254 });
2255 let tool_name = request
2256 .available_tools
2257 .first()
2258 .map(|tool| tool.name.0.clone())
2259 .unwrap_or_else(|| "echo".into());
2260
2261 let events = if has_tool_result {
2262 let result_text = request
2263 .transcript
2264 .iter()
2265 .rev()
2266 .find_map(|item| {
2267 item.parts.iter().find_map(|part| match part {
2268 Part::ToolResult(ToolResultPart {
2269 output: ToolOutput::Text(text),
2270 ..
2271 }) => Some(text.clone()),
2272 _ => None,
2273 })
2274 })
2275 .unwrap_or_else(|| "missing".into());
2276
2277 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2278 finish_reason: FinishReason::Completed,
2279 output_items: vec![Item {
2280 id: None,
2281 kind: ItemKind::Assistant,
2282 parts: vec![Part::Text(TextPart {
2283 text: format!("tool said: {result_text}"),
2284 metadata: MetadataMap::new(),
2285 })],
2286 metadata: MetadataMap::new(),
2287 }],
2288 usage: None,
2289 metadata: MetadataMap::new(),
2290 })])
2291 } else {
2292 VecDeque::from([
2293 ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2294 id: ToolCallId::new("call-1"),
2295 name: tool_name.clone(),
2296 input: json!({ "value": "pong" }),
2297 metadata: MetadataMap::new(),
2298 }),
2299 ModelTurnEvent::Finished(ModelTurnResult {
2300 finish_reason: FinishReason::ToolCall,
2301 output_items: vec![Item {
2302 id: None,
2303 kind: ItemKind::Assistant,
2304 parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2305 id: ToolCallId::new("call-1"),
2306 name: tool_name,
2307 input: json!({ "value": "pong" }),
2308 metadata: MetadataMap::new(),
2309 })],
2310 metadata: MetadataMap::new(),
2311 }],
2312 usage: None,
2313 metadata: MetadataMap::new(),
2314 }),
2315 ])
2316 };
2317
2318 Ok(FakeTurn { events })
2319 }
2320 }
2321
2322 #[async_trait]
2323 impl ModelSession for SlowSession {
2324 type Turn = SlowTurn;
2325
2326 async fn begin_turn(
2327 &mut self,
2328 request: TurnRequest,
2329 cancellation: Option<TurnCancellation>,
2330 ) -> Result<Self::Turn, LoopError> {
2331 let should_block = request
2332 .transcript
2333 .iter()
2334 .rev()
2335 .find(|item| item.kind == ItemKind::User)
2336 .is_some_and(|item| {
2337 item.parts.iter().any(|part| match part {
2338 Part::Text(text) => text.text == "do the long task",
2339 _ => false,
2340 })
2341 });
2342
2343 if should_block && let Some(cancellation) = cancellation {
2344 cancellation.cancelled().await;
2345 return Err(LoopError::Cancelled);
2346 }
2347
2348 Ok(SlowTurn { emitted: false })
2349 }
2350 }
2351
2352 #[async_trait]
2353 impl ModelSession for RecordingSession {
2354 type Turn = RecordingTurn;
2355
2356 async fn begin_turn(
2357 &mut self,
2358 request: TurnRequest,
2359 _cancellation: Option<TurnCancellation>,
2360 ) -> Result<Self::Turn, LoopError> {
2361 let descriptions = request
2362 .available_tools
2363 .iter()
2364 .map(|tool| tool.description.clone())
2365 .collect::<Vec<_>>();
2366 self.seen_descriptions.lock().unwrap().push(descriptions);
2367 self.seen_caches.lock().unwrap().push(request.cache.clone());
2368
2369 Ok(RecordingTurn { emitted: false })
2370 }
2371 }
2372
2373 #[async_trait]
2374 impl ModelSession for MultiToolSession {
2375 type Turn = MultiToolTurn;
2376
2377 async fn begin_turn(
2378 &mut self,
2379 request: TurnRequest,
2380 _cancellation: Option<TurnCancellation>,
2381 ) -> Result<Self::Turn, LoopError> {
2382 let has_tool_result = request.transcript.iter().any(|item| {
2383 item.kind == ItemKind::Tool
2384 && item
2385 .parts
2386 .iter()
2387 .any(|part| matches!(part, Part::ToolResult(_)))
2388 });
2389
2390 let events = if has_tool_result {
2391 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2392 finish_reason: FinishReason::Completed,
2393 output_items: vec![Item {
2394 id: None,
2395 kind: ItemKind::Assistant,
2396 parts: vec![Part::Text(TextPart {
2397 text: "mixed tools finished".into(),
2398 metadata: MetadataMap::new(),
2399 })],
2400 metadata: MetadataMap::new(),
2401 }],
2402 usage: None,
2403 metadata: MetadataMap::new(),
2404 })])
2405 } else {
2406 let foreground = agentkit_core::ToolCallPart {
2407 id: ToolCallId::new("call-foreground"),
2408 name: "foreground-wait".into(),
2409 input: json!({}),
2410 metadata: MetadataMap::new(),
2411 };
2412 let background = agentkit_core::ToolCallPart {
2413 id: ToolCallId::new("call-background"),
2414 name: "background-wait".into(),
2415 input: json!({}),
2416 metadata: MetadataMap::new(),
2417 };
2418 VecDeque::from([
2419 ModelTurnEvent::ToolCall(foreground.clone()),
2420 ModelTurnEvent::ToolCall(background.clone()),
2421 ModelTurnEvent::Finished(ModelTurnResult {
2422 finish_reason: FinishReason::ToolCall,
2423 output_items: vec![Item {
2424 id: None,
2425 kind: ItemKind::Assistant,
2426 parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2427 metadata: MetadataMap::new(),
2428 }],
2429 usage: None,
2430 metadata: MetadataMap::new(),
2431 }),
2432 ])
2433 };
2434
2435 Ok(MultiToolTurn { events })
2436 }
2437 }
2438
2439 #[async_trait]
2440 impl ModelSession for DualApprovalSession {
2441 type Turn = DualApprovalTurn;
2442
2443 async fn begin_turn(
2444 &mut self,
2445 request: TurnRequest,
2446 _cancellation: Option<TurnCancellation>,
2447 ) -> Result<Self::Turn, LoopError> {
2448 let tool_results = request
2449 .transcript
2450 .iter()
2451 .flat_map(|item| item.parts.iter())
2452 .filter(|part| matches!(part, Part::ToolResult(_)))
2453 .count();
2454
2455 let events = if tool_results >= 2 {
2456 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2457 finish_reason: FinishReason::Completed,
2458 output_items: vec![Item {
2459 id: None,
2460 kind: ItemKind::Assistant,
2461 parts: vec![Part::Text(TextPart {
2462 text: "both approvals finished".into(),
2463 metadata: MetadataMap::new(),
2464 })],
2465 metadata: MetadataMap::new(),
2466 }],
2467 usage: None,
2468 metadata: MetadataMap::new(),
2469 })])
2470 } else {
2471 let first = agentkit_core::ToolCallPart {
2472 id: ToolCallId::new("call-1"),
2473 name: "echo".into(),
2474 input: json!({ "value": "first" }),
2475 metadata: MetadataMap::new(),
2476 };
2477 let second = agentkit_core::ToolCallPart {
2478 id: ToolCallId::new("call-2"),
2479 name: "echo".into(),
2480 input: json!({ "value": "second" }),
2481 metadata: MetadataMap::new(),
2482 };
2483 VecDeque::from([
2484 ModelTurnEvent::ToolCall(first.clone()),
2485 ModelTurnEvent::ToolCall(second.clone()),
2486 ModelTurnEvent::Finished(ModelTurnResult {
2487 finish_reason: FinishReason::ToolCall,
2488 output_items: vec![Item {
2489 id: None,
2490 kind: ItemKind::Assistant,
2491 parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2492 metadata: MetadataMap::new(),
2493 }],
2494 usage: None,
2495 metadata: MetadataMap::new(),
2496 }),
2497 ])
2498 };
2499
2500 Ok(DualApprovalTurn { events })
2501 }
2502 }
2503
// Replays the prepared events front to back; yields `Ok(None)` once the
// queue is empty. The cancellation token is ignored.
#[async_trait]
impl ModelTurn for FakeTurn {
    async fn next_event(
        &mut self,
        _cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError> {
        Ok(self.events.pop_front())
    }
}
2513
2514 #[async_trait]
2515 impl ModelTurn for SlowTurn {
2516 async fn next_event(
2517 &mut self,
2518 cancellation: Option<TurnCancellation>,
2519 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2520 if let Some(cancellation) = cancellation
2521 && cancellation.is_cancelled()
2522 {
2523 return Err(LoopError::Cancelled);
2524 }
2525
2526 if self.emitted {
2527 Ok(None)
2528 } else {
2529 self.emitted = true;
2530 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2531 finish_reason: FinishReason::Completed,
2532 output_items: vec![Item {
2533 id: None,
2534 kind: ItemKind::Assistant,
2535 parts: vec![Part::Text(TextPart {
2536 text: "done".into(),
2537 metadata: MetadataMap::new(),
2538 })],
2539 metadata: MetadataMap::new(),
2540 }],
2541 usage: None,
2542 metadata: MetadataMap::new(),
2543 })))
2544 }
2545 }
2546 }
2547
2548 #[async_trait]
2549 impl ModelTurn for RecordingTurn {
2550 async fn next_event(
2551 &mut self,
2552 _cancellation: Option<TurnCancellation>,
2553 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2554 if self.emitted {
2555 Ok(None)
2556 } else {
2557 self.emitted = true;
2558 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2559 finish_reason: FinishReason::Completed,
2560 output_items: vec![Item {
2561 id: None,
2562 kind: ItemKind::Assistant,
2563 parts: vec![Part::Text(TextPart {
2564 text: "done".into(),
2565 metadata: MetadataMap::new(),
2566 })],
2567 metadata: MetadataMap::new(),
2568 }],
2569 usage: None,
2570 metadata: MetadataMap::new(),
2571 })))
2572 }
2573 }
2574 }
2575
// Replays the prepared events front to back; yields `Ok(None)` once the
// queue is empty. The cancellation token is ignored.
#[async_trait]
impl ModelTurn for MultiToolTurn {
    async fn next_event(
        &mut self,
        _cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError> {
        Ok(self.events.pop_front())
    }
}
2585
// Replays the prepared events front to back; yields `Ok(None)` once the
// queue is empty. The cancellation token is ignored.
#[async_trait]
impl ModelTurn for DualApprovalTurn {
    async fn next_event(
        &mut self,
        _cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError> {
        Ok(self.events.pop_front())
    }
}
2595
2596 #[derive(Clone)]
2597 struct EchoTool {
2598 spec: ToolSpec,
2599 }
2600
2601 impl Default for EchoTool {
2602 fn default() -> Self {
2603 Self {
2604 spec: ToolSpec {
2605 name: ToolName::new("echo"),
2606 description: "Echo back a value".into(),
2607 input_schema: json!({
2608 "type": "object",
2609 "properties": {
2610 "value": { "type": "string" }
2611 },
2612 "required": ["value"],
2613 "additionalProperties": false
2614 }),
2615 annotations: ToolAnnotations::default(),
2616 metadata: MetadataMap::new(),
2617 },
2618 }
2619 }
2620 }
2621
2622 #[derive(Clone)]
2623 struct DynamicSpecTool {
2624 spec: ToolSpec,
2625 version: StdArc<AtomicUsize>,
2626 }
2627
2628 impl DynamicSpecTool {
2629 fn new(version: StdArc<AtomicUsize>) -> Self {
2630 Self {
2631 spec: ToolSpec {
2632 name: ToolName::new("dynamic"),
2633 description: "dynamic version 0".into(),
2634 input_schema: json!({
2635 "type": "object",
2636 "properties": {},
2637 "additionalProperties": false
2638 }),
2639 annotations: ToolAnnotations::default(),
2640 metadata: MetadataMap::new(),
2641 },
2642 version,
2643 }
2644 }
2645 }
2646
2647 #[async_trait]
2648 impl Tool for EchoTool {
2649 fn spec(&self) -> &ToolSpec {
2650 &self.spec
2651 }
2652
2653 fn proposed_requests(
2654 &self,
2655 request: &agentkit_tools_core::ToolRequest,
2656 ) -> Result<
2657 Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
2658 agentkit_tools_core::ToolError,
2659 > {
2660 Ok(vec![Box::new(FileSystemPermissionRequest::Read {
2661 path: "/tmp/echo".into(),
2662 metadata: request.metadata.clone(),
2663 })])
2664 }
2665
2666 async fn invoke(
2667 &self,
2668 request: agentkit_tools_core::ToolRequest,
2669 _ctx: &mut ToolContext<'_>,
2670 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2671 let value = request
2672 .input
2673 .get("value")
2674 .and_then(Value::as_str)
2675 .ok_or_else(|| {
2676 agentkit_tools_core::ToolError::InvalidInput("missing value".into())
2677 })?;
2678
2679 Ok(ToolResult {
2680 result: ToolResultPart {
2681 call_id: request.call_id,
2682 output: ToolOutput::Text(value.into()),
2683 is_error: false,
2684 metadata: MetadataMap::new(),
2685 },
2686 duration: None,
2687 metadata: MetadataMap::new(),
2688 })
2689 }
2690 }
2691
2692 #[async_trait]
2693 impl Tool for DynamicSpecTool {
2694 fn spec(&self) -> &ToolSpec {
2695 &self.spec
2696 }
2697
2698 fn current_spec(&self) -> Option<ToolSpec> {
2699 let mut spec = self.spec.clone();
2700 spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
2701 Some(spec)
2702 }
2703
2704 async fn invoke(
2705 &self,
2706 request: agentkit_tools_core::ToolRequest,
2707 _ctx: &mut ToolContext<'_>,
2708 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2709 Ok(ToolResult {
2710 result: ToolResultPart {
2711 call_id: request.call_id,
2712 output: ToolOutput::Text("ok".into()),
2713 is_error: false,
2714 metadata: MetadataMap::new(),
2715 },
2716 duration: None,
2717 metadata: MetadataMap::new(),
2718 })
2719 }
2720 }
2721
2722 struct DenyFsReads;
2723
2724 impl PermissionChecker for DenyFsReads {
2725 fn evaluate(
2726 &self,
2727 request: &dyn agentkit_tools_core::PermissionRequest,
2728 ) -> PermissionDecision {
2729 if request.kind() == "filesystem.read" {
2730 return PermissionDecision::Deny(PermissionDenial {
2731 code: PermissionCode::PathNotAllowed,
2732 message: "reads denied in test".into(),
2733 metadata: MetadataMap::new(),
2734 });
2735 }
2736
2737 PermissionDecision::Allow
2738 }
2739 }
2740
2741 struct ApproveFsReads;
2742
2743 impl PermissionChecker for ApproveFsReads {
2744 fn evaluate(
2745 &self,
2746 request: &dyn agentkit_tools_core::PermissionRequest,
2747 ) -> PermissionDecision {
2748 if request.kind() == "filesystem.read" {
2749 return PermissionDecision::RequireApproval(ApprovalRequest {
2750 task_id: None,
2751 call_id: None,
2752 id: "approval:fs-read".into(),
2753 request_kind: request.kind().into(),
2754 reason: agentkit_tools_core::ApprovalReason::SensitivePath,
2755 summary: request.summary(),
2756 metadata: request.metadata().clone(),
2757 });
2758 }
2759
2760 PermissionDecision::Allow
2761 }
2762 }
2763
2764 struct CountTrigger;
2765
2766 impl CompactionTrigger for CountTrigger {
2767 fn should_compact(
2768 &self,
2769 _session_id: &SessionId,
2770 _turn_id: Option<&agentkit_core::TurnId>,
2771 transcript: &[Item],
2772 ) -> Option<agentkit_compaction::CompactionReason> {
2773 (transcript.len() >= 2)
2774 .then_some(agentkit_compaction::CompactionReason::TranscriptTooLong)
2775 }
2776 }
2777
2778 struct RecordingObserver {
2779 events: StdArc<StdMutex<Vec<AgentEvent>>>,
2780 }
2781
2782 impl LoopObserver for RecordingObserver {
2783 fn handle_event(&mut self, event: AgentEvent) {
2784 self.events.lock().unwrap().push(event);
2785 }
2786 }
2787
2788 #[derive(Clone)]
2789 struct AuthTool {
2790 spec: ToolSpec,
2791 }
2792
2793 impl Default for AuthTool {
2794 fn default() -> Self {
2795 Self {
2796 spec: ToolSpec {
2797 name: ToolName::new("auth-tool"),
2798 description: "Always requires auth".into(),
2799 input_schema: json!({
2800 "type": "object",
2801 "properties": {},
2802 "additionalProperties": false
2803 }),
2804 annotations: ToolAnnotations::default(),
2805 metadata: MetadataMap::new(),
2806 },
2807 }
2808 }
2809 }
2810
2811 #[async_trait]
2812 impl Tool for AuthTool {
2813 fn spec(&self) -> &ToolSpec {
2814 &self.spec
2815 }
2816
2817 async fn invoke(
2818 &self,
2819 request: agentkit_tools_core::ToolRequest,
2820 _ctx: &mut ToolContext<'_>,
2821 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2822 let mut challenge = MetadataMap::new();
2823 challenge.insert("server_id".into(), json!("mock"));
2824 challenge.insert("scope".into(), json!("secret.read"));
2825
2826 Err(agentkit_tools_core::ToolError::AuthRequired(Box::new(
2827 AuthRequest {
2828 task_id: None,
2829 id: "auth-1".into(),
2830 provider: "mcp.mock".into(),
2831 operation: AuthOperation::ToolCall {
2832 tool_name: request.tool_name.0,
2833 input: request.input,
2834 call_id: Some(request.call_id),
2835 session_id: Some(request.session_id),
2836 turn_id: Some(request.turn_id),
2837 metadata: request.metadata,
2838 },
2839 challenge,
2840 },
2841 )))
2842 }
2843 }
2844
2845 #[derive(Clone)]
2846 struct BlockingTool {
2847 spec: ToolSpec,
2848 entered: StdArc<AtomicBool>,
2849 release: StdArc<Notify>,
2850 output: &'static str,
2851 }
2852
2853 impl BlockingTool {
2854 fn new(
2855 name: &str,
2856 entered: StdArc<AtomicBool>,
2857 release: StdArc<Notify>,
2858 output: &'static str,
2859 ) -> Self {
2860 Self {
2861 spec: ToolSpec {
2862 name: ToolName::new(name),
2863 description: format!("blocking tool {name}"),
2864 input_schema: json!({
2865 "type": "object",
2866 "properties": {},
2867 "additionalProperties": false
2868 }),
2869 annotations: ToolAnnotations::default(),
2870 metadata: MetadataMap::new(),
2871 },
2872 entered,
2873 release,
2874 output,
2875 }
2876 }
2877 }
2878
2879 #[async_trait]
2880 impl Tool for BlockingTool {
2881 fn spec(&self) -> &ToolSpec {
2882 &self.spec
2883 }
2884
2885 async fn invoke(
2886 &self,
2887 request: agentkit_tools_core::ToolRequest,
2888 _ctx: &mut ToolContext<'_>,
2889 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2890 self.entered.store(true, Ordering::SeqCst);
2891 self.release.notified().await;
2892 Ok(ToolResult {
2893 result: ToolResultPart {
2894 call_id: request.call_id,
2895 output: ToolOutput::Text(self.output.into()),
2896 is_error: false,
2897 metadata: MetadataMap::new(),
2898 },
2899 duration: None,
2900 metadata: MetadataMap::new(),
2901 })
2902 }
2903 }
2904
2905 struct NameRoutingPolicy {
2906 routes: Vec<(String, RoutingDecision)>,
2907 }
2908
2909 impl NameRoutingPolicy {
2910 fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
2911 Self {
2912 routes: routes
2913 .into_iter()
2914 .map(|(name, decision)| (name.into(), decision))
2915 .collect(),
2916 }
2917 }
2918 }
2919
2920 impl TaskRoutingPolicy for NameRoutingPolicy {
2921 fn route(&self, request: &ToolRequest) -> RoutingDecision {
2922 self.routes
2923 .iter()
2924 .find(|(name, _)| name == &request.tool_name.0)
2925 .map(|(_, decision)| *decision)
2926 .unwrap_or(RoutingDecision::Foreground)
2927 }
2928 }
2929
2930 async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
2931 timeout(Duration::from_secs(1), handle.next_event())
2932 .await
2933 .expect("timed out waiting for task event")
2934 .expect("task event stream ended unexpectedly")
2935 }
2936
2937 async fn wait_until_entered(flag: &AtomicBool) {
2938 timeout(Duration::from_secs(1), async {
2939 while !flag.load(Ordering::SeqCst) {
2940 tokio::task::yield_now().await;
2941 }
2942 })
2943 .await
2944 .expect("task never entered execution");
2945 }
2946
2947 #[tokio::test]
2948 async fn loop_continues_after_completed_tool_call() {
2949 let tools = ToolRegistry::new().with(EchoTool::default());
2950 let agent = Agent::builder()
2951 .model(FakeAdapter)
2952 .tools(tools)
2953 .permissions(AllowAllPermissions)
2954 .build()
2955 .unwrap();
2956
2957 let mut driver = agent
2958 .start(SessionConfig {
2959 session_id: SessionId::new("session-1"),
2960 metadata: MetadataMap::new(),
2961 cache: None,
2962 })
2963 .await
2964 .unwrap();
2965
2966 driver
2967 .submit_input(vec![Item {
2968 id: None,
2969 kind: ItemKind::User,
2970 parts: vec![Part::Text(TextPart {
2971 text: "ping".into(),
2972 metadata: MetadataMap::new(),
2973 })],
2974 metadata: MetadataMap::new(),
2975 }])
2976 .unwrap();
2977
2978 let result = driver.next().await.unwrap();
2979
2980 match result {
2981 LoopStep::Finished(turn) => {
2982 assert_eq!(turn.finish_reason, FinishReason::Completed);
2983 assert_eq!(turn.items.len(), 1);
2984 match &turn.items[0].parts[0] {
2985 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
2986 other => panic!("unexpected part: {other:?}"),
2987 }
2988 }
2989 other => panic!("unexpected loop step: {other:?}"),
2990 }
2991 }
2992
2993 #[tokio::test]
2994 async fn loop_uses_injected_permission_checker() {
2995 let tools = ToolRegistry::new().with(EchoTool::default());
2996 let agent = Agent::builder()
2997 .model(FakeAdapter)
2998 .tools(tools)
2999 .permissions(DenyFsReads)
3000 .build()
3001 .unwrap();
3002
3003 let mut driver = agent
3004 .start(SessionConfig {
3005 session_id: SessionId::new("session-2"),
3006 metadata: MetadataMap::new(),
3007 cache: None,
3008 })
3009 .await
3010 .unwrap();
3011
3012 driver
3013 .submit_input(vec![Item {
3014 id: None,
3015 kind: ItemKind::User,
3016 parts: vec![Part::Text(TextPart {
3017 text: "ping".into(),
3018 metadata: MetadataMap::new(),
3019 })],
3020 metadata: MetadataMap::new(),
3021 }])
3022 .unwrap();
3023
3024 let result = driver.next().await.unwrap();
3025
3026 match result {
3027 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3028 Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3029 other => panic!("unexpected part: {other:?}"),
3030 },
3031 other => panic!("unexpected loop step: {other:?}"),
3032 }
3033 }
3034
3035 #[tokio::test]
3036 async fn loop_surfaces_auth_interruptions_from_tools() {
3037 let tools = ToolRegistry::new().with(AuthTool::default());
3038 let agent = Agent::builder()
3039 .model(FakeAdapter)
3040 .tools(tools)
3041 .permissions(AllowAllPermissions)
3042 .build()
3043 .unwrap();
3044
3045 let mut driver = agent
3046 .start(SessionConfig {
3047 session_id: SessionId::new("session-3"),
3048 metadata: MetadataMap::new(),
3049 cache: None,
3050 })
3051 .await
3052 .unwrap();
3053
3054 driver
3055 .submit_input(vec![Item {
3056 id: None,
3057 kind: ItemKind::User,
3058 parts: vec![Part::Text(TextPart {
3059 text: "ping".into(),
3060 metadata: MetadataMap::new(),
3061 })],
3062 metadata: MetadataMap::new(),
3063 }])
3064 .unwrap();
3065
3066 let result = driver.next().await.unwrap();
3067
3068 match result {
3069 LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
3070 let request = &pending.request;
3071 assert!(request.task_id.is_some());
3072 assert_eq!(request.provider, "mcp.mock");
3073 assert_eq!(request.challenge.get("scope"), Some(&json!("secret.read")));
3074 match &request.operation {
3075 AuthOperation::ToolCall { tool_name, .. } => {
3076 assert_eq!(tool_name, "auth-tool");
3077 }
3078 other => panic!("unexpected auth operation: {other:?}"),
3079 }
3080 }
3081 other => panic!("unexpected loop step: {other:?}"),
3082 }
3083 }
3084
3085 #[tokio::test]
3086 async fn async_task_manager_background_round_requires_explicit_continue() {
3087 let entered = StdArc::new(AtomicBool::new(false));
3088 let release = StdArc::new(Notify::new());
3089 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3090 "background-wait",
3091 RoutingDecision::Background,
3092 )]));
3093 let handle = task_manager.handle();
3094 let tools = ToolRegistry::new().with(BlockingTool::new(
3095 "background-wait",
3096 entered.clone(),
3097 release.clone(),
3098 "background-done",
3099 ));
3100 let agent = Agent::builder()
3101 .model(FakeAdapter)
3102 .tools(tools)
3103 .permissions(AllowAllPermissions)
3104 .task_manager(task_manager)
3105 .build()
3106 .unwrap();
3107
3108 let mut driver = agent
3109 .start(SessionConfig {
3110 session_id: SessionId::new("session-background"),
3111 metadata: MetadataMap::new(),
3112 cache: None,
3113 })
3114 .await
3115 .unwrap();
3116
3117 driver
3118 .submit_input(vec![Item {
3119 id: None,
3120 kind: ItemKind::User,
3121 parts: vec![Part::Text(TextPart {
3122 text: "ping".into(),
3123 metadata: MetadataMap::new(),
3124 })],
3125 metadata: MetadataMap::new(),
3126 }])
3127 .unwrap();
3128
3129 let first = driver.next().await.unwrap();
3130 match first {
3131 LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3132 other => panic!("unexpected first loop step: {other:?}"),
3133 }
3134
3135 match wait_for_task_event(&handle).await {
3136 TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3137 other => panic!("unexpected task event: {other:?}"),
3138 }
3139 wait_until_entered(entered.as_ref()).await;
3140 release.notify_waiters();
3141
3142 match wait_for_task_event(&handle).await {
3143 TaskEvent::Completed(_, result) => {
3144 assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3145 }
3146 other => panic!("unexpected completion event: {other:?}"),
3147 }
3148
3149 let resumed = driver.next().await.unwrap();
3150 match resumed {
3151 LoopStep::Finished(turn) => {
3152 assert_eq!(turn.finish_reason, FinishReason::Completed);
3153 match &turn.items[0].parts[0] {
3154 Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3155 other => panic!("unexpected part after resume: {other:?}"),
3156 }
3157 }
3158 other => panic!("unexpected resumed step: {other:?}"),
3159 }
3160 }
3161
3162 #[tokio::test]
3163 async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3164 let controller = CancellationController::new();
3165 let agent = Agent::builder()
3166 .model(SlowAdapter)
3167 .cancellation(controller.handle())
3168 .build()
3169 .unwrap();
3170
3171 let mut driver = agent
3172 .start(SessionConfig {
3173 session_id: SessionId::new("session-cancel"),
3174 metadata: MetadataMap::new(),
3175 cache: None,
3176 })
3177 .await
3178 .unwrap();
3179
3180 driver
3181 .submit_input(vec![Item {
3182 id: None,
3183 kind: ItemKind::User,
3184 parts: vec![Part::Text(TextPart {
3185 text: "do the long task".into(),
3186 metadata: MetadataMap::new(),
3187 })],
3188 metadata: MetadataMap::new(),
3189 }])
3190 .unwrap();
3191
3192 let cancelled = tokio::join!(async { driver.next().await }, async {
3193 tokio::task::yield_now().await;
3194 controller.interrupt();
3195 })
3196 .0
3197 .unwrap();
3198
3199 match cancelled {
3200 LoopStep::Finished(turn) => {
3201 assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3202 assert_eq!(turn.items.len(), 1);
3203 assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3204 assert_eq!(
3205 turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3206 Some(&Value::Bool(true))
3207 );
3208 }
3209 other => panic!("unexpected loop step: {other:?}"),
3210 }
3211
3212 driver
3213 .submit_input(vec![Item {
3214 id: None,
3215 kind: ItemKind::User,
3216 parts: vec![Part::Text(TextPart {
3217 text: "try again".into(),
3218 metadata: MetadataMap::new(),
3219 })],
3220 metadata: MetadataMap::new(),
3221 }])
3222 .unwrap();
3223
3224 let result = driver.next().await.unwrap();
3225 match result {
3226 LoopStep::Finished(turn) => {
3227 assert_eq!(turn.finish_reason, FinishReason::Completed);
3228 }
3229 other => panic!("unexpected loop step after retry: {other:?}"),
3230 }
3231 }
3232
3233 #[tokio::test]
3234 async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3235 let controller = CancellationController::new();
3236 let fg_entered = StdArc::new(AtomicBool::new(false));
3237 let fg_release = StdArc::new(Notify::new());
3238 let bg_entered = StdArc::new(AtomicBool::new(false));
3239 let bg_release = StdArc::new(Notify::new());
3240 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3241 ("foreground-wait", RoutingDecision::Foreground),
3242 ("background-wait", RoutingDecision::Background),
3243 ]));
3244 let handle = task_manager.handle();
3245 let tools = ToolRegistry::new()
3246 .with(BlockingTool::new(
3247 "foreground-wait",
3248 fg_entered.clone(),
3249 fg_release,
3250 "foreground-done",
3251 ))
3252 .with(BlockingTool::new(
3253 "background-wait",
3254 bg_entered.clone(),
3255 bg_release.clone(),
3256 "background-done",
3257 ));
3258 let agent = Agent::builder()
3259 .model(MultiToolAdapter)
3260 .tools(tools)
3261 .permissions(AllowAllPermissions)
3262 .cancellation(controller.handle())
3263 .task_manager(task_manager)
3264 .build()
3265 .unwrap();
3266
3267 let mut driver = agent
3268 .start(SessionConfig {
3269 session_id: SessionId::new("session-mixed-cancel"),
3270 metadata: MetadataMap::new(),
3271 cache: None,
3272 })
3273 .await
3274 .unwrap();
3275
3276 driver
3277 .submit_input(vec![Item {
3278 id: None,
3279 kind: ItemKind::User,
3280 parts: vec![Part::Text(TextPart {
3281 text: "run both".into(),
3282 metadata: MetadataMap::new(),
3283 })],
3284 metadata: MetadataMap::new(),
3285 }])
3286 .unwrap();
3287
3288 let cancelled = tokio::join!(async { driver.next().await }, async {
3289 let _ = wait_for_task_event(&handle).await;
3290 let _ = wait_for_task_event(&handle).await;
3291 wait_until_entered(fg_entered.as_ref()).await;
3292 wait_until_entered(bg_entered.as_ref()).await;
3293 controller.interrupt();
3294 })
3295 .0
3296 .unwrap();
3297
3298 match cancelled {
3299 LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3300 other => panic!("unexpected loop step after interrupt: {other:?}"),
3301 }
3302
3303 match wait_for_task_event(&handle).await {
3304 TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3305 other => panic!("unexpected post-interrupt event: {other:?}"),
3306 }
3307
3308 let running = handle.list_running().await;
3309 assert_eq!(running.len(), 1);
3310 assert_eq!(running[0].tool_name, "background-wait");
3311
3312 bg_release.notify_waiters();
3313 match wait_for_task_event(&handle).await {
3314 TaskEvent::Completed(snapshot, result) => {
3315 assert_eq!(snapshot.tool_name, "background-wait");
3316 assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3317 }
3318 other => panic!("unexpected background completion event: {other:?}"),
3319 }
3320 }
3321
3322 #[tokio::test]
3323 async fn loop_resumes_after_approved_tool_request() {
3324 let tools = ToolRegistry::new().with(EchoTool::default());
3325 let agent = Agent::builder()
3326 .model(FakeAdapter)
3327 .tools(tools)
3328 .permissions(ApproveFsReads)
3329 .build()
3330 .unwrap();
3331
3332 let mut driver = agent
3333 .start(SessionConfig {
3334 session_id: SessionId::new("session-approval"),
3335 metadata: MetadataMap::new(),
3336 cache: None,
3337 })
3338 .await
3339 .unwrap();
3340
3341 driver
3342 .submit_input(vec![Item {
3343 id: None,
3344 kind: ItemKind::User,
3345 parts: vec![Part::Text(TextPart {
3346 text: "ping".into(),
3347 metadata: MetadataMap::new(),
3348 })],
3349 metadata: MetadataMap::new(),
3350 }])
3351 .unwrap();
3352
3353 let first = driver.next().await.unwrap();
3354 match first {
3355 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3356 assert!(pending.request.task_id.is_some());
3357 assert_eq!(pending.request.id.0, "approval:fs-read");
3358 pending.approve(&mut driver).unwrap();
3359 }
3360 other => panic!("unexpected loop step: {other:?}"),
3361 }
3362 let second = driver.next().await.unwrap();
3363 match second {
3364 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3365 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3366 other => panic!("unexpected part: {other:?}"),
3367 },
3368 other => panic!("unexpected loop step after approval: {other:?}"),
3369 }
3370 }
3371
3372 #[tokio::test]
3373 async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3374 let tools = ToolRegistry::new().with(EchoTool::default());
3375 let agent = Agent::builder()
3376 .model(DualApprovalAdapter)
3377 .tools(tools)
3378 .permissions(ApproveFsReads)
3379 .build()
3380 .unwrap();
3381
3382 let mut driver = agent
3383 .start(SessionConfig {
3384 session_id: SessionId::new("session-dual-approval"),
3385 metadata: MetadataMap::new(),
3386 cache: None,
3387 })
3388 .await
3389 .unwrap();
3390
3391 driver
3392 .submit_input(vec![Item {
3393 id: None,
3394 kind: ItemKind::User,
3395 parts: vec![Part::Text(TextPart {
3396 text: "run both approvals".into(),
3397 metadata: MetadataMap::new(),
3398 })],
3399 metadata: MetadataMap::new(),
3400 }])
3401 .unwrap();
3402
3403 let pending_first = match driver.next().await.unwrap() {
3404 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3405 assert_eq!(
3406 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3407 Some("call-1")
3408 );
3409 pending
3410 }
3411 other => panic!("unexpected first loop step: {other:?}"),
3412 };
3413
3414 let pending_second = match driver.next().await.unwrap() {
3415 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3416 assert_eq!(
3417 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3418 Some("call-2")
3419 );
3420 pending
3421 }
3422 other => panic!("unexpected second loop step: {other:?}"),
3423 };
3424
3425 pending_second.approve(&mut driver).unwrap();
3426 match driver.next().await.unwrap() {
3427 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3428 assert_eq!(
3429 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3430 Some("call-1")
3431 );
3432 }
3433 other => panic!("unexpected step after approving second request: {other:?}"),
3434 }
3435
3436 pending_first.approve(&mut driver).unwrap();
3437 match driver.next().await.unwrap() {
3438 LoopStep::Finished(turn) => {
3439 assert_eq!(turn.finish_reason, FinishReason::Completed);
3440 match &turn.items[0].parts[0] {
3441 Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3442 other => panic!("unexpected final part: {other:?}"),
3443 }
3444 }
3445 other => panic!("unexpected final loop step: {other:?}"),
3446 }
3447 }
3448
3449 #[tokio::test]
3450 async fn loop_compacts_transcript_before_new_turns() {
3451 let events = StdArc::new(StdMutex::new(Vec::new()));
3452 let agent = Agent::builder()
3453 .model(FakeAdapter)
3454 .compaction(CompactionConfig::new(
3455 CountTrigger,
3456 CompactionPipeline::new().with_strategy(KeepRecentStrategy::new(1)),
3457 ))
3458 .observer(RecordingObserver {
3459 events: events.clone(),
3460 })
3461 .build()
3462 .unwrap();
3463
3464 let mut driver = agent
3465 .start(SessionConfig {
3466 session_id: SessionId::new("session-4"),
3467 metadata: MetadataMap::new(),
3468 cache: None,
3469 })
3470 .await
3471 .unwrap();
3472
3473 for text in ["first", "second"] {
3474 driver
3475 .submit_input(vec![Item {
3476 id: None,
3477 kind: ItemKind::User,
3478 parts: vec![Part::Text(TextPart {
3479 text: text.into(),
3480 metadata: MetadataMap::new(),
3481 })],
3482 metadata: MetadataMap::new(),
3483 }])
3484 .unwrap();
3485 let _ = driver.next().await.unwrap();
3486 }
3487
3488 let events = events.lock().unwrap();
3489 assert!(events.iter().any(|event| matches!(
3490 event,
3491 AgentEvent::CompactionFinished {
3492 replaced_items,
3493 ..
3494 } if *replaced_items > 0
3495 )));
3496 }
3497
3498 #[tokio::test]
3499 async fn loop_refreshes_tool_specs_each_turn() {
3500 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3501 let version = StdArc::new(AtomicUsize::new(1));
3502 let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
3503 let agent = Agent::builder()
3504 .model(RecordingAdapter {
3505 seen_descriptions: seen_descriptions.clone(),
3506 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3507 })
3508 .tools(tools)
3509 .permissions(AllowAllPermissions)
3510 .build()
3511 .unwrap();
3512
3513 let mut driver = agent
3514 .start(SessionConfig {
3515 session_id: SessionId::new("session-dynamic-tools"),
3516 metadata: MetadataMap::new(),
3517 cache: None,
3518 })
3519 .await
3520 .unwrap();
3521
3522 for text in ["first", "second"] {
3523 driver
3524 .submit_input(vec![Item {
3525 id: None,
3526 kind: ItemKind::User,
3527 parts: vec![Part::Text(TextPart {
3528 text: text.into(),
3529 metadata: MetadataMap::new(),
3530 })],
3531 metadata: MetadataMap::new(),
3532 }])
3533 .unwrap();
3534
3535 let _ = driver.next().await.unwrap();
3536 if text == "first" {
3537 version.store(2, Ordering::SeqCst);
3538 }
3539 }
3540
3541 let seen_descriptions = seen_descriptions.lock().unwrap();
3542 assert_eq!(seen_descriptions.len(), 2);
3543 assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
3544 assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
3545 }
3546
3547 #[tokio::test]
3548 async fn loop_passes_session_default_and_next_turn_cache_requests() {
3549 let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
3550 let agent = Agent::builder()
3551 .model(RecordingAdapter {
3552 seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
3553 seen_caches: seen_caches.clone(),
3554 })
3555 .permissions(AllowAllPermissions)
3556 .build()
3557 .unwrap();
3558
3559 let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
3560 .with_retention(PromptCacheRetention::Short);
3561 let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
3562 breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
3563 });
3564
3565 let mut driver = agent
3566 .start(SessionConfig {
3567 session_id: SessionId::new("session-cache"),
3568 metadata: MetadataMap::new(),
3569 cache: Some(default_cache.clone()),
3570 })
3571 .await
3572 .unwrap();
3573
3574 driver
3575 .submit_input(vec![Item {
3576 id: None,
3577 kind: ItemKind::User,
3578 parts: vec![Part::Text(TextPart {
3579 text: "first".into(),
3580 metadata: MetadataMap::new(),
3581 })],
3582 metadata: MetadataMap::new(),
3583 }])
3584 .unwrap();
3585 let _ = driver.next().await.unwrap();
3586
3587 driver
3588 .submit_input_with_cache(
3589 vec![Item {
3590 id: None,
3591 kind: ItemKind::User,
3592 parts: vec![Part::Text(TextPart {
3593 text: "second".into(),
3594 metadata: MetadataMap::new(),
3595 })],
3596 metadata: MetadataMap::new(),
3597 }],
3598 override_cache.clone(),
3599 )
3600 .unwrap();
3601 let _ = driver.next().await.unwrap();
3602
3603 let seen = seen_caches.lock().unwrap();
3604 assert_eq!(seen.len(), 2);
3605 assert_eq!(seen[0], Some(default_cache));
3606 assert_eq!(seen[1], Some(override_cache));
3607 }
3608
/// The shorthand constructors on `SessionConfig`, `PromptCacheRequest` and
/// `PromptCacheBreakpoint` should produce the same values as the explicit forms.
#[test]
fn convenience_cache_builders_construct_expected_defaults() {
    // Session-level builder.
    let cache = PromptCacheRequest::automatic()
        .with_retention(PromptCacheRetention::Short)
        .with_key("workspace:demo");
    let session = SessionConfig::new("demo").with_cache(cache.clone());

    assert_eq!(session.session_id, SessionId::new("demo"));
    assert_eq!(session.cache, Some(cache));

    // Explicit-breakpoint builder.
    let explicit = PromptCacheRequest::explicit([
        PromptCacheBreakpoint::tools_end(),
        PromptCacheBreakpoint::transcript_item_end(2),
        PromptCacheBreakpoint::transcript_part_end(3, 1),
    ]);
    let expected_strategy = PromptCacheStrategy::Explicit {
        breakpoints: vec![
            PromptCacheBreakpoint::ToolsEnd,
            PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
            PromptCacheBreakpoint::TranscriptPartEnd {
                item_index: 3,
                part_index: 1,
            },
        ],
    };

    assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
    assert_eq!(explicit.strategy, expected_strategy);
}
3640}