1use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_compaction::{
68 CompactionConfig, CompactionContext, CompactionReason, CompactionResult,
69};
70use agentkit_core::{
71 CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
72 TextPart, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation, Usage,
73};
74use agentkit_task_manager::{
75 PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
76 TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
77};
78#[cfg(test)]
79use agentkit_tools_core::ToolContext;
80use agentkit_tools_core::{
81 AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
82 PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutor, ToolRequest, ToolResources,
83 ToolSource, ToolSpec,
84};
85use async_trait::async_trait;
86use serde::{Deserialize, Serialize};
87use thiserror::Error;
88
// Metadata keys/values in the `agentkit.` namespace describing interrupted work.
// NOTE(review): the code that reads or writes these is outside this view — presumably they are
// attached to items produced when a turn is cancelled; confirm against the cancellation path.

/// Metadata key under which the "interrupted" marker is stored.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
/// Metadata key under which the interruption reason (e.g. [`USER_CANCELLED_REASON`]) is stored.
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
/// Metadata key under which the stage at which the interruption happened is stored.
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
/// Reason value used when the interruption was a user cancellation.
const USER_CANCELLED_REASON: &str = "user_cancelled";
93
/// Configuration handed to [`ModelAdapter::start_session`] when an agent is started.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionConfig {
    /// Identifier of the session; [`Agent::start`] also copies it onto the returned driver.
    pub session_id: SessionId,
    /// Free-form metadata travelling with the session configuration.
    pub metadata: MetadataMap,
    /// Prompt-cache request used for turns that have no per-turn override.
    ///
    /// `None` means turns carry no cache request unless one is set for a specific turn.
    pub cache: Option<PromptCacheRequest>,
}
117
118impl SessionConfig {
119 pub fn new(session_id: impl Into<SessionId>) -> Self {
121 Self {
122 session_id: session_id.into(),
123 metadata: MetadataMap::new(),
124 cache: None,
125 }
126 }
127
128 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
130 self.metadata = metadata;
131 self
132 }
133
134 pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
136 self.cache = Some(cache);
137 self
138 }
139
140 pub fn without_cache(mut self) -> Self {
142 self.cache = None;
143 self
144 }
145}
146
/// How strongly prompt caching is requested for a turn.
///
/// NOTE(review): the exact handling of each mode is up to the model adapter, which is not
/// visible here; the variant descriptions below are inferred from the names.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheMode {
    /// Caching is off; [`PromptCacheRequest::is_enabled`] returns `false` only for this mode.
    Disabled,
    /// Default mode. Presumably: use caching when the provider supports it.
    #[default]
    BestEffort,
    /// Presumably: the adapter must honour the cache request or report an error — confirm.
    Required,
}
162
/// Requested retention class for cached prompt content.
///
/// NOTE(review): how each variant maps onto a provider setting is decided by the model adapter
/// and is not visible here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheRetention {
    /// Presumably the provider's default retention.
    Default,
    /// Presumably a short-lived cache entry.
    Short,
    /// Presumably a longer-lived cache entry.
    Extended,
}
177
/// Where cache breakpoints come from: chosen automatically or listed explicitly.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum PromptCacheStrategy {
    /// Default strategy: no breakpoints are supplied by the caller.
    #[default]
    Automatic,
    /// The caller supplies the breakpoints.
    Explicit {
        /// Caller-chosen breakpoints, stored in the order they were provided.
        breakpoints: Vec<PromptCacheBreakpoint>,
    },
}
190
191impl PromptCacheStrategy {
192 pub fn automatic() -> Self {
195 Self::Automatic
196 }
197
198 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
200 Self::Explicit {
201 breakpoints: breakpoints.into_iter().collect(),
202 }
203 }
204}
205
/// A position in the prompt at which an explicit cache breakpoint is requested.
///
/// NOTE(review): the indices presumably refer to [`TurnRequest::transcript`] and to the `parts`
/// of the addressed item; the interpretation lives in the model adapter — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheBreakpoint {
    /// Breakpoint at the end of the tool definitions.
    ToolsEnd,
    /// Breakpoint at the end of the transcript item at `index`.
    TranscriptItemEnd { index: usize },
    /// Breakpoint at the end of one part inside a transcript item.
    TranscriptPartEnd {
        /// Index of the transcript item.
        item_index: usize,
        /// Index of the part within that item.
        part_index: usize,
    },
}
223
impl PromptCacheBreakpoint {
    /// Returns [`PromptCacheBreakpoint::ToolsEnd`].
    pub fn tools_end() -> Self {
        Self::ToolsEnd
    }

    /// Returns [`PromptCacheBreakpoint::TranscriptItemEnd`] for the item at `index`.
    pub fn transcript_item_end(index: usize) -> Self {
        Self::TranscriptItemEnd { index }
    }

    /// Returns [`PromptCacheBreakpoint::TranscriptPartEnd`] for part `part_index` of the item at
    /// `item_index`.
    pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
        Self::TranscriptPartEnd {
            item_index,
            part_index,
        }
    }
}
243
/// A prompt-caching request attached to a session or to a single turn.
///
/// The derived `Default` is best-effort mode, automatic strategy, no retention and no key.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PromptCacheRequest {
    /// How strongly caching is requested.
    pub mode: PromptCacheMode,
    /// Automatic or explicit breakpoint placement.
    pub strategy: PromptCacheStrategy,
    /// Optional retention hint; `None` leaves it unspecified.
    pub retention: Option<PromptCacheRetention>,
    /// Optional cache key; `None` leaves it unspecified.
    // NOTE(review): how the key is used is adapter-specific and not visible here.
    pub key: Option<String>,
}
256
257impl PromptCacheRequest {
258 pub fn automatic() -> Self {
260 Self::best_effort(PromptCacheStrategy::automatic())
261 }
262
263 pub fn automatic_required() -> Self {
265 Self::required(PromptCacheStrategy::automatic())
266 }
267
268 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
270 Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
271 }
272
273 pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
275 Self::required(PromptCacheStrategy::explicit(breakpoints))
276 }
277
278 pub fn disabled() -> Self {
280 Self {
281 mode: PromptCacheMode::Disabled,
282 strategy: PromptCacheStrategy::Automatic,
283 retention: None,
284 key: None,
285 }
286 }
287
288 pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
290 Self {
291 mode: PromptCacheMode::BestEffort,
292 strategy,
293 retention: None,
294 key: None,
295 }
296 }
297
298 pub fn required(strategy: PromptCacheStrategy) -> Self {
300 Self {
301 mode: PromptCacheMode::Required,
302 strategy,
303 retention: None,
304 key: None,
305 }
306 }
307
308 pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
310 self.mode = mode;
311 self
312 }
313
314 pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
316 self.strategy = strategy;
317 self
318 }
319
320 pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
322 self.retention = Some(retention);
323 self
324 }
325
326 pub fn with_key(mut self, key: impl Into<String>) -> Self {
328 self.key = Some(key.into());
329 self
330 }
331
332 pub fn without_retention(mut self) -> Self {
334 self.retention = None;
335 self
336 }
337
338 pub fn without_key(mut self) -> Self {
340 self.key = None;
341 self
342 }
343
344 pub fn is_enabled(&self) -> bool {
346 !matches!(self.mode, PromptCacheMode::Disabled)
347 }
348}
349
/// Everything a [`ModelSession`] receives to begin one model turn.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnRequest {
    /// Session the turn belongs to.
    pub session_id: SessionId,
    /// Identifier of this turn.
    pub turn_id: agentkit_core::TurnId,
    /// Copy of the driver's transcript at the time the turn is started.
    pub transcript: Vec<Item>,
    /// Tool specs reported by the tool executor when the request was built.
    pub available_tools: Vec<ToolSpec>,
    /// Cache request for this turn: the per-turn override if one was set, otherwise the
    /// session default.
    pub cache: Option<PromptCacheRequest>,
    /// Per-request metadata.
    pub metadata: MetadataMap,
}
370
/// Final result of a model turn, carried by [`ModelTurnEvent::Finished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ModelTurnResult {
    /// Why the model stopped.
    pub finish_reason: FinishReason,
    /// Items produced by the model in this turn; the driver appends them to its transcript.
    pub output_items: Vec<Item>,
    /// Usage reported for the turn, if any.
    pub usage: Option<Usage>,
    /// Metadata attached to the result.
    pub metadata: MetadataMap,
}
386
/// One event yielded by [`ModelTurn::next_event`] while a turn is running.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ModelTurnEvent {
    /// Incremental output; the driver re-emits it as [`AgentEvent::ContentDelta`].
    Delta(Delta),
    /// The model requested a tool call; re-emitted as [`AgentEvent::ToolCallRequested`].
    ToolCall(ToolCallPart),
    /// Usage update; re-emitted as [`AgentEvent::UsageUpdated`].
    Usage(Usage),
    /// Terminal event with the turn's result; the driver stops reading events after it.
    Finished(ModelTurnResult),
}
403
/// Entry point for a model provider: creates sessions for the agent loop.
#[async_trait]
pub trait ModelAdapter: Send + Sync {
    /// Session type produced by [`ModelAdapter::start_session`].
    type Session: ModelSession;

    /// Starts a new session from `config`.
    ///
    /// # Errors
    /// Returns a [`LoopError`] when the session cannot be started; [`Agent::start`] propagates
    /// it unchanged.
    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
}
452
/// A live model session that can begin turns.
#[async_trait]
pub trait ModelSession: Send {
    /// Turn type produced by [`ModelSession::begin_turn`].
    type Turn: ModelTurn;

    /// Begins a turn for `request`.
    ///
    /// `cancellation`, when present, is the checkpoint the driver took for this turn.
    ///
    /// # Errors
    /// The driver treats `LoopError::Cancelled` as a cancelled turn; any other error is
    /// returned to the caller as-is.
    async fn begin_turn(
        &mut self,
        request: TurnRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Self::Turn, LoopError>;
}
482
/// An in-flight model turn that yields [`ModelTurnEvent`]s.
#[async_trait]
pub trait ModelTurn: Send {
    /// Returns the next event, or `Ok(None)` once the event stream has ended.
    ///
    /// The driver expects a [`ModelTurnEvent::Finished`] before the stream ends; a stream that
    /// ends without one is reported as a provider error.
    ///
    /// # Errors
    /// The driver treats `LoopError::Cancelled` as a cancelled turn; any other error is
    /// returned to the caller as-is.
    async fn next_event(
        &mut self,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError>;
}
503
/// Receives [`AgentEvent`]s emitted by the loop driver.
///
/// Observers are registered through [`AgentBuilder::observer`].
pub trait LoopObserver: Send {
    /// Called with each emitted event; the event is passed by value.
    fn handle_event(&mut self, event: AgentEvent);
}
527
/// Receives a callback for items appended to the transcript.
///
/// Observers are registered through [`AgentBuilder::transcript_observer`].
// NOTE(review): the call sites are outside this view — confirm exactly which appends notify.
pub trait TranscriptObserver: Send {
    /// Called with a reference to the appended item.
    fn on_item_appended(&mut self, item: &Item);
}
566
/// Events delivered to [`LoopObserver`]s while the loop runs.
// NOTE(review): variants whose emission site is outside this view are described by name only.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AgentEvent {
    /// Emitted once by [`Agent::start`] after the driver has been created.
    RunStarted { session_id: SessionId },
    /// A turn is starting.
    TurnStarted {
        session_id: SessionId,
        turn_id: agentkit_core::TurnId,
    },
    /// Input items were accepted for the session.
    InputAccepted {
        session_id: SessionId,
        items: Vec<Item>,
    },
    /// Incremental model output, forwarded from [`ModelTurnEvent::Delta`].
    ContentDelta(Delta),
    /// The model requested a tool call, forwarded from [`ModelTurnEvent::ToolCall`].
    ToolCallRequested(ToolCallPart),
    /// A tool result was received.
    ToolResultReceived(ToolResultPart),
    /// A tool call needs approval; emitted when the approval is queued on the driver.
    ApprovalRequired(ApprovalRequest),
    /// An approval was resolved; `approved` carries the outcome.
    ApprovalResolved { approved: bool },
    /// The tool catalog changed; one event per change drained from the tool executor.
    ToolCatalogChanged(ToolCatalogEvent),
    /// Transcript compaction is about to run for the given reason.
    CompactionStarted {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        reason: CompactionReason,
    },
    /// Transcript compaction finished and the driver's transcript was replaced.
    CompactionFinished {
        session_id: SessionId,
        turn_id: Option<agentkit_core::TurnId>,
        /// Count reported by the compaction strategy.
        replaced_items: usize,
        /// Transcript length after compaction.
        transcript_len: usize,
        /// Metadata reported by the compaction strategy.
        metadata: MetadataMap,
    },
    /// Usage update, forwarded from [`ModelTurnEvent::Usage`].
    UsageUpdated(Usage),
    /// Non-fatal warning.
    Warning { message: String },
    /// The run failed.
    RunFailed { message: String },
    /// A turn finished with the given result.
    TurnFinished(TurnResult),
}
630
/// An approval request surfaced to the host via [`LoopInterrupt::ApprovalRequest`].
///
/// Resolve it with one of the consuming methods (`approve`, `deny`, ...).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingApproval {
    /// The underlying request. The driver sets `call_id` to the tool call's id when it queues
    /// the approval; the resolve methods fail if it is missing.
    pub request: ApprovalRequest,
}
657
/// Lets callers read the fields of the wrapped [`ApprovalRequest`] directly on a
/// [`PendingApproval`].
impl std::ops::Deref for PendingApproval {
    type Target = ApprovalRequest;
    /// Returns the wrapped request.
    fn deref(&self) -> &ApprovalRequest {
        &self.request
    }
}
664
665impl PendingApproval {
666 pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
668 let call_id = self
669 .request
670 .call_id
671 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
672 driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
673 }
674
675 pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
677 let call_id = self
678 .request
679 .call_id
680 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
681 driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
682 }
683
684 pub fn deny_with_reason<S: ModelSession>(
686 self,
687 driver: &mut LoopDriver<S>,
688 reason: impl Into<String>,
689 ) -> Result<(), LoopError> {
690 let call_id = self
691 .request
692 .call_id
693 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
694 driver.resolve_approval_for(
695 call_id,
696 ApprovalDecision::Deny {
697 reason: Some(reason.into()),
698 },
699 )
700 }
701
702 pub fn approve_with_patched_input<S: ModelSession>(
712 self,
713 driver: &mut LoopDriver<S>,
714 input: serde_json::Value,
715 ) -> Result<(), LoopError> {
716 let call_id = self
717 .request
718 .call_id
719 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
720 driver.resolve_approval_for_with_patched_input(call_id, input)
721 }
722}
723
/// Payload of [`LoopInterrupt::AwaitingInput`]: the loop is asking the host for input.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputRequest {
    /// Session that is asking for input.
    pub session_id: SessionId,
    /// Text describing why input is requested.
    pub reason: String,
}
755
impl InputRequest {
    /// Submits `items` to `driver` as input, consuming the request.
    ///
    /// The request's own fields are not used; this forwards to the driver's `submit_input`.
    ///
    /// # Errors
    /// Returns whatever `submit_input` reports.
    pub fn submit<S: ModelSession>(
        self,
        driver: &mut LoopDriver<S>,
        items: Vec<Item>,
    ) -> Result<(), LoopError> {
        driver.submit_input(items)
    }
}
766
/// Outcome of a completed turn, carried by [`LoopStep::Finished`] and
/// [`AgentEvent::TurnFinished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnResult {
    /// Identifier of the turn.
    pub turn_id: agentkit_core::TurnId,
    /// Why the turn ended.
    pub finish_reason: FinishReason,
    /// Items associated with the turn.
    pub items: Vec<Item>,
    /// Usage for the turn, if reported.
    pub usage: Option<Usage>,
    /// Metadata attached to the result.
    pub metadata: MetadataMap,
}
784
/// Reasons the loop hands control back to the host before a turn has finished.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LoopInterrupt {
    /// A tool call needs an approval decision.
    ApprovalRequest(PendingApproval),
    /// The loop is asking for input.
    AwaitingInput(InputRequest),
    /// A tool round completed; the host may submit more input via [`ToolRoundInfo::submit`].
    AfterToolResult(ToolRoundInfo),
}
837
impl LoopInterrupt {
    /// Returns `true` only for [`LoopInterrupt::ApprovalRequest`].
    // NOTE(review): `AwaitingInput` is reported as non-blocking here. If the driver cannot make
    // progress until input is submitted, it may belong in this match as well — confirm intent.
    pub fn is_blocking(&self) -> bool {
        matches!(self, LoopInterrupt::ApprovalRequest(_))
    }
}
848
/// Payload of [`LoopInterrupt::AfterToolResult`], built when a tool round completes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ToolRoundInfo {
    /// Session the round belongs to.
    pub session_id: SessionId,
    /// Turn whose tool calls were just processed.
    pub turn_id: agentkit_core::TurnId,
    /// Length of the driver's transcript when the interrupt was created.
    pub transcript_len: usize,
}
860
impl ToolRoundInfo {
    /// Submits `items` to `driver` as input, consuming this value.
    ///
    /// The round info's own fields are not used; this forwards to the driver's `submit_input`.
    ///
    /// # Errors
    /// Returns whatever `submit_input` reports.
    pub fn submit<S: ModelSession>(
        self,
        driver: &mut LoopDriver<S>,
        items: Vec<Item>,
    ) -> Result<(), LoopError> {
        driver.submit_input(items)
    }
}
872
/// Result of advancing the loop: either it paused or a turn finished.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum LoopStep {
    /// The loop paused and returned control to the host.
    Interrupt(LoopInterrupt),
    /// A turn ran to completion.
    Finished(TurnResult),
}
907
/// Serializable copy of the driver's conversational state.
// NOTE(review): the code that produces snapshots is outside this view.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LoopSnapshot {
    /// Session the snapshot was taken from.
    pub session_id: SessionId,
    /// Transcript items.
    pub transcript: Vec<Item>,
    /// Input items that are still pending.
    pub pending_input: Vec<Item>,
}
922
/// Driver-side bookkeeping for one tool call that is waiting on an approval.
#[derive(Clone, Debug)]
struct PendingApprovalToolCall {
    /// The approval request, with `call_id` filled in by the driver.
    request: ApprovalRequest,
    /// The host's decision; `None` until the approval has been resolved.
    decision: Option<ApprovalDecision>,
    /// Whether this approval has already been returned to the host as an interrupt.
    surfaced: bool,
    /// Turn the tool call belongs to.
    turn_id: agentkit_core::TurnId,
    /// Task-manager id of the task that requested approval.
    task_id: TaskId,
    /// The tool call, rebuilt from the tool request when the approval was queued.
    call: ToolCallPart,
    /// The original tool request.
    tool_request: ToolRequest,
}
933
/// State of the tool round the driver is currently working through.
#[derive(Clone, Debug, Default)]
struct ActiveToolRound {
    /// Turn whose tool calls are being executed.
    turn_id: agentkit_core::TurnId,
    /// Calls not yet started; they are popped from the front, one per loop iteration.
    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
    /// Set when a started task is pending as a background task, or when a task detaches.
    background_pending: bool,
    /// Set once a tool result item (including a detach notice) was appended in this round.
    foreground_progressed: bool,
}
941
/// A fully configured agent, ready to be turned into a [`LoopDriver`] by [`Agent::start`].
///
/// Build one with [`Agent::builder`].
pub struct Agent<M>
where
    M: ModelAdapter,
{
    /// Model adapter used to start the session.
    model: M,
    /// Tool sources; used to build the default executor when no custom executor is set.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    /// Custom tool executor, if one was configured.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    /// Task manager through which tool calls are started.
    task_manager: Arc<dyn TaskManager>,
    /// Permission checker handed to tool executions.
    permissions: Arc<dyn PermissionChecker>,
    /// Shared resources handed to tool executions.
    resources: Arc<dyn ToolResources>,
    /// Optional handle from which per-turn cancellation checkpoints are taken.
    cancellation: Option<CancellationHandle>,
    /// Optional transcript compaction configuration.
    compaction: Option<CompactionConfig>,
    /// Observers receiving [`AgentEvent`]s.
    observers: Vec<Box<dyn LoopObserver>>,
    /// Observers notified about appended transcript items.
    transcript_observers: Vec<Box<dyn TranscriptObserver>>,
    /// Initial transcript.
    transcript: Vec<Item>,
    /// Initial input; becomes the driver's pending input.
    input: Vec<Item>,
}
1000
1001impl<M> Agent<M>
1002where
1003 M: ModelAdapter,
1004{
1005 pub fn builder() -> AgentBuilder<M> {
1007 AgentBuilder::default()
1008 }
1009
1010 pub async fn start(self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1022 let session_id = config.session_id.clone();
1023 let default_cache = config.cache.clone();
1024 let session = self.model.start_session(config).await?;
1025 let tool_executor = self
1026 .tool_executor
1027 .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1028 let mut driver = LoopDriver {
1029 session_id: session_id.clone(),
1030 default_cache,
1031 next_turn_cache: None,
1032 session: Some(session),
1033 tool_executor,
1034 task_manager: self.task_manager,
1035 permissions: self.permissions,
1036 resources: self.resources,
1037 cancellation: self.cancellation,
1038 compaction: self.compaction,
1039 observers: self.observers,
1040 transcript_observers: self.transcript_observers,
1041 transcript: self.transcript,
1042 pending_input: self.input,
1043 pending_approvals: BTreeMap::new(),
1044 pending_approval_order: VecDeque::new(),
1045 active_tool_round: None,
1046 pending_round_resume: None,
1047 next_turn_index: 1,
1048 detached_call_ids: HashSet::new(),
1049 };
1050 driver.emit(AgentEvent::RunStarted { session_id });
1051 Ok(driver)
1052 }
1053}
1054
/// Builder for [`Agent`]. Only the model is mandatory; see [`AgentBuilder::build`].
pub struct AgentBuilder<M>
where
    M: ModelAdapter,
{
    /// Model adapter; `build` fails while this is `None`.
    model: Option<M>,
    /// Registered tool sources, in registration order.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    /// Custom tool executor, if any.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    /// Custom task manager; `build` falls back to a `SimpleTaskManager` when unset.
    task_manager: Option<Arc<dyn TaskManager>>,
    /// Permission checker (defaults to `AllowAllPermissions`).
    permissions: Arc<dyn PermissionChecker>,
    /// Tool resources (defaults to `()`).
    resources: Arc<dyn ToolResources>,
    /// Optional cancellation handle.
    cancellation: Option<CancellationHandle>,
    /// Optional compaction configuration.
    compaction: Option<CompactionConfig>,
    /// Registered loop observers.
    observers: Vec<Box<dyn LoopObserver>>,
    /// Registered transcript observers.
    transcript_observers: Vec<Box<dyn TranscriptObserver>>,
    /// Initial transcript.
    transcript: Vec<Item>,
    /// Initial input.
    input: Vec<Item>,
}
1077
impl<M> Default for AgentBuilder<M>
where
    M: ModelAdapter,
{
    /// Returns a builder with no model, no tools, no observers, and empty transcript and input.
    ///
    /// Permissions default to `AllowAllPermissions` and resources to the unit value `()`.
    fn default() -> Self {
        Self {
            model: None,
            tool_sources: Vec::new(),
            tool_executor: None,
            task_manager: None,
            permissions: Arc::new(AllowAllPermissions),
            resources: Arc::new(()),
            cancellation: None,
            compaction: None,
            observers: Vec::new(),
            transcript_observers: Vec::new(),
            transcript: Vec::new(),
            input: Vec::new(),
        }
    }
}
1099
1100impl<M> AgentBuilder<M>
1101where
1102 M: ModelAdapter,
1103{
1104 pub fn model(mut self, model: M) -> Self {
1106 self.model = Some(model);
1107 self
1108 }
1109
1110 pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1123 self.tool_sources.push(Arc::new(source));
1124 self
1125 }
1126
1127 pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1133 self.tool_executor = Some(Arc::new(executor));
1134 self
1135 }
1136
1137 pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1142 self.task_manager = Some(Arc::new(manager));
1143 self
1144 }
1145
1146 pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1150 self.permissions = Arc::new(permissions);
1151 self
1152 }
1153
1154 pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1156 self.resources = Arc::new(resources);
1157 self
1158 }
1159
1160 pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1162 self.cancellation = Some(handle);
1163 self
1164 }
1165
1166 pub fn compaction(mut self, config: CompactionConfig) -> Self {
1171 self.compaction = Some(config);
1172 self
1173 }
1174
1175 pub fn observer(mut self, observer: impl LoopObserver + 'static) -> Self {
1179 self.observers.push(Box::new(observer));
1180 self
1181 }
1182
1183 pub fn transcript_observer(mut self, observer: impl TranscriptObserver + 'static) -> Self {
1192 self.transcript_observers.push(Box::new(observer));
1193 self
1194 }
1195
1196 pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1204 self.transcript = transcript;
1205 self
1206 }
1207
1208 pub fn input(mut self, input: Vec<Item>) -> Self {
1217 self.input = input;
1218 self
1219 }
1220
1221 pub fn build(self) -> Result<Agent<M>, LoopError> {
1227 let model = self
1228 .model
1229 .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1230 Ok(Agent {
1231 model,
1232 tool_sources: self.tool_sources,
1233 tool_executor: self.tool_executor,
1234 task_manager: self
1235 .task_manager
1236 .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1237 permissions: self.permissions,
1238 resources: self.resources,
1239 cancellation: self.cancellation,
1240 compaction: self.compaction,
1241 observers: self.observers,
1242 transcript_observers: self.transcript_observers,
1243 transcript: self.transcript,
1244 input: self.input,
1245 })
1246 }
1247}
1248
/// Runs the agent loop for one session: drives model turns, executes tool calls through the
/// task manager, and surfaces interrupts to the host.
///
/// Created by [`Agent::start`].
pub struct LoopDriver<S>
where
    S: ModelSession,
{
    /// Session this driver belongs to.
    session_id: SessionId,
    /// Cache request used for turns without an override (from [`SessionConfig::cache`]).
    default_cache: Option<PromptCacheRequest>,
    /// One-shot cache override; it is taken (and thereby cleared) when a turn request is built.
    next_turn_cache: Option<PromptCacheRequest>,
    /// The model session; turns fail with `InvalidState` when it is `None`.
    session: Option<S>,
    /// Executor providing tool specs and catalog events, and handed to started tasks.
    tool_executor: Arc<dyn ToolExecutor>,
    /// Task manager through which tool calls are started and awaited.
    task_manager: Arc<dyn TaskManager>,
    /// Permission checker passed into each tool context.
    permissions: Arc<dyn PermissionChecker>,
    /// Resources passed into each tool context.
    resources: Arc<dyn ToolResources>,
    /// Handle from which cancellation checkpoints are taken.
    cancellation: Option<CancellationHandle>,
    /// Compaction configuration; compaction is skipped when `None`.
    compaction: Option<CompactionConfig>,
    /// Observers receiving [`AgentEvent`]s.
    observers: Vec<Box<dyn LoopObserver>>,
    /// Observers notified about appended transcript items.
    transcript_observers: Vec<Box<dyn TranscriptObserver>>,
    /// The conversation transcript.
    transcript: Vec<Item>,
    /// Input submitted but not yet consumed.
    pending_input: Vec<Item>,
    /// Approvals waiting for (or holding) a decision, keyed by tool call id.
    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
    /// Order in which approvals were queued; decides which approval is surfaced first.
    pending_approval_order: VecDeque<ToolCallId>,
    /// The tool round currently being executed, if any.
    active_tool_round: Option<ActiveToolRound>,
    /// Turn to resume after an `AfterToolResult` interrupt.
    pending_round_resume: Option<agentkit_core::TurnId>,
    /// Counter for the next turn; starts at 1.
    next_turn_index: u64,
    /// Call ids of tools that detached into the background during a round.
    detached_call_ids: HashSet<ToolCallId>,
}
1311
1312impl<S> LoopDriver<S>
1313where
1314 S: ModelSession,
1315{
    /// Builds the `agent.execute_tool` tracing span for dispatching one tool request.
    ///
    /// The span records the tool name, the call id, this session's id, `turn_id`, and
    /// `launch_kind`. The span is returned un-entered; the caller attaches it to the dispatch
    /// future.
    fn execute_tool_span(
        &self,
        request: &ToolRequest,
        turn_id: &agentkit_core::TurnId,
        launch_kind: &'static str,
    ) -> tracing::Span {
        tracing::info_span!(
            "agent.execute_tool",
            "gen_ai.tool.name" = %request.tool_name,
            "gen_ai.tool.call.id" = %request.call_id,
            session.id = %self.session_id,
            turn.id = %turn_id,
            launch_kind = launch_kind,
        )
    }
1331
1332 fn start_task_via_manager(
1333 &self,
1334 task_id: Option<TaskId>,
1335 tool_request: ToolRequest,
1336 kind: TaskLaunchKind,
1337 cancellation: Option<TurnCancellation>,
1338 ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1339 {
1340 let task_manager = self.task_manager.clone();
1341 let tool_executor = self.tool_executor.clone();
1342 let permissions = self.permissions.clone();
1343 let resources = self.resources.clone();
1344 let session_id = self.session_id.clone();
1345 let turn_id = tool_request.turn_id.clone();
1346 let metadata = tool_request.metadata.clone();
1347
1348 async move {
1349 task_manager
1350 .start_task(
1351 TaskLaunchRequest {
1352 task_id,
1353 request: tool_request.clone(),
1354 kind,
1355 },
1356 TaskStartContext {
1357 executor: tool_executor,
1358 tool_context: OwnedToolContext {
1359 session_id,
1360 turn_id,
1361 metadata,
1362 permissions,
1363 resources,
1364 cancellation,
1365 },
1366 },
1367 )
1368 .await
1369 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1370 }
1371 }
1372
    /// Returns `true` while at least one approval is still tracked in `pending_approvals`,
    /// whether or not it already has a decision.
    fn has_pending_interrupts(&self) -> bool {
        !self.pending_approvals.is_empty()
    }
1376
1377 fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1378 for event in events {
1379 self.emit(AgentEvent::ToolCatalogChanged(event));
1380 }
1381 }
1382
1383 fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1384 let call_id = task.tool_request.call_id.clone();
1385 let call = ToolCallPart {
1386 id: call_id.clone(),
1387 name: task.tool_request.tool_name.to_string(),
1388 input: task.tool_request.input.clone(),
1389 metadata: task.tool_request.metadata.clone(),
1390 };
1391 let mut request = task.approval;
1392 request.call_id = Some(call_id.clone());
1393 let pending = PendingApprovalToolCall {
1394 request: request.clone(),
1395 decision: None,
1396 surfaced: false,
1397 turn_id: turn_id.clone(),
1398 task_id: task.task_id,
1399 call,
1400 tool_request: task.tool_request,
1401 };
1402 self.pending_approvals.insert(call_id.clone(), pending);
1403 if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1404 self.pending_approval_order.push_back(call_id);
1405 }
1406 self.emit(AgentEvent::ApprovalRequired(request));
1407 }
1408
1409 fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1410 for call_id in self.pending_approval_order.clone() {
1411 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1412 continue;
1413 };
1414 if pending.decision.is_none() && !pending.surfaced {
1415 pending.surfaced = true;
1416 return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1417 PendingApproval {
1418 request: pending.request.clone(),
1419 },
1420 )));
1421 }
1422 }
1423 None
1424 }
1425
1426 fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1427 self.pending_approval_order.iter().find_map(|call_id| {
1428 self.pending_approvals.get(call_id).and_then(|pending| {
1429 pending.decision.is_none().then(|| {
1430 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1431 request: pending.request.clone(),
1432 }))
1433 })
1434 })
1435 })
1436 }
1437
1438 fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1439 let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1440 self.pending_approvals
1441 .get(call_id)
1442 .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1443 })?;
1444 self.pending_approval_order.retain(|id| id != &call_id);
1445 self.pending_approvals.remove(&call_id)
1446 }
1447
1448 fn queue_resolution_interrupt(
1449 &mut self,
1450 turn_id: &agentkit_core::TurnId,
1451 resolution: TaskResolution,
1452 ) -> Option<LoopStep> {
1453 match resolution {
1454 TaskResolution::Item(item) => {
1455 self.append_tool_result_item(item);
1456 None
1457 }
1458 TaskResolution::Approval(task) => {
1459 self.enqueue_pending_approval(turn_id, task);
1460 self.take_next_unsurfaced_approval_interrupt()
1461 }
1462 }
1463 }
1464
1465 async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1466 let PendingLoopUpdates { mut resolutions } = self
1467 .task_manager
1468 .take_pending_loop_updates()
1469 .await
1470 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1471 let mut saw_items = false;
1472 while let Some(resolution) = resolutions.pop_front() {
1473 match resolution {
1474 TaskResolution::Item(item) => {
1475 self.append_tool_result_item(item);
1476 saw_items = true;
1477 }
1478 TaskResolution::Approval(task) => {
1479 self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1480 }
1481 }
1482 }
1483 Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1484 }
1485
1486 async fn maybe_compact(
1487 &mut self,
1488 turn_id: Option<&agentkit_core::TurnId>,
1489 cancellation: Option<TurnCancellation>,
1490 ) -> Result<(), LoopError> {
1491 let Some(compaction) = self.compaction.as_ref().cloned() else {
1492 return Ok(());
1493 };
1494 if cancellation
1495 .as_ref()
1496 .is_some_and(TurnCancellation::is_cancelled)
1497 {
1498 return Err(LoopError::Cancelled);
1499 }
1500 let Some(reason) =
1501 compaction
1502 .trigger
1503 .should_compact(&self.session_id, turn_id, &self.transcript)
1504 else {
1505 return Ok(());
1506 };
1507
1508 self.emit(AgentEvent::CompactionStarted {
1509 session_id: self.session_id.clone(),
1510 turn_id: turn_id.cloned(),
1511 reason: reason.clone(),
1512 });
1513
1514 let CompactionResult {
1515 transcript,
1516 replaced_items,
1517 metadata,
1518 } = compaction
1519 .strategy
1520 .apply(
1521 agentkit_compaction::CompactionRequest {
1522 session_id: self.session_id.clone(),
1523 turn_id: turn_id.cloned(),
1524 transcript: self.transcript.clone(),
1525 reason,
1526 metadata: compaction.metadata.clone(),
1527 },
1528 &mut CompactionContext {
1529 backend: compaction.backend.as_deref(),
1530 metadata: &compaction.metadata,
1531 cancellation,
1532 },
1533 )
1534 .await
1535 .map_err(|error| match error {
1536 agentkit_compaction::CompactionError::Cancelled => LoopError::Cancelled,
1537 other => LoopError::Compaction(other.to_string()),
1538 })?;
1539
1540 self.transcript = transcript;
1541 self.emit(AgentEvent::CompactionFinished {
1542 session_id: self.session_id.clone(),
1543 turn_id: turn_id.cloned(),
1544 replaced_items,
1545 transcript_len: self.transcript.len(),
1546 metadata,
1547 });
1548 Ok(())
1549 }
1550
    /// Advances the active tool round until it needs the host or completes.
    ///
    /// Each loop iteration takes a fresh cancellation checkpoint, then either starts the next
    /// queued tool call through the task manager or — once the queue is empty — waits on the
    /// task manager for the turn's next update.
    ///
    /// Returns:
    /// * `Ok(None)` – there is no active round, or the round ended with background work still
    ///   pending and no foreground progress.
    /// * `Ok(Some(LoopStep::Interrupt(..)))` – an approval needs a decision, or the round
    ///   completed (`AfterToolResult`).
    /// * `Ok(Some(step))` from `finish_cancelled` – cancellation was observed.
    ///
    /// # Errors
    /// Task-manager failures are returned as `LoopError::Tool(ToolError::Internal(..))`;
    /// `LoopError::InvalidState` is returned if the active round disappears mid-loop.
    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
        // Nothing to do unless a tool round is in progress.
        let Some(_) = self.active_tool_round.as_ref() else {
            return Ok(None);
        };
        loop {
            let cancellation = self
                .cancellation
                .as_ref()
                .map(CancellationHandle::checkpoint);
            let turn_id = self
                .active_tool_round
                .as_ref()
                .map(|active| active.turn_id.clone())
                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

            // Cancelled before doing more work: tell the task manager, drop the round, and
            // finish the turn as cancelled.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                self.active_tool_round = None;
                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
            }

            // Phase 1: start queued calls, one per iteration.
            let next_call = self
                .active_tool_round
                .as_mut()
                .and_then(|active| active.pending_calls.pop_front());
            if let Some((_call, tool_request)) = next_call {
                use tracing::Instrument;
                let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
                match self
                    .start_task_via_manager(
                        None,
                        tool_request.clone(),
                        TaskLaunchKind::Plain,
                        cancellation.clone(),
                    )
                    .instrument(dispatch_span)
                    .await?
                {
                    // The task resolved immediately.
                    TaskStartOutcome::Ready(resolution) => {
                        let resolution = *resolution;
                        match resolution {
                            TaskResolution::Item(item) => {
                                if let Some(active) = self.active_tool_round.as_mut() {
                                    active.foreground_progressed = true;
                                }
                                self.append_tool_result_item(item);
                            }
                            // Approvals are only queued here; they are surfaced once the
                            // round has nothing left to wait for.
                            TaskResolution::Approval(task) => {
                                self.enqueue_pending_approval(&turn_id, task);
                            }
                        }
                        continue;
                    }
                    // The task is still running; remember if it runs in the background.
                    TaskStartOutcome::Pending { kind, .. } => {
                        if kind == agentkit_task_manager::TaskKind::Background
                            && let Some(active) = self.active_tool_round.as_mut()
                        {
                            active.background_pending = true;
                        }
                        continue;
                    }
                }
            }

            // Phase 2: the queue is empty — wait for the turn's next task update.
            match self
                .task_manager
                .wait_for_turn(&turn_id, cancellation.clone())
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
            {
                Some(TurnTaskUpdate::Resolution(resolution)) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.append_tool_result_item(item);
                        }
                        TaskResolution::Approval(task) => {
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                    }
                }
                Some(TurnTaskUpdate::Detached(snapshot)) => {
                    let detached_call_id = snapshot.call_id.clone();
                    // Append a placeholder tool result for the detached call so the
                    // transcript records that the tool moved to the background.
                    self.append_tool_result_item(Item {
                        id: None,
                        kind: ItemKind::Tool,
                        parts: vec![Part::ToolResult(ToolResultPart {
                            call_id: detached_call_id.clone(),
                            output: ToolOutput::Text(format!(
                                "Tool {} is now running in the background. \
                                 The result will be delivered when it completes.",
                                snapshot.tool_name,
                            )),
                            is_error: false,
                            metadata: MetadataMap::new(),
                        })],
                        metadata: MetadataMap::new(),
                    });
                    self.detached_call_ids.insert(detached_call_id);
                    // The placeholder counts as foreground progress for this round.
                    if let Some(active) = self.active_tool_round.as_mut() {
                        active.background_pending = true;
                        active.foreground_progressed = true;
                    }
                }
                // No further update was returned for this turn.
                None => {
                    // The wait may have ended because of cancellation; handle that first.
                    if cancellation
                        .as_ref()
                        .is_some_and(TurnCancellation::is_cancelled)
                    {
                        self.task_manager
                            .on_turn_interrupted(&turn_id)
                            .await
                            .map_err(|error| {
                                LoopError::Tool(ToolError::Internal(error.to_string()))
                            })?;
                        self.active_tool_round = None;
                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                    }
                    // The round is over from here on; every path below returns.
                    let active = self.active_tool_round.take().ok_or_else(|| {
                        LoopError::InvalidState("missing active tool round".into())
                    })?;
                    // Prefer an approval the host has not seen yet, then any undecided one.
                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    if let Some(step) = self.next_unresolved_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    // Only background work was started and nothing was appended: there is
                    // no tool result to report.
                    if active.background_pending && !active.foreground_progressed {
                        return Ok(None);
                    }
                    let info = ToolRoundInfo {
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        transcript_len: self.transcript.len(),
                    };
                    // Remember which turn to resume after the host handles the interrupt.
                    self.pending_round_resume = Some(turn_id);
                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
                        info,
                    ))));
                }
            }
        }
    }
1726
1727 #[tracing::instrument(
1728 name = "agent.turn",
1729 skip_all,
1730 fields(
1731 session.id = %self.session_id,
1732 turn.id = %turn_id,
1733 transcript.len = self.transcript.len(),
1734 saw_tool_call = tracing::field::Empty,
1735 finish_reason = tracing::field::Empty,
1736 ),
1737 )]
1738 async fn drive_turn(
1739 &mut self,
1740 turn_id: agentkit_core::TurnId,
1741 emit_started: bool,
1742 ) -> Result<LoopStep, LoopError> {
1743 let cancellation = self
1744 .cancellation
1745 .as_ref()
1746 .map(CancellationHandle::checkpoint);
1747 match self
1748 .maybe_compact(Some(&turn_id), cancellation.clone())
1749 .await
1750 {
1751 Ok(()) => {}
1752 Err(LoopError::Cancelled) => {
1753 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1754 }
1755 Err(error) => return Err(error),
1756 }
1757 if emit_started {
1758 self.emit(AgentEvent::TurnStarted {
1759 session_id: self.session_id.clone(),
1760 turn_id: turn_id.clone(),
1761 });
1762 }
1763 if cancellation
1764 .as_ref()
1765 .is_some_and(TurnCancellation::is_cancelled)
1766 {
1767 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1768 }
1769
1770 let catalog_events = self.tool_executor.drain_catalog_events();
1771 self.emit_tool_catalog_events(catalog_events);
1772
1773 let request = TurnRequest {
1774 session_id: self.session_id.clone(),
1775 turn_id: turn_id.clone(),
1776 transcript: self.transcript.clone(),
1777 available_tools: self.tool_executor.specs(),
1778 cache: self
1779 .next_turn_cache
1780 .take()
1781 .or_else(|| self.default_cache.clone()),
1782 metadata: MetadataMap::new(),
1783 };
1784
1785 let session = self
1786 .session
1787 .as_mut()
1788 .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
1789 let mut turn = match session.begin_turn(request, cancellation.clone()).await {
1790 Ok(turn) => turn,
1791 Err(LoopError::Cancelled) => {
1792 self.task_manager
1793 .on_turn_interrupted(&turn_id)
1794 .await
1795 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1796 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1797 }
1798 Err(error) => return Err(error),
1799 };
1800 let mut saw_tool_call = false;
1801 let mut finished_result = None;
1802
1803 while let Some(event) = match turn.next_event(cancellation.clone()).await {
1804 Ok(event) => event,
1805 Err(LoopError::Cancelled) => {
1806 self.task_manager
1807 .on_turn_interrupted(&turn_id)
1808 .await
1809 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1810 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1811 }
1812 Err(error) => return Err(error),
1813 } {
1814 if cancellation
1815 .as_ref()
1816 .is_some_and(TurnCancellation::is_cancelled)
1817 {
1818 self.task_manager
1819 .on_turn_interrupted(&turn_id)
1820 .await
1821 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1822 return self.finish_cancelled(turn_id, interrupted_assistant_items());
1823 }
1824 match event {
1825 ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
1826 ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
1827 ModelTurnEvent::ToolCall(call) => {
1828 saw_tool_call = true;
1829 self.emit(AgentEvent::ToolCallRequested(call.clone()));
1830 }
1831 ModelTurnEvent::Finished(result) => {
1832 finished_result = Some(result);
1833 break;
1834 }
1835 }
1836 }
1837
1838 let result = finished_result.ok_or_else(|| {
1839 LoopError::Provider("model turn ended without a Finished event".into())
1840 })?;
1841 tracing::Span::current().record("saw_tool_call", saw_tool_call);
1842 tracing::Span::current().record(
1843 "finish_reason",
1844 tracing::field::debug(&result.finish_reason),
1845 );
1846 self.extend_transcript(result.output_items.clone());
1847
1848 if saw_tool_call {
1849 let pending_calls = extract_tool_calls(&result.output_items)
1850 .into_iter()
1851 .map(|call| {
1852 let tool_request = ToolRequest {
1853 call_id: call.id.clone(),
1854 tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
1855 input: call.input.clone(),
1856 session_id: self.session_id.clone(),
1857 turn_id: turn_id.clone(),
1858 metadata: call.metadata.clone(),
1859 };
1860 (call, tool_request)
1861 })
1862 .collect();
1863 self.active_tool_round = Some(ActiveToolRound {
1864 turn_id: turn_id.clone(),
1865 pending_calls,
1866 background_pending: false,
1867 foreground_progressed: false,
1868 });
1869 if let Some(step) = self.continue_active_tool_round().await? {
1870 return Ok(step);
1871 }
1872 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
1873 InputRequest {
1874 session_id: self.session_id.clone(),
1875 reason: "driver is waiting for input".into(),
1876 },
1877 )));
1878 }
1879
1880 let turn_result = TurnResult {
1881 turn_id,
1882 finish_reason: result.finish_reason,
1883 items: result.output_items,
1884 usage: result.usage,
1885 metadata: result.metadata,
1886 };
1887 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1888 Ok(LoopStep::Finished(turn_result))
1889 }
1890
1891 async fn resume_after_approval(
1892 &mut self,
1893 pending: PendingApprovalToolCall,
1894 ) -> Result<LoopStep, LoopError> {
1895 let decision = pending
1896 .decision
1897 .clone()
1898 .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
1899
1900 match decision {
1901 ApprovalDecision::Approve => {
1902 use tracing::Instrument;
1903 let dispatch_span =
1904 self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
1905 match self
1906 .start_task_via_manager(
1907 Some(pending.task_id.clone()),
1908 pending.tool_request.clone(),
1909 TaskLaunchKind::Approved(pending.request.clone()),
1910 self.cancellation
1911 .as_ref()
1912 .map(CancellationHandle::checkpoint),
1913 )
1914 .instrument(dispatch_span)
1915 .await?
1916 {
1917 TaskStartOutcome::Ready(resolution) => {
1918 let resolution = *resolution;
1919 if let Some(step) =
1920 self.queue_resolution_interrupt(&pending.turn_id, resolution)
1921 {
1922 return Ok(step);
1923 }
1924 }
1925 TaskStartOutcome::Pending { .. } => {}
1926 }
1927 }
1928 ApprovalDecision::Deny { reason } => {
1929 self.append_tool_result_item(Item {
1930 id: None,
1931 kind: ItemKind::Tool,
1932 parts: vec![Part::ToolResult(ToolResultPart {
1933 call_id: pending.call.id.clone(),
1934 output: ToolOutput::Text(
1935 reason.unwrap_or_else(|| "approval denied".into()),
1936 ),
1937 is_error: true,
1938 metadata: pending.call.metadata.clone(),
1939 })],
1940 metadata: MetadataMap::new(),
1941 });
1942 }
1943 }
1944
1945 if let Some(step) = self.continue_active_tool_round().await? {
1946 Ok(step)
1947 } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1948 Ok(step)
1949 } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1950 Ok(step)
1951 } else {
1952 self.drive_turn(pending.turn_id, false).await
1953 }
1954 }
1955
1956 fn finish_cancelled(
1957 &mut self,
1958 turn_id: agentkit_core::TurnId,
1959 items: Vec<Item>,
1960 ) -> Result<LoopStep, LoopError> {
1961 self.extend_transcript(items.clone());
1962 let turn_result = TurnResult {
1963 turn_id,
1964 finish_reason: FinishReason::Cancelled,
1965 items,
1966 usage: None,
1967 metadata: interrupted_metadata("turn"),
1968 };
1969 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1970 Ok(LoopStep::Finished(turn_result))
1971 }
1972
1973 pub(crate) fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
1983 if self.has_pending_interrupts() {
1984 return Err(LoopError::InvalidState(
1985 "cannot submit input while an interrupt is pending".into(),
1986 ));
1987 }
1988 self.emit(AgentEvent::InputAccepted {
1989 session_id: self.session_id.clone(),
1990 items: input.clone(),
1991 });
1992 self.pending_input.extend(input);
1993 Ok(())
1994 }
1995
1996 pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2001 if self.has_pending_interrupts() {
2002 return Err(LoopError::InvalidState(
2003 "cannot update next-turn cache while an interrupt is pending".into(),
2004 ));
2005 }
2006 self.next_turn_cache = Some(cache);
2007 Ok(())
2008 }
2009
/// Test helper: sets the next-turn cache, then submits `input`.
///
/// The input is not submitted when setting the cache fails.
#[cfg(test)]
pub(crate) fn submit_input_with_cache(
    &mut self,
    input: Vec<Item>,
    cache: PromptCacheRequest,
) -> Result<(), LoopError> {
    match self.set_next_turn_cache(cache) {
        Ok(()) => self.submit_input(input),
        Err(error) => Err(error),
    }
}
2019
2020 pub fn resolve_approval_for(
2030 &mut self,
2031 call_id: ToolCallId,
2032 decision: ApprovalDecision,
2033 ) -> Result<(), LoopError> {
2034 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2035 return Err(LoopError::InvalidState(format!(
2036 "no approval request is pending for call {}",
2037 call_id.0
2038 )));
2039 };
2040 pending.decision = Some(decision.clone());
2041 self.emit(AgentEvent::ApprovalResolved {
2042 approved: matches!(decision, ApprovalDecision::Approve),
2043 });
2044 Ok(())
2045 }
2046
2047 pub fn resolve_approval_for_with_patched_input(
2060 &mut self,
2061 call_id: ToolCallId,
2062 input: serde_json::Value,
2063 ) -> Result<(), LoopError> {
2064 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2065 return Err(LoopError::InvalidState(format!(
2066 "no approval request is pending for call {}",
2067 call_id.0
2068 )));
2069 };
2070 pending.tool_request.input = input;
2071 self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2072 }
2073
2074 pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2077 let mut unresolved = self
2078 .pending_approval_order
2079 .iter()
2080 .filter(|call_id| {
2081 self.pending_approvals
2082 .get(*call_id)
2083 .is_some_and(|pending| pending.decision.is_none())
2084 })
2085 .cloned();
2086 let Some(call_id) = unresolved.next() else {
2087 return Err(LoopError::InvalidState(
2088 "no approval request is pending".into(),
2089 ));
2090 };
2091 if unresolved.next().is_some() {
2092 return Err(LoopError::InvalidState(
2093 "multiple approvals are pending; use resolve_approval_for".into(),
2094 ));
2095 }
2096 self.resolve_approval_for(call_id, decision)
2097 }
2098
2099 pub fn snapshot(&self) -> LoopSnapshot {
2101 LoopSnapshot {
2102 session_id: self.session_id.clone(),
2103 transcript: self.transcript.clone(),
2104 pending_input: self.pending_input.clone(),
2105 }
2106 }
2107
2108 pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
2129 if let Some(pending) = self.take_next_resolved_approval() {
2130 return self.resume_after_approval(pending).await;
2131 }
2132
2133 if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2134 return Ok(step);
2135 }
2136
2137 if let Some(step) = self.next_unresolved_approval_interrupt() {
2138 return Ok(step);
2139 }
2140
2141 if let Some(step) = self.continue_active_tool_round().await? {
2142 return Ok(step);
2143 }
2144
2145 let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2146 if let Some(step) = loop_step {
2147 return Ok(step);
2148 }
2149
2150 if let Some(turn_id) = self.pending_round_resume.take() {
2155 let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2156 self.extend_transcript(drained);
2157 return self.drive_turn(turn_id, false).await;
2158 }
2159
2160 if self.pending_input.is_empty() && !had_loop_updates {
2161 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2162 InputRequest {
2163 session_id: self.session_id.clone(),
2164 reason: "driver is waiting for input".into(),
2165 },
2166 )));
2167 }
2168
2169 let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2170 self.next_turn_index += 1;
2171 let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2172 self.extend_transcript(drained);
2173 self.drive_turn(turn_id, true).await
2174 }
2175
2176 fn emit(&mut self, event: AgentEvent) {
2177 for observer in &mut self.observers {
2178 observer.handle_event(event.clone());
2179 }
2180 }
2181
2182 fn append_item(&mut self, item: Item) {
2187 for observer in &mut self.transcript_observers {
2188 observer.on_item_appended(&item);
2189 }
2190 self.transcript.push(item);
2191 }
2192
2193 fn append_tool_result_item(&mut self, item: Item) {
2206 for part in &item.parts {
2207 if let Part::ToolResult(result) = part {
2208 self.emit(AgentEvent::ToolResultReceived(result.clone()));
2209 }
2210 }
2211 let item = self.maybe_convert_detached(item);
2212 self.append_item(item);
2213 }
2214
2215 fn maybe_convert_detached(&mut self, item: Item) -> Item {
2216 if !matches!(item.kind, ItemKind::Tool) {
2217 return item;
2218 }
2219 let results: Vec<&ToolResultPart> = item
2220 .parts
2221 .iter()
2222 .filter_map(|p| match p {
2223 Part::ToolResult(r) => Some(r),
2224 _ => None,
2225 })
2226 .collect();
2227 if results.is_empty()
2228 || !results
2229 .iter()
2230 .all(|r| self.detached_call_ids.contains(&r.call_id))
2231 {
2232 return item;
2233 }
2234 let mut text = String::new();
2235 for result in &results {
2236 self.detached_call_ids.remove(&result.call_id);
2237 if !text.is_empty() {
2238 text.push_str("\n\n");
2239 }
2240 let label = if result.is_error {
2241 "failed"
2242 } else {
2243 "completed"
2244 };
2245 let body = render_tool_output_brief(&result.output);
2246 text.push_str(&format!(
2247 "Background tool call {} {}: {body}",
2248 result.call_id.0, label
2249 ));
2250 }
2251 Item::notification(text)
2252 }
2253
2254 fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2256 for item in items {
2257 self.append_item(item);
2258 }
2259 }
2260}
2261
2262fn render_tool_output_brief(output: &ToolOutput) -> String {
2263 match output {
2264 ToolOutput::Text(t) => t.clone(),
2265 ToolOutput::Structured(value) => value.to_string(),
2266 ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2267 ToolOutput::Files(files) => format!("[{} files]", files.len()),
2268 }
2269}
2270
2271fn interrupted_metadata(stage: &str) -> MetadataMap {
2272 let mut metadata = MetadataMap::new();
2273 metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2274 metadata.insert(
2275 INTERRUPT_REASON_METADATA_KEY.into(),
2276 USER_CANCELLED_REASON.into(),
2277 );
2278 metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2279 metadata
2280}
2281
2282fn interrupted_assistant_items() -> Vec<Item> {
2283 vec![Item {
2284 id: None,
2285 kind: ItemKind::Assistant,
2286 parts: vec![Part::Text(TextPart {
2287 text: "Previous assistant response was interrupted by the user before completion."
2288 .into(),
2289 metadata: interrupted_metadata("assistant"),
2290 })],
2291 metadata: interrupted_metadata("assistant"),
2292 }]
2293}
2294
2295fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2296 let mut calls = Vec::new();
2297 for item in items {
2298 for part in &item.parts {
2299 if let Part::ToolCall(call) = part {
2300 calls.push(call.clone());
2301 }
2302 }
2303 }
2304 calls
2305}
2306
/// Errors reported by the loop driver.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The requested operation is not valid in the driver's current state,
    /// e.g. submitting input while an interrupt is pending.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The turn was cancelled; the driver turns this into a cancelled turn
    /// result where it handles it.
    #[error("turn cancelled")]
    Cancelled,
    /// The model side misbehaved, e.g. the turn's event stream ended without
    /// a `Finished` event.
    #[error("provider error: {0}")]
    Provider(String),
    /// A tool-layer failure; converts from `ToolError` via `From`.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// A compaction failure, described by its message.
    #[error("compaction error: {0}")]
    Compaction(String),
    /// NOTE(review): presumably raised for operations a component does not
    /// implement — no construction site is visible here, confirm.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2329
2330#[cfg(test)]
2331mod tests {
2332 use std::collections::VecDeque;
2333 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2334 use std::sync::{Arc as StdArc, Mutex as StdMutex};
2335
2336 use agentkit_compaction::{CompactionPipeline, CompactionTrigger, KeepRecentStrategy};
2337 use agentkit_core::{
2338 CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolOutput, ToolResultPart,
2339 };
2340 use agentkit_task_manager::{
2341 AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2342 TaskRoutingPolicy,
2343 };
2344 use agentkit_tools_core::{
2345 FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2346 ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2347 ToolResult, ToolSpec,
2348 };
2349 use serde_json::{Value, json};
2350 use tokio::sync::Notify;
2351 use tokio::time::{Duration, timeout};
2352
2353 use super::*;
2354
/// Adapter whose sessions are `FakeSession`s.
struct FakeAdapter;
/// Adapter whose sessions are `SlowSession`s.
struct SlowAdapter;
/// Adapter whose sessions share these two recording buffers.
struct RecordingAdapter {
    /// One entry per turn request: the tool descriptions it advertised.
    seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
    /// One entry per turn request: the prompt-cache request it carried.
    seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
}
/// Adapter whose sessions are `MultiToolSession`s.
struct MultiToolAdapter;
/// Adapter whose sessions are `DualApprovalSession`s.
struct DualApprovalAdapter;
2363
/// Session that requests one tool call, then reports what the tool said.
struct FakeSession;
/// Session that can block in `begin_turn` until the turn is cancelled.
struct SlowSession;
/// Session that records what each turn request contained.
struct RecordingSession {
    /// Tool descriptions seen per turn request.
    seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
    /// Prompt-cache requests seen per turn request.
    seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
}
/// Session that requests a foreground and a background tool call together.
struct MultiToolSession;
/// Session that requests two `echo` calls in one turn.
struct DualApprovalSession;
2372
/// Turn that replays a fixed queue of events.
struct FakeTurn {
    events: VecDeque<ModelTurnEvent>,
}

/// Turn that yields one `Finished` event unless it is already cancelled.
struct SlowTurn {
    /// Set once the `Finished` event has been handed out.
    emitted: bool,
}

/// Turn that yields a single `Finished` event.
struct RecordingTurn {
    /// Set once the `Finished` event has been handed out.
    emitted: bool,
}
/// Turn that replays a fixed queue of events.
struct MultiToolTurn {
    events: VecDeque<ModelTurnEvent>,
}
/// Turn that replays a fixed queue of events.
struct DualApprovalTurn {
    events: VecDeque<ModelTurnEvent>,
}
2390
2391 #[async_trait]
2392 impl ModelAdapter for FakeAdapter {
2393 type Session = FakeSession;
2394
2395 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2396 Ok(FakeSession)
2397 }
2398 }
2399
2400 #[async_trait]
2401 impl ModelAdapter for SlowAdapter {
2402 type Session = SlowSession;
2403
2404 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2405 Ok(SlowSession)
2406 }
2407 }
2408
2409 #[async_trait]
2410 impl ModelAdapter for RecordingAdapter {
2411 type Session = RecordingSession;
2412
2413 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2414 Ok(RecordingSession {
2415 seen_descriptions: self.seen_descriptions.clone(),
2416 seen_caches: self.seen_caches.clone(),
2417 })
2418 }
2419 }
2420
2421 #[async_trait]
2422 impl ModelAdapter for MultiToolAdapter {
2423 type Session = MultiToolSession;
2424
2425 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2426 Ok(MultiToolSession)
2427 }
2428 }
2429
2430 #[async_trait]
2431 impl ModelAdapter for DualApprovalAdapter {
2432 type Session = DualApprovalSession;
2433
2434 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2435 Ok(DualApprovalSession)
2436 }
2437 }
2438
2439 #[async_trait]
2440 impl ModelSession for FakeSession {
2441 type Turn = FakeTurn;
2442
2443 async fn begin_turn(
2444 &mut self,
2445 request: TurnRequest,
2446 _cancellation: Option<TurnCancellation>,
2447 ) -> Result<Self::Turn, LoopError> {
2448 let has_tool_result = request.transcript.iter().any(|item| {
2449 item.kind == ItemKind::Tool
2450 && item
2451 .parts
2452 .iter()
2453 .any(|part| matches!(part, Part::ToolResult(_)))
2454 });
2455 let tool_name = request
2456 .available_tools
2457 .first()
2458 .map(|tool| tool.name.0.clone())
2459 .unwrap_or_else(|| "echo".into());
2460
2461 let events = if has_tool_result {
2462 let result_text = request
2463 .transcript
2464 .iter()
2465 .rev()
2466 .find_map(|item| {
2467 item.parts.iter().find_map(|part| match part {
2468 Part::ToolResult(ToolResultPart {
2469 output: ToolOutput::Text(text),
2470 ..
2471 }) => Some(text.clone()),
2472 _ => None,
2473 })
2474 })
2475 .unwrap_or_else(|| "missing".into());
2476
2477 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2478 finish_reason: FinishReason::Completed,
2479 output_items: vec![Item {
2480 id: None,
2481 kind: ItemKind::Assistant,
2482 parts: vec![Part::Text(TextPart {
2483 text: format!("tool said: {result_text}"),
2484 metadata: MetadataMap::new(),
2485 })],
2486 metadata: MetadataMap::new(),
2487 }],
2488 usage: None,
2489 metadata: MetadataMap::new(),
2490 })])
2491 } else {
2492 VecDeque::from([
2493 ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2494 id: ToolCallId::new("call-1"),
2495 name: tool_name.clone(),
2496 input: json!({ "value": "pong" }),
2497 metadata: MetadataMap::new(),
2498 }),
2499 ModelTurnEvent::Finished(ModelTurnResult {
2500 finish_reason: FinishReason::ToolCall,
2501 output_items: vec![Item {
2502 id: None,
2503 kind: ItemKind::Assistant,
2504 parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2505 id: ToolCallId::new("call-1"),
2506 name: tool_name,
2507 input: json!({ "value": "pong" }),
2508 metadata: MetadataMap::new(),
2509 })],
2510 metadata: MetadataMap::new(),
2511 }],
2512 usage: None,
2513 metadata: MetadataMap::new(),
2514 }),
2515 ])
2516 };
2517
2518 Ok(FakeTurn { events })
2519 }
2520 }
2521
2522 #[async_trait]
2523 impl ModelSession for SlowSession {
2524 type Turn = SlowTurn;
2525
2526 async fn begin_turn(
2527 &mut self,
2528 request: TurnRequest,
2529 cancellation: Option<TurnCancellation>,
2530 ) -> Result<Self::Turn, LoopError> {
2531 let should_block = request
2532 .transcript
2533 .iter()
2534 .rev()
2535 .find(|item| item.kind == ItemKind::User)
2536 .is_some_and(|item| {
2537 item.parts.iter().any(|part| match part {
2538 Part::Text(text) => text.text == "do the long task",
2539 _ => false,
2540 })
2541 });
2542
2543 if should_block && let Some(cancellation) = cancellation {
2544 cancellation.cancelled().await;
2545 return Err(LoopError::Cancelled);
2546 }
2547
2548 Ok(SlowTurn { emitted: false })
2549 }
2550 }
2551
2552 #[async_trait]
2553 impl ModelSession for RecordingSession {
2554 type Turn = RecordingTurn;
2555
2556 async fn begin_turn(
2557 &mut self,
2558 request: TurnRequest,
2559 _cancellation: Option<TurnCancellation>,
2560 ) -> Result<Self::Turn, LoopError> {
2561 let descriptions = request
2562 .available_tools
2563 .iter()
2564 .map(|tool| tool.description.clone())
2565 .collect::<Vec<_>>();
2566 self.seen_descriptions.lock().unwrap().push(descriptions);
2567 self.seen_caches.lock().unwrap().push(request.cache.clone());
2568
2569 Ok(RecordingTurn { emitted: false })
2570 }
2571 }
2572
2573 #[async_trait]
2574 impl ModelSession for MultiToolSession {
2575 type Turn = MultiToolTurn;
2576
2577 async fn begin_turn(
2578 &mut self,
2579 request: TurnRequest,
2580 _cancellation: Option<TurnCancellation>,
2581 ) -> Result<Self::Turn, LoopError> {
2582 let has_tool_result = request.transcript.iter().any(|item| {
2583 item.kind == ItemKind::Tool
2584 && item
2585 .parts
2586 .iter()
2587 .any(|part| matches!(part, Part::ToolResult(_)))
2588 });
2589
2590 let events = if has_tool_result {
2591 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2592 finish_reason: FinishReason::Completed,
2593 output_items: vec![Item {
2594 id: None,
2595 kind: ItemKind::Assistant,
2596 parts: vec![Part::Text(TextPart {
2597 text: "mixed tools finished".into(),
2598 metadata: MetadataMap::new(),
2599 })],
2600 metadata: MetadataMap::new(),
2601 }],
2602 usage: None,
2603 metadata: MetadataMap::new(),
2604 })])
2605 } else {
2606 let foreground = agentkit_core::ToolCallPart {
2607 id: ToolCallId::new("call-foreground"),
2608 name: "foreground-wait".into(),
2609 input: json!({}),
2610 metadata: MetadataMap::new(),
2611 };
2612 let background = agentkit_core::ToolCallPart {
2613 id: ToolCallId::new("call-background"),
2614 name: "background-wait".into(),
2615 input: json!({}),
2616 metadata: MetadataMap::new(),
2617 };
2618 VecDeque::from([
2619 ModelTurnEvent::ToolCall(foreground.clone()),
2620 ModelTurnEvent::ToolCall(background.clone()),
2621 ModelTurnEvent::Finished(ModelTurnResult {
2622 finish_reason: FinishReason::ToolCall,
2623 output_items: vec![Item {
2624 id: None,
2625 kind: ItemKind::Assistant,
2626 parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2627 metadata: MetadataMap::new(),
2628 }],
2629 usage: None,
2630 metadata: MetadataMap::new(),
2631 }),
2632 ])
2633 };
2634
2635 Ok(MultiToolTurn { events })
2636 }
2637 }
2638
2639 #[async_trait]
2640 impl ModelSession for DualApprovalSession {
2641 type Turn = DualApprovalTurn;
2642
2643 async fn begin_turn(
2644 &mut self,
2645 request: TurnRequest,
2646 _cancellation: Option<TurnCancellation>,
2647 ) -> Result<Self::Turn, LoopError> {
2648 let tool_results = request
2649 .transcript
2650 .iter()
2651 .flat_map(|item| item.parts.iter())
2652 .filter(|part| matches!(part, Part::ToolResult(_)))
2653 .count();
2654
2655 let events = if tool_results >= 2 {
2656 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2657 finish_reason: FinishReason::Completed,
2658 output_items: vec![Item {
2659 id: None,
2660 kind: ItemKind::Assistant,
2661 parts: vec![Part::Text(TextPart {
2662 text: "both approvals finished".into(),
2663 metadata: MetadataMap::new(),
2664 })],
2665 metadata: MetadataMap::new(),
2666 }],
2667 usage: None,
2668 metadata: MetadataMap::new(),
2669 })])
2670 } else {
2671 let first = agentkit_core::ToolCallPart {
2672 id: ToolCallId::new("call-1"),
2673 name: "echo".into(),
2674 input: json!({ "value": "first" }),
2675 metadata: MetadataMap::new(),
2676 };
2677 let second = agentkit_core::ToolCallPart {
2678 id: ToolCallId::new("call-2"),
2679 name: "echo".into(),
2680 input: json!({ "value": "second" }),
2681 metadata: MetadataMap::new(),
2682 };
2683 VecDeque::from([
2684 ModelTurnEvent::ToolCall(first.clone()),
2685 ModelTurnEvent::ToolCall(second.clone()),
2686 ModelTurnEvent::Finished(ModelTurnResult {
2687 finish_reason: FinishReason::ToolCall,
2688 output_items: vec![Item {
2689 id: None,
2690 kind: ItemKind::Assistant,
2691 parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2692 metadata: MetadataMap::new(),
2693 }],
2694 usage: None,
2695 metadata: MetadataMap::new(),
2696 }),
2697 ])
2698 };
2699
2700 Ok(DualApprovalTurn { events })
2701 }
2702 }
2703
2704 #[async_trait]
2705 impl ModelTurn for FakeTurn {
2706 async fn next_event(
2707 &mut self,
2708 _cancellation: Option<TurnCancellation>,
2709 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2710 Ok(self.events.pop_front())
2711 }
2712 }
2713
2714 #[async_trait]
2715 impl ModelTurn for SlowTurn {
2716 async fn next_event(
2717 &mut self,
2718 cancellation: Option<TurnCancellation>,
2719 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2720 if let Some(cancellation) = cancellation
2721 && cancellation.is_cancelled()
2722 {
2723 return Err(LoopError::Cancelled);
2724 }
2725
2726 if self.emitted {
2727 Ok(None)
2728 } else {
2729 self.emitted = true;
2730 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2731 finish_reason: FinishReason::Completed,
2732 output_items: vec![Item {
2733 id: None,
2734 kind: ItemKind::Assistant,
2735 parts: vec![Part::Text(TextPart {
2736 text: "done".into(),
2737 metadata: MetadataMap::new(),
2738 })],
2739 metadata: MetadataMap::new(),
2740 }],
2741 usage: None,
2742 metadata: MetadataMap::new(),
2743 })))
2744 }
2745 }
2746 }
2747
2748 #[async_trait]
2749 impl ModelTurn for RecordingTurn {
2750 async fn next_event(
2751 &mut self,
2752 _cancellation: Option<TurnCancellation>,
2753 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2754 if self.emitted {
2755 Ok(None)
2756 } else {
2757 self.emitted = true;
2758 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2759 finish_reason: FinishReason::Completed,
2760 output_items: vec![Item {
2761 id: None,
2762 kind: ItemKind::Assistant,
2763 parts: vec![Part::Text(TextPart {
2764 text: "done".into(),
2765 metadata: MetadataMap::new(),
2766 })],
2767 metadata: MetadataMap::new(),
2768 }],
2769 usage: None,
2770 metadata: MetadataMap::new(),
2771 })))
2772 }
2773 }
2774 }
2775
2776 #[async_trait]
2777 impl ModelTurn for MultiToolTurn {
2778 async fn next_event(
2779 &mut self,
2780 _cancellation: Option<TurnCancellation>,
2781 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2782 Ok(self.events.pop_front())
2783 }
2784 }
2785
2786 #[async_trait]
2787 impl ModelTurn for DualApprovalTurn {
2788 async fn next_event(
2789 &mut self,
2790 _cancellation: Option<TurnCancellation>,
2791 ) -> Result<Option<ModelTurnEvent>, LoopError> {
2792 Ok(self.events.pop_front())
2793 }
2794 }
2795
2796 #[derive(Clone)]
2797 struct EchoTool {
2798 spec: ToolSpec,
2799 }
2800
2801 impl Default for EchoTool {
2802 fn default() -> Self {
2803 Self {
2804 spec: ToolSpec {
2805 name: ToolName::new("echo"),
2806 description: "Echo back a value".into(),
2807 input_schema: json!({
2808 "type": "object",
2809 "properties": {
2810 "value": { "type": "string" }
2811 },
2812 "required": ["value"],
2813 "additionalProperties": false
2814 }),
2815 annotations: ToolAnnotations::default(),
2816 metadata: MetadataMap::new(),
2817 },
2818 }
2819 }
2820 }
2821
2822 #[derive(Clone)]
2823 struct DynamicSpecTool {
2824 spec: ToolSpec,
2825 version: StdArc<AtomicUsize>,
2826 }
2827
2828 impl DynamicSpecTool {
2829 fn new(version: StdArc<AtomicUsize>) -> Self {
2830 Self {
2831 spec: ToolSpec {
2832 name: ToolName::new("dynamic"),
2833 description: "dynamic version 0".into(),
2834 input_schema: json!({
2835 "type": "object",
2836 "properties": {},
2837 "additionalProperties": false
2838 }),
2839 annotations: ToolAnnotations::default(),
2840 metadata: MetadataMap::new(),
2841 },
2842 version,
2843 }
2844 }
2845 }
2846
2847 #[async_trait]
2848 impl Tool for EchoTool {
2849 fn spec(&self) -> &ToolSpec {
2850 &self.spec
2851 }
2852
2853 fn proposed_requests(
2854 &self,
2855 request: &agentkit_tools_core::ToolRequest,
2856 ) -> Result<
2857 Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
2858 agentkit_tools_core::ToolError,
2859 > {
2860 Ok(vec![Box::new(FileSystemPermissionRequest::Read {
2861 path: "/tmp/echo".into(),
2862 metadata: request.metadata.clone(),
2863 })])
2864 }
2865
2866 async fn invoke(
2867 &self,
2868 request: agentkit_tools_core::ToolRequest,
2869 _ctx: &mut ToolContext<'_>,
2870 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2871 let value = request
2872 .input
2873 .get("value")
2874 .and_then(Value::as_str)
2875 .ok_or_else(|| {
2876 agentkit_tools_core::ToolError::InvalidInput("missing value".into())
2877 })?;
2878
2879 Ok(ToolResult {
2880 result: ToolResultPart {
2881 call_id: request.call_id,
2882 output: ToolOutput::Text(value.into()),
2883 is_error: false,
2884 metadata: MetadataMap::new(),
2885 },
2886 duration: None,
2887 metadata: MetadataMap::new(),
2888 })
2889 }
2890 }
2891
2892 #[async_trait]
2893 impl Tool for DynamicSpecTool {
2894 fn spec(&self) -> &ToolSpec {
2895 &self.spec
2896 }
2897
2898 fn current_spec(&self) -> Option<ToolSpec> {
2899 let mut spec = self.spec.clone();
2900 spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
2901 Some(spec)
2902 }
2903
2904 async fn invoke(
2905 &self,
2906 request: agentkit_tools_core::ToolRequest,
2907 _ctx: &mut ToolContext<'_>,
2908 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2909 Ok(ToolResult {
2910 result: ToolResultPart {
2911 call_id: request.call_id,
2912 output: ToolOutput::Text("ok".into()),
2913 is_error: false,
2914 metadata: MetadataMap::new(),
2915 },
2916 duration: None,
2917 metadata: MetadataMap::new(),
2918 })
2919 }
2920 }
2921
2922 struct DenyFsReads;
2923
2924 impl PermissionChecker for DenyFsReads {
2925 fn evaluate(
2926 &self,
2927 request: &dyn agentkit_tools_core::PermissionRequest,
2928 ) -> PermissionDecision {
2929 if request.kind() == "filesystem.read" {
2930 return PermissionDecision::Deny(PermissionDenial {
2931 code: PermissionCode::PathNotAllowed,
2932 message: "reads denied in test".into(),
2933 metadata: MetadataMap::new(),
2934 });
2935 }
2936
2937 PermissionDecision::Allow
2938 }
2939 }
2940
2941 struct ApproveFsReads;
2942
2943 impl PermissionChecker for ApproveFsReads {
2944 fn evaluate(
2945 &self,
2946 request: &dyn agentkit_tools_core::PermissionRequest,
2947 ) -> PermissionDecision {
2948 if request.kind() == "filesystem.read" {
2949 return PermissionDecision::RequireApproval(ApprovalRequest {
2950 task_id: None,
2951 call_id: None,
2952 id: "approval:fs-read".into(),
2953 request_kind: request.kind().into(),
2954 reason: agentkit_tools_core::ApprovalReason::SensitivePath,
2955 summary: request.summary(),
2956 metadata: request.metadata().clone(),
2957 });
2958 }
2959
2960 PermissionDecision::Allow
2961 }
2962 }
2963
2964 struct CountTrigger;
2965
2966 impl CompactionTrigger for CountTrigger {
2967 fn should_compact(
2968 &self,
2969 _session_id: &SessionId,
2970 _turn_id: Option<&agentkit_core::TurnId>,
2971 transcript: &[Item],
2972 ) -> Option<agentkit_compaction::CompactionReason> {
2973 (transcript.len() >= 2)
2974 .then_some(agentkit_compaction::CompactionReason::TranscriptTooLong)
2975 }
2976 }
2977
2978 struct RecordingObserver {
2979 events: StdArc<StdMutex<Vec<AgentEvent>>>,
2980 }
2981
2982 impl LoopObserver for RecordingObserver {
2983 fn handle_event(&mut self, event: AgentEvent) {
2984 self.events.lock().unwrap().push(event);
2985 }
2986 }
2987
2988 struct CatalogExecutor {
2989 version: AtomicUsize,
2990 events: StdMutex<Vec<ToolCatalogEvent>>,
2991 }
2992
2993 impl CatalogExecutor {
2994 fn new() -> Self {
2995 Self {
2996 version: AtomicUsize::new(0),
2997 events: StdMutex::new(Vec::new()),
2998 }
2999 }
3000
3001 fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3002 self.version.store(version, Ordering::SeqCst);
3003 self.events.lock().unwrap().push(event);
3004 }
3005 }
3006
3007 #[async_trait]
3008 impl ToolExecutor for CatalogExecutor {
3009 fn specs(&self) -> Vec<ToolSpec> {
3010 vec![ToolSpec {
3011 name: ToolName::new("dynamic"),
3012 description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3013 input_schema: json!({
3014 "type": "object",
3015 "properties": {},
3016 "additionalProperties": false
3017 }),
3018 annotations: ToolAnnotations::default(),
3019 metadata: MetadataMap::new(),
3020 }]
3021 }
3022
3023 fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3024 std::mem::take(&mut *self.events.lock().unwrap())
3025 }
3026
3027 async fn execute(
3028 &self,
3029 request: ToolRequest,
3030 _ctx: &mut ToolContext<'_>,
3031 ) -> ToolExecutionOutcome {
3032 ToolExecutionOutcome::Completed(ToolResult {
3033 result: ToolResultPart {
3034 call_id: request.call_id,
3035 output: ToolOutput::Text("dynamic-ok".into()),
3036 is_error: false,
3037 metadata: MetadataMap::new(),
3038 },
3039 duration: None,
3040 metadata: MetadataMap::new(),
3041 })
3042 }
3043 }
3044
3045 #[derive(Clone)]
3046 struct BlockingTool {
3047 spec: ToolSpec,
3048 entered: StdArc<AtomicBool>,
3049 release: StdArc<Notify>,
3050 output: &'static str,
3051 }
3052
3053 impl BlockingTool {
3054 fn new(
3055 name: &str,
3056 entered: StdArc<AtomicBool>,
3057 release: StdArc<Notify>,
3058 output: &'static str,
3059 ) -> Self {
3060 Self {
3061 spec: ToolSpec {
3062 name: ToolName::new(name),
3063 description: format!("blocking tool {name}"),
3064 input_schema: json!({
3065 "type": "object",
3066 "properties": {},
3067 "additionalProperties": false
3068 }),
3069 annotations: ToolAnnotations::default(),
3070 metadata: MetadataMap::new(),
3071 },
3072 entered,
3073 release,
3074 output,
3075 }
3076 }
3077 }
3078
3079 #[async_trait]
3080 impl Tool for BlockingTool {
3081 fn spec(&self) -> &ToolSpec {
3082 &self.spec
3083 }
3084
3085 async fn invoke(
3086 &self,
3087 request: agentkit_tools_core::ToolRequest,
3088 _ctx: &mut ToolContext<'_>,
3089 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3090 self.entered.store(true, Ordering::SeqCst);
3091 self.release.notified().await;
3092 Ok(ToolResult {
3093 result: ToolResultPart {
3094 call_id: request.call_id,
3095 output: ToolOutput::Text(self.output.into()),
3096 is_error: false,
3097 metadata: MetadataMap::new(),
3098 },
3099 duration: None,
3100 metadata: MetadataMap::new(),
3101 })
3102 }
3103 }
3104
3105 struct NameRoutingPolicy {
3106 routes: Vec<(String, RoutingDecision)>,
3107 }
3108
3109 impl NameRoutingPolicy {
3110 fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3111 Self {
3112 routes: routes
3113 .into_iter()
3114 .map(|(name, decision)| (name.into(), decision))
3115 .collect(),
3116 }
3117 }
3118 }
3119
3120 impl TaskRoutingPolicy for NameRoutingPolicy {
3121 fn route(&self, request: &ToolRequest) -> RoutingDecision {
3122 self.routes
3123 .iter()
3124 .find(|(name, _)| name == &request.tool_name.0)
3125 .map(|(_, decision)| *decision)
3126 .unwrap_or(RoutingDecision::Foreground)
3127 }
3128 }
3129
3130 async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3131 timeout(Duration::from_secs(1), handle.next_event())
3132 .await
3133 .expect("timed out waiting for task event")
3134 .expect("task event stream ended unexpectedly")
3135 }
3136
3137 async fn wait_until_entered(flag: &AtomicBool) {
3138 timeout(Duration::from_secs(1), async {
3139 while !flag.load(Ordering::SeqCst) {
3140 tokio::task::yield_now().await;
3141 }
3142 })
3143 .await
3144 .expect("task never entered execution");
3145 }
3146
3147 #[tokio::test]
3148 async fn loop_continues_after_completed_tool_call() {
3149 let tools = ToolRegistry::new().with(EchoTool::default());
3150 let agent = Agent::builder()
3151 .model(FakeAdapter)
3152 .add_tool_source(tools)
3153 .permissions(AllowAllPermissions)
3154 .build()
3155 .unwrap();
3156
3157 let mut driver = agent
3158 .start(SessionConfig {
3159 session_id: SessionId::new("session-1"),
3160 metadata: MetadataMap::new(),
3161 cache: None,
3162 })
3163 .await
3164 .unwrap();
3165
3166 driver
3167 .submit_input(vec![Item {
3168 id: None,
3169 kind: ItemKind::User,
3170 parts: vec![Part::Text(TextPart {
3171 text: "ping".into(),
3172 metadata: MetadataMap::new(),
3173 })],
3174 metadata: MetadataMap::new(),
3175 }])
3176 .unwrap();
3177
3178 let result = run_until_finished(&mut driver).await;
3179
3180 match result {
3181 LoopStep::Finished(turn) => {
3182 assert_eq!(turn.finish_reason, FinishReason::Completed);
3183 assert_eq!(turn.items.len(), 1);
3184 match &turn.items[0].parts[0] {
3185 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3186 other => panic!("unexpected part: {other:?}"),
3187 }
3188 }
3189 other => panic!("unexpected loop step: {other:?}"),
3190 }
3191 }
3192
3193 async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3197 loop {
3198 match driver.next().await.unwrap() {
3199 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3200 step => return step,
3201 }
3202 }
3203 }
3204
3205 #[tokio::test]
3206 async fn loop_uses_injected_permission_checker() {
3207 let tools = ToolRegistry::new().with(EchoTool::default());
3208 let agent = Agent::builder()
3209 .model(FakeAdapter)
3210 .add_tool_source(tools)
3211 .permissions(DenyFsReads)
3212 .build()
3213 .unwrap();
3214
3215 let mut driver = agent
3216 .start(SessionConfig {
3217 session_id: SessionId::new("session-2"),
3218 metadata: MetadataMap::new(),
3219 cache: None,
3220 })
3221 .await
3222 .unwrap();
3223
3224 driver
3225 .submit_input(vec![Item {
3226 id: None,
3227 kind: ItemKind::User,
3228 parts: vec![Part::Text(TextPart {
3229 text: "ping".into(),
3230 metadata: MetadataMap::new(),
3231 })],
3232 metadata: MetadataMap::new(),
3233 }])
3234 .unwrap();
3235
3236 let result = run_until_finished(&mut driver).await;
3237
3238 match result {
3239 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3240 Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3241 other => panic!("unexpected part: {other:?}"),
3242 },
3243 other => panic!("unexpected loop step: {other:?}"),
3244 }
3245 }
3246
3247 #[tokio::test]
3248 async fn async_task_manager_background_round_requires_explicit_continue() {
3249 let entered = StdArc::new(AtomicBool::new(false));
3250 let release = StdArc::new(Notify::new());
3251 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3252 "background-wait",
3253 RoutingDecision::Background,
3254 )]));
3255 let handle = task_manager.handle();
3256 let tools = ToolRegistry::new().with(BlockingTool::new(
3257 "background-wait",
3258 entered.clone(),
3259 release.clone(),
3260 "background-done",
3261 ));
3262 let agent = Agent::builder()
3263 .model(FakeAdapter)
3264 .add_tool_source(tools)
3265 .permissions(AllowAllPermissions)
3266 .task_manager(task_manager)
3267 .build()
3268 .unwrap();
3269
3270 let mut driver = agent
3271 .start(SessionConfig {
3272 session_id: SessionId::new("session-background"),
3273 metadata: MetadataMap::new(),
3274 cache: None,
3275 })
3276 .await
3277 .unwrap();
3278
3279 driver
3280 .submit_input(vec![Item {
3281 id: None,
3282 kind: ItemKind::User,
3283 parts: vec![Part::Text(TextPart {
3284 text: "ping".into(),
3285 metadata: MetadataMap::new(),
3286 })],
3287 metadata: MetadataMap::new(),
3288 }])
3289 .unwrap();
3290
3291 let first = driver.next().await.unwrap();
3292 match first {
3293 LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3294 other => panic!("unexpected first loop step: {other:?}"),
3295 }
3296
3297 match wait_for_task_event(&handle).await {
3298 TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3299 other => panic!("unexpected task event: {other:?}"),
3300 }
3301 wait_until_entered(entered.as_ref()).await;
3302 release.notify_waiters();
3303
3304 match wait_for_task_event(&handle).await {
3305 TaskEvent::Completed(_, result) => {
3306 assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3307 }
3308 other => panic!("unexpected completion event: {other:?}"),
3309 }
3310
3311 let resumed = driver.next().await.unwrap();
3312 match resumed {
3313 LoopStep::Finished(turn) => {
3314 assert_eq!(turn.finish_reason, FinishReason::Completed);
3315 match &turn.items[0].parts[0] {
3316 Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3317 other => panic!("unexpected part after resume: {other:?}"),
3318 }
3319 }
3320 other => panic!("unexpected resumed step: {other:?}"),
3321 }
3322 }
3323
3324 #[tokio::test]
3325 async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3326 let controller = CancellationController::new();
3327 let agent = Agent::builder()
3328 .model(SlowAdapter)
3329 .cancellation(controller.handle())
3330 .build()
3331 .unwrap();
3332
3333 let mut driver = agent
3334 .start(SessionConfig {
3335 session_id: SessionId::new("session-cancel"),
3336 metadata: MetadataMap::new(),
3337 cache: None,
3338 })
3339 .await
3340 .unwrap();
3341
3342 driver
3343 .submit_input(vec![Item {
3344 id: None,
3345 kind: ItemKind::User,
3346 parts: vec![Part::Text(TextPart {
3347 text: "do the long task".into(),
3348 metadata: MetadataMap::new(),
3349 })],
3350 metadata: MetadataMap::new(),
3351 }])
3352 .unwrap();
3353
3354 let cancelled = tokio::join!(async { driver.next().await }, async {
3355 tokio::task::yield_now().await;
3356 controller.interrupt();
3357 })
3358 .0
3359 .unwrap();
3360
3361 match cancelled {
3362 LoopStep::Finished(turn) => {
3363 assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3364 assert_eq!(turn.items.len(), 1);
3365 assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3366 assert_eq!(
3367 turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3368 Some(&Value::Bool(true))
3369 );
3370 }
3371 other => panic!("unexpected loop step: {other:?}"),
3372 }
3373
3374 driver
3375 .submit_input(vec![Item {
3376 id: None,
3377 kind: ItemKind::User,
3378 parts: vec![Part::Text(TextPart {
3379 text: "try again".into(),
3380 metadata: MetadataMap::new(),
3381 })],
3382 metadata: MetadataMap::new(),
3383 }])
3384 .unwrap();
3385
3386 let result = driver.next().await.unwrap();
3387 match result {
3388 LoopStep::Finished(turn) => {
3389 assert_eq!(turn.finish_reason, FinishReason::Completed);
3390 }
3391 other => panic!("unexpected loop step after retry: {other:?}"),
3392 }
3393 }
3394
3395 #[tokio::test]
3396 async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3397 let controller = CancellationController::new();
3398 let fg_entered = StdArc::new(AtomicBool::new(false));
3399 let fg_release = StdArc::new(Notify::new());
3400 let bg_entered = StdArc::new(AtomicBool::new(false));
3401 let bg_release = StdArc::new(Notify::new());
3402 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3403 ("foreground-wait", RoutingDecision::Foreground),
3404 ("background-wait", RoutingDecision::Background),
3405 ]));
3406 let handle = task_manager.handle();
3407 let tools = ToolRegistry::new()
3408 .with(BlockingTool::new(
3409 "foreground-wait",
3410 fg_entered.clone(),
3411 fg_release,
3412 "foreground-done",
3413 ))
3414 .with(BlockingTool::new(
3415 "background-wait",
3416 bg_entered.clone(),
3417 bg_release.clone(),
3418 "background-done",
3419 ));
3420 let agent = Agent::builder()
3421 .model(MultiToolAdapter)
3422 .add_tool_source(tools)
3423 .permissions(AllowAllPermissions)
3424 .cancellation(controller.handle())
3425 .task_manager(task_manager)
3426 .build()
3427 .unwrap();
3428
3429 let mut driver = agent
3430 .start(SessionConfig {
3431 session_id: SessionId::new("session-mixed-cancel"),
3432 metadata: MetadataMap::new(),
3433 cache: None,
3434 })
3435 .await
3436 .unwrap();
3437
3438 driver
3439 .submit_input(vec![Item {
3440 id: None,
3441 kind: ItemKind::User,
3442 parts: vec![Part::Text(TextPart {
3443 text: "run both".into(),
3444 metadata: MetadataMap::new(),
3445 })],
3446 metadata: MetadataMap::new(),
3447 }])
3448 .unwrap();
3449
3450 let cancelled = tokio::join!(async { driver.next().await }, async {
3451 let _ = wait_for_task_event(&handle).await;
3452 let _ = wait_for_task_event(&handle).await;
3453 wait_until_entered(fg_entered.as_ref()).await;
3454 wait_until_entered(bg_entered.as_ref()).await;
3455 controller.interrupt();
3456 })
3457 .0
3458 .unwrap();
3459
3460 match cancelled {
3461 LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3462 other => panic!("unexpected loop step after interrupt: {other:?}"),
3463 }
3464
3465 match wait_for_task_event(&handle).await {
3466 TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3467 other => panic!("unexpected post-interrupt event: {other:?}"),
3468 }
3469
3470 let running = handle.list_running().await;
3471 assert_eq!(running.len(), 1);
3472 assert_eq!(running[0].tool_name, "background-wait");
3473
3474 bg_release.notify_waiters();
3475 match wait_for_task_event(&handle).await {
3476 TaskEvent::Completed(snapshot, result) => {
3477 assert_eq!(snapshot.tool_name, "background-wait");
3478 assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3479 }
3480 other => panic!("unexpected background completion event: {other:?}"),
3481 }
3482 }
3483
3484 #[tokio::test]
3485 async fn loop_resumes_after_approved_tool_request() {
3486 let tools = ToolRegistry::new().with(EchoTool::default());
3487 let agent = Agent::builder()
3488 .model(FakeAdapter)
3489 .add_tool_source(tools)
3490 .permissions(ApproveFsReads)
3491 .build()
3492 .unwrap();
3493
3494 let mut driver = agent
3495 .start(SessionConfig {
3496 session_id: SessionId::new("session-approval"),
3497 metadata: MetadataMap::new(),
3498 cache: None,
3499 })
3500 .await
3501 .unwrap();
3502
3503 driver
3504 .submit_input(vec![Item {
3505 id: None,
3506 kind: ItemKind::User,
3507 parts: vec![Part::Text(TextPart {
3508 text: "ping".into(),
3509 metadata: MetadataMap::new(),
3510 })],
3511 metadata: MetadataMap::new(),
3512 }])
3513 .unwrap();
3514
3515 let first = driver.next().await.unwrap();
3516 match first {
3517 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3518 assert!(pending.request.task_id.is_some());
3519 assert_eq!(pending.request.id.0, "approval:fs-read");
3520 pending.approve(&mut driver).unwrap();
3521 }
3522 other => panic!("unexpected loop step: {other:?}"),
3523 }
3524 let second = driver.next().await.unwrap();
3525 match second {
3526 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3527 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3528 other => panic!("unexpected part: {other:?}"),
3529 },
3530 other => panic!("unexpected loop step after approval: {other:?}"),
3531 }
3532 }
3533
3534 #[tokio::test]
3535 async fn loop_resumes_with_patched_input_on_approval() {
3536 let tools = ToolRegistry::new().with(EchoTool::default());
3537 let agent = Agent::builder()
3538 .model(FakeAdapter)
3539 .add_tool_source(tools)
3540 .permissions(ApproveFsReads)
3541 .build()
3542 .unwrap();
3543
3544 let mut driver = agent
3545 .start(SessionConfig {
3546 session_id: SessionId::new("session-approval-patched"),
3547 metadata: MetadataMap::new(),
3548 cache: None,
3549 })
3550 .await
3551 .unwrap();
3552
3553 driver
3554 .submit_input(vec![Item {
3555 id: None,
3556 kind: ItemKind::User,
3557 parts: vec![Part::Text(TextPart {
3558 text: "ping".into(),
3559 metadata: MetadataMap::new(),
3560 })],
3561 metadata: MetadataMap::new(),
3562 }])
3563 .unwrap();
3564
3565 match driver.next().await.unwrap() {
3566 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3567 pending
3568 .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
3569 .unwrap();
3570 }
3571 other => panic!("unexpected loop step: {other:?}"),
3572 }
3573 match driver.next().await.unwrap() {
3574 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3575 Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
3576 other => panic!("unexpected part: {other:?}"),
3577 },
3578 other => panic!("unexpected loop step after approval: {other:?}"),
3579 }
3580 }
3581
3582 #[tokio::test]
3583 async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3584 let tools = ToolRegistry::new().with(EchoTool::default());
3585 let agent = Agent::builder()
3586 .model(DualApprovalAdapter)
3587 .add_tool_source(tools)
3588 .permissions(ApproveFsReads)
3589 .build()
3590 .unwrap();
3591
3592 let mut driver = agent
3593 .start(SessionConfig {
3594 session_id: SessionId::new("session-dual-approval"),
3595 metadata: MetadataMap::new(),
3596 cache: None,
3597 })
3598 .await
3599 .unwrap();
3600
3601 driver
3602 .submit_input(vec![Item {
3603 id: None,
3604 kind: ItemKind::User,
3605 parts: vec![Part::Text(TextPart {
3606 text: "run both approvals".into(),
3607 metadata: MetadataMap::new(),
3608 })],
3609 metadata: MetadataMap::new(),
3610 }])
3611 .unwrap();
3612
3613 let pending_first = match driver.next().await.unwrap() {
3614 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3615 assert_eq!(
3616 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3617 Some("call-1")
3618 );
3619 pending
3620 }
3621 other => panic!("unexpected first loop step: {other:?}"),
3622 };
3623
3624 let pending_second = match driver.next().await.unwrap() {
3625 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3626 assert_eq!(
3627 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3628 Some("call-2")
3629 );
3630 pending
3631 }
3632 other => panic!("unexpected second loop step: {other:?}"),
3633 };
3634
3635 pending_second.approve(&mut driver).unwrap();
3636 match driver.next().await.unwrap() {
3637 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3638 assert_eq!(
3639 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3640 Some("call-1")
3641 );
3642 }
3643 other => panic!("unexpected step after approving second request: {other:?}"),
3644 }
3645
3646 pending_first.approve(&mut driver).unwrap();
3647 match driver.next().await.unwrap() {
3648 LoopStep::Finished(turn) => {
3649 assert_eq!(turn.finish_reason, FinishReason::Completed);
3650 match &turn.items[0].parts[0] {
3651 Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3652 other => panic!("unexpected final part: {other:?}"),
3653 }
3654 }
3655 other => panic!("unexpected final loop step: {other:?}"),
3656 }
3657 }
3658
3659 #[tokio::test]
3660 async fn loop_compacts_transcript_before_new_turns() {
3661 let events = StdArc::new(StdMutex::new(Vec::new()));
3662 let agent = Agent::builder()
3663 .model(FakeAdapter)
3664 .compaction(CompactionConfig::new(
3665 CountTrigger,
3666 CompactionPipeline::new().with_strategy(KeepRecentStrategy::new(1)),
3667 ))
3668 .observer(RecordingObserver {
3669 events: events.clone(),
3670 })
3671 .build()
3672 .unwrap();
3673
3674 let mut driver = agent
3675 .start(SessionConfig {
3676 session_id: SessionId::new("session-4"),
3677 metadata: MetadataMap::new(),
3678 cache: None,
3679 })
3680 .await
3681 .unwrap();
3682
3683 for text in ["first", "second"] {
3684 driver
3685 .submit_input(vec![Item {
3686 id: None,
3687 kind: ItemKind::User,
3688 parts: vec![Part::Text(TextPart {
3689 text: text.into(),
3690 metadata: MetadataMap::new(),
3691 })],
3692 metadata: MetadataMap::new(),
3693 }])
3694 .unwrap();
3695 let _ = driver.next().await.unwrap();
3696 }
3697
3698 let events = events.lock().unwrap();
3699 assert!(events.iter().any(|event| matches!(
3700 event,
3701 AgentEvent::CompactionFinished {
3702 replaced_items,
3703 ..
3704 } if *replaced_items > 0
3705 )));
3706 }
3707
3708 #[tokio::test]
3709 async fn loop_refreshes_tool_specs_each_turn() {
3710 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3711 let version = StdArc::new(AtomicUsize::new(1));
3712 let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
3713 let agent = Agent::builder()
3714 .model(RecordingAdapter {
3715 seen_descriptions: seen_descriptions.clone(),
3716 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3717 })
3718 .add_tool_source(tools)
3719 .permissions(AllowAllPermissions)
3720 .build()
3721 .unwrap();
3722
3723 let mut driver = agent
3724 .start(SessionConfig {
3725 session_id: SessionId::new("session-dynamic-tools"),
3726 metadata: MetadataMap::new(),
3727 cache: None,
3728 })
3729 .await
3730 .unwrap();
3731
3732 for text in ["first", "second"] {
3733 driver
3734 .submit_input(vec![Item {
3735 id: None,
3736 kind: ItemKind::User,
3737 parts: vec![Part::Text(TextPart {
3738 text: text.into(),
3739 metadata: MetadataMap::new(),
3740 })],
3741 metadata: MetadataMap::new(),
3742 }])
3743 .unwrap();
3744
3745 let _ = driver.next().await.unwrap();
3746 if text == "first" {
3747 version.store(2, Ordering::SeqCst);
3748 }
3749 }
3750
3751 let seen_descriptions = seen_descriptions.lock().unwrap();
3752 assert_eq!(seen_descriptions.len(), 2);
3753 assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
3754 assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
3755 }
3756
3757 #[tokio::test]
3758 async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
3759 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3760 let events = StdArc::new(StdMutex::new(Vec::new()));
3761 let executor = StdArc::new(CatalogExecutor::new());
3762 let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
3763 let agent = Agent::builder()
3764 .model(RecordingAdapter {
3765 seen_descriptions: seen_descriptions.clone(),
3766 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3767 })
3768 .tool_executor(executor_for_agent)
3769 .permissions(AllowAllPermissions)
3770 .observer(RecordingObserver {
3771 events: events.clone(),
3772 })
3773 .build()
3774 .unwrap();
3775
3776 let mut driver = agent
3777 .start(SessionConfig {
3778 session_id: SessionId::new("session-catalog-events"),
3779 metadata: MetadataMap::new(),
3780 cache: None,
3781 })
3782 .await
3783 .unwrap();
3784
3785 driver
3786 .submit_input(vec![Item::text(ItemKind::User, "first")])
3787 .unwrap();
3788 let _ = driver.next().await.unwrap();
3789
3790 executor.publish_change(
3791 1,
3792 ToolCatalogEvent {
3793 source: "mcp:mock".into(),
3794 added: vec!["dynamic".into()],
3795 removed: Vec::new(),
3796 changed: Vec::new(),
3797 },
3798 );
3799
3800 driver
3801 .submit_input(vec![Item::text(ItemKind::User, "second")])
3802 .unwrap();
3803 let _ = driver.next().await.unwrap();
3804
3805 let seen_descriptions = seen_descriptions.lock().unwrap();
3806 assert_eq!(seen_descriptions.len(), 2);
3807 assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
3808 assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
3809
3810 let events = events.lock().unwrap();
3811 assert!(events.iter().any(|event| matches!(
3812 event,
3813 AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
3814 source,
3815 added,
3816 removed,
3817 changed,
3818 }) if source == "mcp:mock"
3819 && added == &vec!["dynamic".to_string()]
3820 && removed.is_empty()
3821 && changed.is_empty()
3822 )));
3823 }
3824
3825 #[tokio::test]
3826 async fn loop_passes_session_default_and_next_turn_cache_requests() {
3827 let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
3828 let agent = Agent::builder()
3829 .model(RecordingAdapter {
3830 seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
3831 seen_caches: seen_caches.clone(),
3832 })
3833 .permissions(AllowAllPermissions)
3834 .build()
3835 .unwrap();
3836
3837 let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
3838 .with_retention(PromptCacheRetention::Short);
3839 let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
3840 breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
3841 });
3842
3843 let mut driver = agent
3844 .start(SessionConfig {
3845 session_id: SessionId::new("session-cache"),
3846 metadata: MetadataMap::new(),
3847 cache: Some(default_cache.clone()),
3848 })
3849 .await
3850 .unwrap();
3851
3852 driver
3853 .submit_input(vec![Item {
3854 id: None,
3855 kind: ItemKind::User,
3856 parts: vec![Part::Text(TextPart {
3857 text: "first".into(),
3858 metadata: MetadataMap::new(),
3859 })],
3860 metadata: MetadataMap::new(),
3861 }])
3862 .unwrap();
3863 let _ = driver.next().await.unwrap();
3864
3865 driver
3866 .submit_input_with_cache(
3867 vec![Item {
3868 id: None,
3869 kind: ItemKind::User,
3870 parts: vec![Part::Text(TextPart {
3871 text: "second".into(),
3872 metadata: MetadataMap::new(),
3873 })],
3874 metadata: MetadataMap::new(),
3875 }],
3876 override_cache.clone(),
3877 )
3878 .unwrap();
3879 let _ = driver.next().await.unwrap();
3880
3881 let seen = seen_caches.lock().unwrap();
3882 assert_eq!(seen.len(), 2);
3883 assert_eq!(seen[0], Some(default_cache));
3884 assert_eq!(seen[1], Some(override_cache));
3885 }
3886
3887 #[tokio::test]
3888 async fn loop_yields_after_tool_result_between_rounds() {
3889 let tools = ToolRegistry::new().with(EchoTool::default());
3890 let agent = Agent::builder()
3891 .model(FakeAdapter)
3892 .add_tool_source(tools)
3893 .permissions(AllowAllPermissions)
3894 .build()
3895 .unwrap();
3896
3897 let mut driver = agent
3898 .start(SessionConfig {
3899 session_id: SessionId::new("yield-session"),
3900 metadata: MetadataMap::new(),
3901 cache: None,
3902 })
3903 .await
3904 .unwrap();
3905
3906 driver
3907 .submit_input(vec![Item::text(ItemKind::User, "ping")])
3908 .unwrap();
3909
3910 let step = driver.next().await.unwrap();
3913 let info = match step {
3914 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
3915 other => panic!("expected AfterToolResult, got {other:?}"),
3916 };
3917 assert_eq!(info.session_id, SessionId::new("yield-session"));
3918 assert_eq!(info.transcript_len, 3);
3920
3921 let interrupt = LoopInterrupt::AfterToolResult(info.clone());
3923 assert!(!interrupt.is_blocking());
3924
3925 driver
3927 .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
3928 .unwrap();
3929
3930 let step = driver.next().await.unwrap();
3933 match step {
3934 LoopStep::Finished(turn) => {
3935 assert_eq!(turn.finish_reason, FinishReason::Completed);
3936 }
3937 other => panic!("expected Finished, got {other:?}"),
3938 }
3939
3940 let snapshot = driver.snapshot();
3942 let has_injected_message = snapshot.transcript.iter().any(|item| {
3943 item.kind == ItemKind::User
3944 && item.parts.iter().any(|part| match part {
3945 Part::Text(text) => text.text == "also: report back",
3946 _ => false,
3947 })
3948 });
3949 assert!(
3950 has_injected_message,
3951 "injected user message should be in transcript, got: {:?}",
3952 snapshot.transcript
3953 );
3954 }
3955
3956 struct RecordingTranscriptObserver {
3957 items: StdArc<StdMutex<Vec<Item>>>,
3958 }
3959
3960 impl TranscriptObserver for RecordingTranscriptObserver {
3961 fn on_item_appended(&mut self, item: &Item) {
3962 self.items.lock().unwrap().push(item.clone());
3963 }
3964 }
3965
3966 #[tokio::test]
3967 async fn observers_see_full_tool_round() {
3968 let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
3974 let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
3975 let agent = Agent::builder()
3976 .model(FakeAdapter)
3977 .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
3978 .permissions(AllowAllPermissions)
3979 .observer(RecordingObserver {
3980 events: events.clone(),
3981 })
3982 .transcript_observer(RecordingTranscriptObserver {
3983 items: items.clone(),
3984 })
3985 .build()
3986 .unwrap();
3987
3988 let mut driver = agent
3989 .start(SessionConfig {
3990 session_id: SessionId::new("observer-session"),
3991 metadata: MetadataMap::new(),
3992 cache: None,
3993 })
3994 .await
3995 .unwrap();
3996
3997 driver
3998 .submit_input(vec![Item {
3999 id: None,
4000 kind: ItemKind::User,
4001 parts: vec![Part::Text(TextPart {
4002 text: "ping".into(),
4003 metadata: MetadataMap::new(),
4004 })],
4005 metadata: MetadataMap::new(),
4006 }])
4007 .unwrap();
4008
4009 let result = run_until_finished(&mut driver).await;
4010 assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4011
4012 let events = events.lock().unwrap().clone();
4015 let tool_call_id = events.iter().find_map(|e| match e {
4016 AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4017 _ => None,
4018 });
4019 let tool_results: Vec<_> = events
4020 .iter()
4021 .filter_map(|e| match e {
4022 AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4023 _ => None,
4024 })
4025 .collect();
4026 assert_eq!(tool_results.len(), 1, "events: {events:?}");
4027 assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4028 assert!(!tool_results[0].is_error);
4029
4030 let items = items.lock().unwrap().clone();
4034 assert_eq!(items.len(), 4, "items: {items:?}");
4035 assert_eq!(items[0].kind, ItemKind::User);
4036 assert_eq!(items[1].kind, ItemKind::Assistant);
4037 assert!(
4038 items[1]
4039 .parts
4040 .iter()
4041 .any(|p| matches!(p, Part::ToolCall(_)))
4042 );
4043 assert_eq!(items[2].kind, ItemKind::Tool);
4044 assert!(
4045 items[2]
4046 .parts
4047 .iter()
4048 .any(|p| matches!(p, Part::ToolResult(_)))
4049 );
4050 assert_eq!(items[3].kind, ItemKind::Assistant);
4051 }
4052
/// Checks the convenience constructors: `SessionConfig::new(..).with_cache`
/// stores the session id and cache, and `PromptCacheRequest::explicit`
/// yields a best-effort request with the given breakpoints in order.
#[test]
fn convenience_cache_builders_construct_expected_defaults() {
    let cache = PromptCacheRequest::automatic()
        .with_retention(PromptCacheRetention::Short)
        .with_key("workspace:demo");
    let session = SessionConfig::new("demo").with_cache(cache.clone());

    assert_eq!(session.session_id, SessionId::new("demo"));
    assert_eq!(session.cache, Some(cache));

    let requested_breakpoints = [
        PromptCacheBreakpoint::tools_end(),
        PromptCacheBreakpoint::transcript_item_end(2),
        PromptCacheBreakpoint::transcript_part_end(3, 1),
    ];
    let explicit = PromptCacheRequest::explicit(requested_breakpoints);

    let expected_strategy = PromptCacheStrategy::Explicit {
        breakpoints: vec![
            PromptCacheBreakpoint::ToolsEnd,
            PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
            PromptCacheBreakpoint::TranscriptPartEnd {
                item_index: 3,
                part_index: 1,
            },
        ],
    };
    assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
    assert_eq!(explicit.strategy, expected_strategy);
}
4084}