1use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_core::{
68 CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
69 TextPart, Timestamp, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation,
70 Usage,
71};
72use agentkit_task_manager::{
73 PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
74 TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
75};
76#[cfg(test)]
77use agentkit_tools_core::ToolContext;
78use agentkit_tools_core::{
79 AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
80 PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutionScope, ToolExecutor, ToolRequest,
81 ToolResources, ToolSource, ToolSpec,
82};
83use async_trait::async_trait;
84use serde::{Deserialize, Serialize};
85use thiserror::Error;
86
// Metadata keys/values in the `agentkit.` namespace that describe an interruption.
// NOTE(review): the code that reads/writes these lives outside this excerpt; presumably
// they are stamped onto items produced when a turn is cancelled — confirm at the use sites.

/// Metadata key flagging that something was interrupted.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
/// Metadata key carrying the reason for the interruption.
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
/// Metadata key carrying the stage at which the interruption happened.
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
/// Reason value for a user-initiated cancellation.
const USER_CANCELLED_REASON: &str = "user_cancelled";
91
/// Configuration passed to [`ModelAdapter::start_session`] when a session is started.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionConfig {
    /// Identifier of the session being started.
    pub session_id: SessionId,
    /// Free-form metadata attached to the session.
    pub metadata: MetadataMap,
    /// Optional prompt-cache request; [`Agent::start`] keeps a copy of it as the
    /// driver's default cache setting.
    pub cache: Option<PromptCacheRequest>,
}
115
116impl SessionConfig {
117 pub fn new(session_id: impl Into<SessionId>) -> Self {
119 Self {
120 session_id: session_id.into(),
121 metadata: MetadataMap::new(),
122 cache: None,
123 }
124 }
125
126 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
128 self.metadata = metadata;
129 self
130 }
131
132 pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
134 self.cache = Some(cache);
135 self
136 }
137
138 pub fn without_cache(mut self) -> Self {
140 self.cache = None;
141 self
142 }
143}
144
/// How strongly prompt caching is requested.
///
/// NOTE(review): the exact provider-side meaning of `BestEffort` vs `Required` is decided
/// by the model adapters, which are not visible here.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheMode {
    /// Caching is turned off; [`PromptCacheRequest::is_enabled`] returns `false`.
    Disabled,
    /// Caching is requested on a best-effort basis. This is the default mode.
    #[default]
    BestEffort,
    /// Caching is required.
    Required,
}
160
/// Retention hint for cached prompt content.
///
/// NOTE(review): concrete durations are provider-specific and not defined in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheRetention {
    /// Use the provider's default retention.
    Default,
    /// Request a short retention window.
    Short,
    /// Request an extended retention window.
    Extended,
}
175
/// How cache breakpoints are chosen for a prompt.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum PromptCacheStrategy {
    /// No explicit breakpoints are supplied. This is the default strategy.
    #[default]
    Automatic,
    /// The caller supplies the breakpoints explicitly.
    Explicit {
        /// Breakpoints to apply, in the order given by the caller.
        breakpoints: Vec<PromptCacheBreakpoint>,
    },
}
188
189impl PromptCacheStrategy {
190 pub fn automatic() -> Self {
193 Self::Automatic
194 }
195
196 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
198 Self::Explicit {
199 breakpoints: breakpoints.into_iter().collect(),
200 }
201 }
202}
203
/// A position in the prompt at which a cache breakpoint is requested.
///
/// NOTE(review): indices presumably refer to positions in [`TurnRequest::transcript`] and
/// in an item's `parts` — confirm against the adapters that consume them.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheBreakpoint {
    /// Breakpoint at the end of the tool definitions.
    ToolsEnd,
    /// Breakpoint at the end of the transcript item at `index`.
    TranscriptItemEnd { index: usize },
    /// Breakpoint at the end of one part inside a transcript item.
    TranscriptPartEnd {
        /// Index of the transcript item.
        item_index: usize,
        /// Index of the part within that item.
        part_index: usize,
    },
}
221
222impl PromptCacheBreakpoint {
223 pub fn tools_end() -> Self {
225 Self::ToolsEnd
226 }
227
228 pub fn transcript_item_end(index: usize) -> Self {
230 Self::TranscriptItemEnd { index }
231 }
232
233 pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
235 Self::TranscriptPartEnd {
236 item_index,
237 part_index,
238 }
239 }
240}
241
/// A prompt-cache request: mode, breakpoint strategy, and optional retention/key hints.
///
/// The derived `Default` is best-effort mode, automatic strategy, no retention, no key.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PromptCacheRequest {
    /// Whether caching is disabled, best-effort, or required.
    pub mode: PromptCacheMode,
    /// How cache breakpoints are chosen.
    pub strategy: PromptCacheStrategy,
    /// Optional retention hint.
    pub retention: Option<PromptCacheRetention>,
    /// Optional caller-supplied cache key.
    pub key: Option<String>,
}
254
255impl PromptCacheRequest {
256 pub fn automatic() -> Self {
258 Self::best_effort(PromptCacheStrategy::automatic())
259 }
260
261 pub fn automatic_required() -> Self {
263 Self::required(PromptCacheStrategy::automatic())
264 }
265
266 pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
268 Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
269 }
270
271 pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
273 Self::required(PromptCacheStrategy::explicit(breakpoints))
274 }
275
276 pub fn disabled() -> Self {
278 Self {
279 mode: PromptCacheMode::Disabled,
280 strategy: PromptCacheStrategy::Automatic,
281 retention: None,
282 key: None,
283 }
284 }
285
286 pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
288 Self {
289 mode: PromptCacheMode::BestEffort,
290 strategy,
291 retention: None,
292 key: None,
293 }
294 }
295
296 pub fn required(strategy: PromptCacheStrategy) -> Self {
298 Self {
299 mode: PromptCacheMode::Required,
300 strategy,
301 retention: None,
302 key: None,
303 }
304 }
305
306 pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
308 self.mode = mode;
309 self
310 }
311
312 pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
314 self.strategy = strategy;
315 self
316 }
317
318 pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
320 self.retention = Some(retention);
321 self
322 }
323
324 pub fn with_key(mut self, key: impl Into<String>) -> Self {
326 self.key = Some(key.into());
327 self
328 }
329
330 pub fn without_retention(mut self) -> Self {
332 self.retention = None;
333 self
334 }
335
336 pub fn without_key(mut self) -> Self {
338 self.key = None;
339 self
340 }
341
342 pub fn is_enabled(&self) -> bool {
344 !matches!(self.mode, PromptCacheMode::Disabled)
345 }
346}
347
/// Everything a [`ModelSession`] receives in [`ModelSession::begin_turn`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnRequest {
    /// Session the turn belongs to.
    pub session_id: SessionId,
    /// Identifier of the turn being started.
    pub turn_id: agentkit_core::TurnId,
    /// Transcript items sent to the model for this turn.
    pub transcript: Vec<Item>,
    /// Specs of the tools available during this turn.
    pub available_tools: Vec<ToolSpec>,
    /// Optional prompt-cache request for this turn.
    pub cache: Option<PromptCacheRequest>,
    /// Free-form metadata accompanying the request.
    pub metadata: MetadataMap,
}
368
/// Final outcome of a model turn, carried by [`ModelTurnEvent::Finished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ModelTurnResult {
    /// Why the model stopped.
    pub finish_reason: FinishReason,
    /// Items produced by the model during the turn.
    pub output_items: Vec<Item>,
    /// Usage reported for the turn, if any.
    pub usage: Option<Usage>,
    /// Free-form metadata attached to the result.
    pub metadata: MetadataMap,
    /// Model name, if reported. Absent in serialized input deserializes as `None`.
    #[serde(default)]
    pub model: Option<String>,
    /// Provider response id, if reported. Absent in serialized input deserializes as `None`.
    #[serde(default)]
    pub response_id: Option<String>,
}
394
/// One event yielded by [`ModelTurn::next_event`] while a turn is in progress.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ModelTurnEvent {
    /// Incremental content update.
    Delta(Delta),
    /// The model requested a tool call.
    ToolCall(ToolCallPart),
    /// Usage information reported during the turn.
    Usage(Usage),
    /// The turn finished with the given result.
    Finished(ModelTurnResult),
}
411
/// Factory for model sessions; the entry point through which [`Agent`] reaches a model.
#[async_trait]
pub trait ModelAdapter: Send + Sync {
    /// Session type produced by [`ModelAdapter::start_session`].
    type Session: ModelSession;

    /// Starts a new session for the given configuration.
    ///
    /// # Errors
    ///
    /// Returns a [`LoopError`] if the session cannot be started.
    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;

    /// Optional provider name; the default implementation returns `None`.
    ///
    /// [`Agent::start`] copies the value into the driver, which records it on the turn's
    /// tracing span as `gen_ai.provider.name`.
    fn provider_name(&self) -> Option<&str> {
        None
    }
}
470
/// A started model session from which individual turns are begun.
#[async_trait]
pub trait ModelSession: Send {
    /// Turn type produced by [`ModelSession::begin_turn`].
    type Turn: ModelTurn;

    /// Begins a turn for `request`.
    ///
    /// `cancellation`, when present, is the cancellation checkpoint for this turn.
    ///
    /// # Errors
    ///
    /// Returns a [`LoopError`] if the turn cannot be started.
    async fn begin_turn(
        &mut self,
        request: TurnRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Self::Turn, LoopError>;

    /// Optional model name; the default implementation returns `None`.
    fn model_name(&self) -> Option<&str> {
        None
    }
}
509
/// An in-progress model turn that yields [`ModelTurnEvent`]s one at a time.
#[async_trait]
pub trait ModelTurn: Send {
    /// Returns the next event of the turn.
    ///
    /// NOTE(review): `Ok(None)` presumably signals that the stream is exhausted — confirm
    /// against the driver code that consumes it.
    ///
    /// # Errors
    ///
    /// Returns a [`LoopError`] if the turn fails.
    async fn next_event(
        &mut self,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError>;
}
530
/// Receives [`AgentEvent`]s emitted by the loop.
pub trait LoopObserver: Send + Sync {
    /// Called with each emitted event; the event is passed by value.
    fn handle_event(&self, event: AgentEvent);
}
557
/// Receives notifications about items appended to the transcript.
pub trait TranscriptObserver: Send + Sync {
    /// Called with a reference to an item that was appended.
    // NOTE(review): the call sites are outside this excerpt; confirm exactly when it fires.
    fn on_item_appended(&self, item: &Item);
}
599
/// The point in the loop at which [`LoopMutator`]s are being run.
///
/// The driver selects the point just before a turn is driven: `AfterToolResult` when it is
/// resuming after a tool round, `AfterTurnEnded` otherwise.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MutationPoint {
    /// Mutators run while resuming after a tool round.
    AfterToolResult,
    /// Mutators run when no tool-round resume is pending.
    AfterTurnEnded,
}
614
/// Sink for [`AgentEvent`]s, handed to mutators through [`LoopCtx::emitter`].
pub trait EventEmitter: Send + Sync {
    /// Emits `event`.
    fn emit(&self, event: AgentEvent);
}
621
/// Read-only context handed to [`LoopMutator::mutate`] on every invocation.
#[non_exhaustive]
pub struct LoopCtx<'a> {
    /// Session the loop is running for.
    pub session_id: &'a SessionId,
    /// Turn associated with this mutation pass, if any.
    pub turn_id: Option<&'a agentkit_core::TurnId>,
    /// Where in the loop the mutators are being run.
    pub point: MutationPoint,
    /// Cancellation checkpoint for the current pass, if cancellation is configured.
    pub cancellation: Option<TurnCancellation>,
    /// Emitter the mutator can use to publish [`AgentEvent`]s.
    pub emitter: &'a dyn EventEmitter,
}
636
/// Mutable view over the transcript that remembers whether it was mutably accessed.
///
/// Dereferences to `Vec<Item>`; any access through `DerefMut` sets `dirty`.
pub struct TranscriptCursor<'a> {
    /// The transcript being viewed.
    items: &'a mut Vec<Item>,
    /// Set to `true` the first time the cursor is dereferenced mutably.
    pub(crate) dirty: bool,
}
648
649impl<'a> std::ops::Deref for TranscriptCursor<'a> {
650 type Target = Vec<Item>;
651 fn deref(&self) -> &Vec<Item> {
652 self.items
653 }
654}
655
656impl<'a> std::ops::DerefMut for TranscriptCursor<'a> {
657 fn deref_mut(&mut self) -> &mut Vec<Item> {
658 self.dirty = true;
659 self.items
660 }
661}
662
/// Hook that may inspect or rewrite the transcript at defined points of the loop.
#[async_trait]
pub trait LoopMutator: Send + Sync {
    /// Called once per mutation pass with a cursor over the transcript and the pass context.
    ///
    /// The default implementation ignores both arguments and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// An error returned here aborts the mutation pass and is propagated by the driver.
    async fn mutate(
        &self,
        cursor: &mut TranscriptCursor<'_>,
        ctx: LoopCtx<'_>,
    ) -> Result<(), LoopError> {
        // Explicitly discard the arguments so the default body compiles without warnings.
        let _ = (cursor, ctx);
        Ok(())
    }
}
685
/// Events emitted by the loop and delivered to [`LoopObserver`]s.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AgentEvent {
    /// Emitted by [`Agent::start`] once the driver has been created.
    RunStarted { session_id: SessionId },
    /// A turn has started.
    TurnStarted {
        /// Session the turn belongs to.
        session_id: SessionId,
        /// Identifier of the started turn.
        turn_id: agentkit_core::TurnId,
    },
    /// Input items were accepted for the session.
    InputAccepted {
        /// Session that accepted the input.
        session_id: SessionId,
        /// The accepted items.
        items: Vec<Item>,
    },
    /// Incremental content from the model.
    ContentDelta(Delta),
    /// The model requested a tool call.
    ToolCallRequested(ToolCallPart),
    /// A tool result was received.
    ToolResultReceived(ToolResultPart),
    /// A tool call needs approval; emitted when the approval is queued.
    ApprovalRequired(ApprovalRequest),
    /// An approval was resolved; `approved` tells which way.
    ApprovalResolved { approved: bool },
    /// The tool catalog changed.
    ToolCatalogChanged(ToolCatalogEvent),
    /// A mutator is about to run.
    MutationStarted {
        /// Session the mutation belongs to.
        session_id: SessionId,
        /// Turn associated with the mutation, if any.
        turn_id: Option<agentkit_core::TurnId>,
        /// Name of the mutator.
        mutator: String,
        /// Point in the loop at which the mutator runs.
        point: MutationPoint,
    },
    /// A mutator finished running.
    MutationFinished {
        /// Session the mutation belongs to.
        session_id: SessionId,
        /// Turn associated with the mutation, if any.
        turn_id: Option<agentkit_core::TurnId>,
        /// Name of the mutator.
        mutator: String,
        /// Whether the transcript was marked dirty.
        dirty: bool,
        /// Free-form metadata reported for the mutation.
        metadata: MetadataMap,
    },
    /// Updated usage information.
    UsageUpdated(Usage),
    /// A non-fatal warning.
    Warning { message: String },
    /// The run failed.
    RunFailed { message: String },
    /// A turn finished with the given result.
    TurnFinished(TurnResult),
}
753
/// An approval request surfaced to the caller through [`LoopInterrupt::ApprovalRequest`].
///
/// Dereferences to the wrapped [`ApprovalRequest`]; resolve it with one of the
/// `approve*`/`deny*` methods.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingApproval {
    /// The underlying approval request.
    pub request: ApprovalRequest,
}
780
781impl std::ops::Deref for PendingApproval {
782 type Target = ApprovalRequest;
783 fn deref(&self) -> &ApprovalRequest {
784 &self.request
785 }
786}
787
788impl PendingApproval {
789 pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
791 let call_id = self
792 .request
793 .call_id
794 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
795 driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
796 }
797
798 pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
800 let call_id = self
801 .request
802 .call_id
803 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
804 driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
805 }
806
807 pub fn deny_with_reason<S: ModelSession>(
809 self,
810 driver: &mut LoopDriver<S>,
811 reason: impl Into<String>,
812 ) -> Result<(), LoopError> {
813 let call_id = self
814 .request
815 .call_id
816 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
817 driver.resolve_approval_for(
818 call_id,
819 ApprovalDecision::Deny {
820 reason: Some(reason.into()),
821 },
822 )
823 }
824
825 pub fn approve_with_patched_input<S: ModelSession>(
835 self,
836 driver: &mut LoopDriver<S>,
837 input: serde_json::Value,
838 ) -> Result<(), LoopError> {
839 let call_id = self
840 .request
841 .call_id
842 .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
843 driver.resolve_approval_for_with_patched_input(call_id, input)
844 }
845}
846
/// Payload of [`LoopInterrupt::AwaitingInput`]: the loop is asking for more input.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputRequest {
    /// Session that is waiting for input.
    pub session_id: SessionId,
    /// Human-readable reason why input is requested.
    pub reason: String,
}
878
impl InputRequest {
    /// Answers the request by submitting `items` to `driver`.
    ///
    /// Consumes the request; none of its fields are read — the call is forwarded to the
    /// driver's `submit_input`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the driver's `submit_input` returns.
    pub fn submit<S: ModelSession>(
        self,
        driver: &mut LoopDriver<S>,
        items: Vec<Item>,
    ) -> Result<(), LoopError> {
        driver.submit_input(items)
    }
}
889
/// Result of a finished turn, carried by [`LoopStep::Finished`] and
/// [`AgentEvent::TurnFinished`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnResult {
    /// Identifier of the finished turn.
    pub turn_id: agentkit_core::TurnId,
    /// Why the turn finished.
    pub finish_reason: FinishReason,
    /// Items associated with the turn.
    pub items: Vec<Item>,
    /// Usage reported for the turn, if any.
    pub usage: Option<Usage>,
    /// Free-form metadata attached to the result.
    pub metadata: MetadataMap,
}
907
/// Reasons the loop hands control back to the caller before a turn has finished.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LoopInterrupt {
    /// A tool call is waiting for an approval decision.
    ApprovalRequest(PendingApproval),
    /// The loop is waiting for more input.
    AwaitingInput(InputRequest),
    /// A tool round completed; the caller may submit extra input before the loop resumes.
    AfterToolResult(ToolRoundInfo),
}
960
961impl LoopInterrupt {
962 pub fn is_blocking(&self) -> bool {
968 matches!(self, LoopInterrupt::ApprovalRequest(_))
969 }
970}
971
/// Payload of [`LoopInterrupt::AfterToolResult`], describing the tool round that just ended.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ToolRoundInfo {
    /// Session the tool round ran in.
    pub session_id: SessionId,
    /// Turn the tool round belongs to.
    pub turn_id: agentkit_core::TurnId,
    /// Length of the driver's transcript at the moment the interrupt was produced.
    pub transcript_len: usize,
}
983
impl ToolRoundInfo {
    /// Submits `items` to `driver` as additional input after the tool round.
    ///
    /// Consumes the info value; none of its fields are read — the call is forwarded to the
    /// driver's `submit_input`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the driver's `submit_input` returns.
    pub fn submit<S: ModelSession>(
        self,
        driver: &mut LoopDriver<S>,
        items: Vec<Item>,
    ) -> Result<(), LoopError> {
        driver.submit_input(items)
    }
}
995
/// Outcome of advancing the loop by one step.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum LoopStep {
    /// The loop paused and needs the caller to act on the interrupt.
    Interrupt(LoopInterrupt),
    /// A turn finished with the given result.
    Finished(TurnResult),
}
1030
/// Serializable snapshot of loop state.
// NOTE(review): the code that produces snapshots is outside this excerpt.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LoopSnapshot {
    /// Session the snapshot was taken from.
    pub session_id: SessionId,
    /// Transcript items at snapshot time.
    pub transcript: Vec<Item>,
    /// Input items that were pending at snapshot time.
    pub pending_input: Vec<Item>,
}
1045
/// Driver-internal bookkeeping for a tool call that is waiting on an approval.
#[derive(Clone, Debug)]
struct PendingApprovalToolCall {
    /// The approval request, with `call_id` filled in when it was queued.
    request: ApprovalRequest,
    /// The decision once one has been recorded; `None` while unresolved.
    decision: Option<ApprovalDecision>,
    /// Whether this approval has already been returned to the caller as an interrupt.
    surfaced: bool,
    /// Turn the tool call belongs to.
    turn_id: agentkit_core::TurnId,
    /// Task-manager id of the task awaiting approval.
    task_id: TaskId,
    /// Tool call part rebuilt from the tool request (id, name, input, metadata).
    call: ToolCallPart,
    /// The original tool request.
    tool_request: ToolRequest,
}
1056
/// Driver-internal state of the tool round currently being executed.
#[derive(Clone, Debug, Default)]
struct ActiveToolRound {
    /// Turn the round belongs to.
    turn_id: agentkit_core::TurnId,
    /// Tool calls not yet handed to the task manager, in dispatch order.
    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
    /// Set when a task of this round is running in the background (started as a
    /// background task, or detached).
    background_pending: bool,
    /// Set once a tool result for this round (including the placeholder result written
    /// for a detached task) has been appended to the transcript.
    foreground_progressed: bool,
}
1064
/// A configured agent: a model adapter plus everything a [`LoopDriver`] needs.
///
/// Built with [`Agent::builder`]; [`Agent::start`] clones the shared pieces into a new
/// driver, so an `Agent` can be started more than once.
pub struct Agent<M>
where
    M: ModelAdapter,
{
    /// Adapter used to start model sessions.
    model: M,
    /// Tool sources backing the default executor when no explicit executor is configured.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    /// Explicit tool executor; when `None`, `start` builds a `BasicToolExecutor`.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    /// Task manager that runs tool calls.
    task_manager: Arc<dyn TaskManager>,
    /// Permission checker passed to tool executions.
    permissions: Arc<dyn PermissionChecker>,
    /// Shared resources passed to tool executions.
    resources: Arc<dyn ToolResources>,
    /// Optional cancellation handle from which per-turn checkpoints are taken.
    cancellation: Option<CancellationHandle>,
    /// Transcript mutators, run in registration order.
    mutators: Vec<Arc<dyn LoopMutator>>,
    /// Event observers.
    observers: Vec<Arc<dyn LoopObserver>>,
    /// Transcript observers.
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    /// Initial transcript copied into every started driver.
    transcript: Vec<Item>,
    /// Initial pending input copied into every started driver.
    input: Vec<Item>,
}
1123
1124impl<M> Agent<M>
1125where
1126 M: ModelAdapter,
1127{
1128 pub fn builder() -> AgentBuilder<M> {
1130 AgentBuilder::default()
1131 }
1132
1133 pub async fn start(&self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1149 let session_id = config.session_id.clone();
1150 let default_cache = config.cache.clone();
1151 let session = self.model.start_session(config).await?;
1152 let tool_executor = self
1153 .tool_executor
1154 .clone()
1155 .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1156 let driver = LoopDriver {
1157 session_id: session_id.clone(),
1158 provider_name: self.model.provider_name().map(str::to_owned),
1159 default_cache,
1160 next_turn_cache: None,
1161 session: Some(session),
1162 tool_executor,
1163 task_manager: self.task_manager.clone(),
1164 permissions: self.permissions.clone(),
1165 resources: self.resources.clone(),
1166 cancellation: self.cancellation.clone(),
1167 mutators: self.mutators.clone(),
1168 observers: self.observers.clone(),
1169 transcript_observers: self.transcript_observers.clone(),
1170 transcript: self.transcript.clone(),
1171 pending_input: self.input.clone(),
1172 pending_approvals: BTreeMap::new(),
1173 pending_approval_order: VecDeque::new(),
1174 active_tool_round: None,
1175 pending_round_resume: None,
1176 next_turn_index: 1,
1177 detached_call_ids: HashSet::new(),
1178 };
1179 driver.emit(AgentEvent::RunStarted { session_id });
1180 Ok(driver)
1181 }
1182}
1183
/// Builder for [`Agent`]. Only the model adapter is mandatory.
pub struct AgentBuilder<M>
where
    M: ModelAdapter,
{
    /// Model adapter; `build` fails while this is `None`.
    model: Option<M>,
    /// Registered tool sources.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    /// Explicit tool executor, if one was set.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    /// Explicit task manager; `build` falls back to `SimpleTaskManager` when `None`.
    task_manager: Option<Arc<dyn TaskManager>>,
    /// Permission checker; defaults to `AllowAllPermissions`.
    permissions: Arc<dyn PermissionChecker>,
    /// Tool resources; defaults to the unit value `()`.
    resources: Arc<dyn ToolResources>,
    /// Optional cancellation handle.
    cancellation: Option<CancellationHandle>,
    /// Registered mutators, in registration order.
    mutators: Vec<Arc<dyn LoopMutator>>,
    /// Registered event observers.
    observers: Vec<Arc<dyn LoopObserver>>,
    /// Registered transcript observers.
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    /// Initial transcript.
    transcript: Vec<Item>,
    /// Initial pending input.
    input: Vec<Item>,
}
1206
1207impl<M> Default for AgentBuilder<M>
1208where
1209 M: ModelAdapter,
1210{
1211 fn default() -> Self {
1212 Self {
1213 model: None,
1214 tool_sources: Vec::new(),
1215 tool_executor: None,
1216 task_manager: None,
1217 permissions: Arc::new(AllowAllPermissions),
1218 resources: Arc::new(()),
1219 cancellation: None,
1220 mutators: Vec::new(),
1221 observers: Vec::new(),
1222 transcript_observers: Vec::new(),
1223 transcript: Vec::new(),
1224 input: Vec::new(),
1225 }
1226 }
1227}
1228
1229impl<M> AgentBuilder<M>
1230where
1231 M: ModelAdapter,
1232{
1233 pub fn model(mut self, model: M) -> Self {
1235 self.model = Some(model);
1236 self
1237 }
1238
1239 pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1252 self.tool_sources.push(Arc::new(source));
1253 self
1254 }
1255
1256 pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1262 self.tool_executor = Some(Arc::new(executor));
1263 self
1264 }
1265
1266 pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1271 self.task_manager = Some(Arc::new(manager));
1272 self
1273 }
1274
1275 pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1279 self.permissions = Arc::new(permissions);
1280 self
1281 }
1282
1283 pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1285 self.resources = Arc::new(resources);
1286 self
1287 }
1288
1289 pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1291 self.cancellation = Some(handle);
1292 self
1293 }
1294
1295 pub fn mutator<L: LoopMutator + 'static>(mut self, mutator: L) -> Self {
1303 self.mutators.push(Arc::new(mutator));
1304 self
1305 }
1306
1307 pub fn observer<O: LoopObserver + 'static>(mut self, observer: O) -> Self {
1311 self.observers.push(Arc::new(observer));
1312 self
1313 }
1314
1315 pub fn transcript_observer<O: TranscriptObserver + 'static>(mut self, observer: O) -> Self {
1324 self.transcript_observers.push(Arc::new(observer));
1325 self
1326 }
1327
1328 pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1336 self.transcript = transcript;
1337 self
1338 }
1339
1340 pub fn input(mut self, input: Vec<Item>) -> Self {
1349 self.input = input;
1350 self
1351 }
1352
1353 pub fn build(self) -> Result<Agent<M>, LoopError> {
1359 let model = self
1360 .model
1361 .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1362 Ok(Agent {
1363 model,
1364 tool_sources: self.tool_sources,
1365 tool_executor: self.tool_executor,
1366 task_manager: self
1367 .task_manager
1368 .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1369 permissions: self.permissions,
1370 resources: self.resources,
1371 cancellation: self.cancellation,
1372 mutators: self.mutators,
1373 observers: self.observers,
1374 transcript_observers: self.transcript_observers,
1375 transcript: self.transcript,
1376 input: self.input,
1377 })
1378 }
1379}
1380
/// Drives the agent loop for one session; created by [`Agent::start`].
pub struct LoopDriver<S>
where
    S: ModelSession,
{
    /// Session this driver runs.
    session_id: SessionId,
    /// Provider name reported by the adapter, recorded on the turn tracing span.
    provider_name: Option<String>,
    /// Cache request taken from the session config.
    default_cache: Option<PromptCacheRequest>,
    /// Cache request override for the next turn; starts as `None`.
    next_turn_cache: Option<PromptCacheRequest>,
    /// The model session; wrapped in `Option` (always `Some` right after `start`).
    session: Option<S>,
    /// Executor that provides tool specs and runs tools.
    tool_executor: Arc<dyn ToolExecutor>,
    /// Task manager through which tool calls are started and awaited.
    task_manager: Arc<dyn TaskManager>,
    /// Permission checker passed into tool contexts.
    permissions: Arc<dyn PermissionChecker>,
    /// Shared resources passed into tool contexts.
    resources: Arc<dyn ToolResources>,
    /// Handle from which per-step cancellation checkpoints are taken.
    cancellation: Option<CancellationHandle>,
    /// Transcript mutators, run in order.
    mutators: Vec<Arc<dyn LoopMutator>>,
    /// Event observers.
    observers: Vec<Arc<dyn LoopObserver>>,
    /// Transcript observers.
    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
    /// The conversation transcript.
    transcript: Vec<Item>,
    /// Input submitted but not yet consumed.
    pending_input: Vec<Item>,
    /// Approvals waiting for (or holding) a decision, keyed by tool call id.
    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
    /// Order in which pending approvals were queued.
    pending_approval_order: VecDeque<ToolCallId>,
    /// The tool round in progress, if any.
    active_tool_round: Option<ActiveToolRound>,
    /// Turn to resume after an `AfterToolResult` interrupt, if one was returned.
    pending_round_resume: Option<agentkit_core::TurnId>,
    /// Counter for the next turn; starts at 1.
    next_turn_index: u64,
    /// Call ids of tasks that were detached to the background.
    detached_call_ids: HashSet<ToolCallId>,
}
1444
1445impl<S> LoopDriver<S>
1446where
1447 S: ModelSession,
1448{
    /// Builds the `agent.execute_tool` tracing span for one tool dispatch.
    ///
    /// The span carries the tool name, call id, session id and turn id, plus `launch_kind`.
    /// `error.type` is declared empty so that callers can record it later.
    fn execute_tool_span(
        &self,
        request: &ToolRequest,
        turn_id: &agentkit_core::TurnId,
        launch_kind: &'static str,
    ) -> tracing::Span {
        tracing::info_span!(
            "agent.execute_tool",
            "otel.name" = %format!("execute_tool {}", request.tool_name),
            "gen_ai.operation.name" = "execute_tool",
            "gen_ai.tool.name" = %request.tool_name,
            "gen_ai.tool.call.id" = %request.call_id,
            "gen_ai.conversation.id" = %self.session_id,
            // Filled in by the caller when the tool result turns out to be an error.
            "error.type" = tracing::field::Empty,
            session.id = %self.session_id,
            turn.id = %turn_id,
            launch_kind = launch_kind,
        )
    }
1468
1469 fn start_task_via_manager(
1470 &self,
1471 task_id: Option<TaskId>,
1472 tool_request: ToolRequest,
1473 kind: TaskLaunchKind,
1474 cancellation: Option<TurnCancellation>,
1475 ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1476 {
1477 let task_manager = self.task_manager.clone();
1478 let tool_executor = self.tool_executor.clone();
1479 let permissions = self.permissions.clone();
1480 let resources = self.resources.clone();
1481 let session_id = self.session_id.clone();
1482 let turn_id = tool_request.turn_id.clone();
1483 let metadata = tool_request.metadata.clone();
1484
1485 async move {
1486 task_manager
1487 .start_task(
1488 TaskLaunchRequest {
1489 task_id,
1490 request: tool_request.clone(),
1491 kind,
1492 },
1493 TaskStartContext {
1494 executor: tool_executor.clone(),
1495 tool_context: {
1496 let execution_scope = ToolExecutionScope {
1497 executor: tool_executor,
1498 session_id: session_id.clone(),
1499 turn_id: turn_id.clone(),
1500 permissions: permissions.clone(),
1501 resources: resources.clone(),
1502 cancellation: cancellation.clone(),
1503 };
1504 OwnedToolContext {
1505 session_id,
1506 turn_id,
1507 metadata,
1508 permissions,
1509 resources,
1510 cancellation,
1511 execution_scope: Some(execution_scope),
1512 approved_request: None,
1513 }
1514 },
1515 },
1516 )
1517 .await
1518 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1519 }
1520 }
1521
    /// Returns `true` while at least one approval is still queued in `pending_approvals`.
    fn has_pending_interrupts(&self) -> bool {
        !self.pending_approvals.is_empty()
    }
1525
1526 fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1527 for event in events {
1528 self.emit(AgentEvent::ToolCatalogChanged(event));
1529 }
1530 }
1531
1532 fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1533 let call_id = task.tool_request.call_id.clone();
1534 let call = ToolCallPart {
1535 id: call_id.clone(),
1536 name: task.tool_request.tool_name.to_string(),
1537 input: task.tool_request.input.clone(),
1538 metadata: task.tool_request.metadata.clone(),
1539 };
1540 let mut request = task.approval;
1541 request.call_id = Some(call_id.clone());
1542 let pending = PendingApprovalToolCall {
1543 request: request.clone(),
1544 decision: None,
1545 surfaced: false,
1546 turn_id: turn_id.clone(),
1547 task_id: task.task_id,
1548 call,
1549 tool_request: task.tool_request,
1550 };
1551 self.pending_approvals.insert(call_id.clone(), pending);
1552 if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1553 self.pending_approval_order.push_back(call_id);
1554 }
1555 self.emit(AgentEvent::ApprovalRequired(request));
1556 }
1557
1558 fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1559 for call_id in self.pending_approval_order.clone() {
1560 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1561 continue;
1562 };
1563 if pending.decision.is_none() && !pending.surfaced {
1564 pending.surfaced = true;
1565 return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1566 PendingApproval {
1567 request: pending.request.clone(),
1568 },
1569 )));
1570 }
1571 }
1572 None
1573 }
1574
1575 fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1576 self.pending_approval_order.iter().find_map(|call_id| {
1577 self.pending_approvals.get(call_id).and_then(|pending| {
1578 pending.decision.is_none().then(|| {
1579 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1580 request: pending.request.clone(),
1581 }))
1582 })
1583 })
1584 })
1585 }
1586
1587 fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1588 let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1589 self.pending_approvals
1590 .get(call_id)
1591 .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1592 })?;
1593 self.pending_approval_order.retain(|id| id != &call_id);
1594 self.pending_approvals.remove(&call_id)
1595 }
1596
1597 fn queue_resolution_interrupt(
1598 &mut self,
1599 turn_id: &agentkit_core::TurnId,
1600 resolution: TaskResolution,
1601 ) -> Option<LoopStep> {
1602 match resolution {
1603 TaskResolution::Item(item) => {
1604 self.append_tool_result_item(item);
1605 None
1606 }
1607 TaskResolution::Approval(task) => {
1608 self.enqueue_pending_approval(turn_id, task);
1609 self.take_next_unsurfaced_approval_interrupt()
1610 }
1611 }
1612 }
1613
1614 async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1615 let PendingLoopUpdates { mut resolutions } = self
1616 .task_manager
1617 .take_pending_loop_updates()
1618 .await
1619 .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1620 let mut saw_items = false;
1621 while let Some(resolution) = resolutions.pop_front() {
1622 match resolution {
1623 TaskResolution::Item(item) => {
1624 self.append_tool_result_item(item);
1625 saw_items = true;
1626 }
1627 TaskResolution::Approval(task) => {
1628 self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1629 }
1630 }
1631 }
1632 Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1633 }
1634
1635 async fn run_mutators(
1636 &mut self,
1637 point: MutationPoint,
1638 turn_id: Option<&agentkit_core::TurnId>,
1639 cancellation: Option<TurnCancellation>,
1640 ) -> Result<(), LoopError> {
1641 if self.mutators.is_empty() {
1642 return Ok(());
1643 }
1644 if cancellation
1645 .as_ref()
1646 .is_some_and(TurnCancellation::is_cancelled)
1647 {
1648 return Err(LoopError::Cancelled);
1649 }
1650 let mutators = self.mutators.clone();
1651 let session_id = self.session_id.clone();
1652 let observers = self.observers.clone();
1653 let emitter = DriverEmitter {
1654 observers: &observers,
1655 };
1656 let mut cursor = TranscriptCursor {
1657 items: &mut self.transcript,
1658 dirty: false,
1659 };
1660 for mutator in &mutators {
1661 if cancellation
1662 .as_ref()
1663 .is_some_and(TurnCancellation::is_cancelled)
1664 {
1665 return Err(LoopError::Cancelled);
1666 }
1667 let ctx = LoopCtx {
1668 session_id: &session_id,
1669 turn_id,
1670 point,
1671 cancellation: cancellation.clone(),
1672 emitter: &emitter,
1673 };
1674 mutator.mutate(&mut cursor, ctx).await?;
1675 }
1676 if cursor.dirty {
1677 validate_transcript_invariants(cursor.items)?;
1678 }
1679 Ok(())
1680 }
1681
    /// Advances the active tool round until it either needs the caller or is done.
    ///
    /// Returns `Ok(None)` when there is no active round, or when the round ended with only
    /// background work pending and no foreground progress. Otherwise returns the step the
    /// caller must handle: a cancelled-turn result, an approval interrupt, or an
    /// `AfterToolResult` interrupt.
    ///
    /// # Errors
    ///
    /// Task-manager failures are reported as `LoopError::Tool(ToolError::Internal(..))`;
    /// a missing active round mid-loop is reported as `LoopError::InvalidState`.
    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
        // Nothing to do unless a tool round is in progress.
        let Some(_) = self.active_tool_round.as_ref() else {
            return Ok(None);
        };
        loop {
            // Take a fresh cancellation checkpoint on every iteration.
            let cancellation = self
                .cancellation
                .as_ref()
                .map(CancellationHandle::checkpoint);
            let turn_id = self
                .active_tool_round
                .as_ref()
                .map(|active| active.turn_id.clone())
                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

            // Cancelled before doing more work: tell the task manager, drop the round,
            // and finish the turn as cancelled.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                self.active_tool_round = None;
                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
            }

            // Phase 1: dispatch the next queued tool call, one per iteration.
            let next_call = self
                .active_tool_round
                .as_mut()
                .and_then(|active| active.pending_calls.pop_front());
            if let Some((_call, tool_request)) = next_call {
                use tracing::Instrument;
                let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
                match self
                    .start_task_via_manager(
                        None,
                        tool_request.clone(),
                        TaskLaunchKind::Plain,
                        cancellation.clone(),
                    )
                    .instrument(dispatch_span.clone())
                    .await?
                {
                    // The task resolved immediately.
                    TaskStartOutcome::Ready(resolution) => {
                        let resolution = *resolution;
                        match resolution {
                            TaskResolution::Item(item) => {
                                if tool_result_is_error(&item) {
                                    dispatch_span.record("error.type", "tool_error");
                                }
                                if let Some(active) = self.active_tool_round.as_mut() {
                                    active.foreground_progressed = true;
                                }
                                self.append_tool_result_item(item);
                            }
                            TaskResolution::Approval(task) => {
                                self.enqueue_pending_approval(&turn_id, task);
                            }
                        }
                        continue;
                    }
                    // The task is still running; remember if it is a background task.
                    TaskStartOutcome::Pending { kind, .. } => {
                        if kind == agentkit_task_manager::TaskKind::Background
                            && let Some(active) = self.active_tool_round.as_mut()
                        {
                            active.background_pending = true;
                        }
                        continue;
                    }
                }
            }

            // Phase 2: every call has been dispatched; wait for the turn's tasks.
            match self
                .task_manager
                .wait_for_turn(&turn_id, cancellation.clone())
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
            {
                Some(TurnTaskUpdate::Resolution(resolution)) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.append_tool_result_item(item);
                        }
                        TaskResolution::Approval(task) => {
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                    }
                }
                Some(TurnTaskUpdate::Detached(snapshot)) => {
                    let detached_call_id = snapshot.call_id.clone();
                    // Write a placeholder tool result for the detached call so the
                    // transcript contains a result for it, and remember the call id.
                    self.append_tool_result_item(Item {
                        id: None,
                        kind: ItemKind::Tool,
                        parts: vec![Part::ToolResult(ToolResultPart {
                            call_id: detached_call_id.clone(),
                            output: ToolOutput::Text(format!(
                                "Tool {} is now running in the background. \
                                 The result will be delivered when it completes.",
                                snapshot.tool_name,
                            )),
                            is_error: false,
                            metadata: MetadataMap::new(),
                        })],
                        metadata: MetadataMap::new(),
                        usage: None,
                        finish_reason: None,
                        created_at: None,
                    });
                    self.detached_call_ids.insert(detached_call_id);
                    if let Some(active) = self.active_tool_round.as_mut() {
                        active.background_pending = true;
                        active.foreground_progressed = true;
                    }
                }
                // No further update for this turn.
                None => {
                    // `wait_for_turn` may have returned because of cancellation.
                    if cancellation
                        .as_ref()
                        .is_some_and(TurnCancellation::is_cancelled)
                    {
                        self.task_manager
                            .on_turn_interrupted(&turn_id)
                            .await
                            .map_err(|error| {
                                LoopError::Tool(ToolError::Internal(error.to_string()))
                            })?;
                        self.active_tool_round = None;
                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                    }
                    // The round is over; take it out of the driver.
                    let active = self.active_tool_round.take().ok_or_else(|| {
                        LoopError::InvalidState("missing active tool round".into())
                    })?;
                    // Approvals take priority: first any not yet surfaced, then any that
                    // were surfaced but are still undecided.
                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    if let Some(step) = self.next_unresolved_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    // Only background work remains and nothing was appended in the
                    // foreground: there is nothing to report to the caller.
                    if active.background_pending && !active.foreground_progressed {
                        return Ok(None);
                    }
                    let info = ToolRoundInfo {
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        transcript_len: self.transcript.len(),
                    };
                    // Remember which turn to resume once the caller continues.
                    self.pending_round_resume = Some(turn_id);
                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
                        info,
                    ))));
                }
            }
        }
    }
1863
/// Runs one model round-trip for `turn_id` and folds its output into the transcript.
///
/// Steps, in order: run the mutators, optionally emit `TurnStarted`, build a
/// [`TurnRequest`] from the current transcript and tool specs, stream the model's
/// events, append the returned items to the transcript, and then either start a
/// tool round (when a tool call was streamed) or finish the turn.
///
/// `emit_started` controls whether `AgentEvent::TurnStarted` is emitted; the callers
/// in this file pass `false` when continuing an already-started turn.
///
/// # Returns
///
/// * `LoopStep::Finished` when the model completes without tool calls, or when the
///   turn is cancelled at any checkpoint (via `finish_cancelled`).
/// * Whatever step `continue_active_tool_round` yields when a tool call was seen,
///   or an `AwaitingInput` interrupt if it yields none.
///
/// # Errors
///
/// * Any non-`Cancelled` error from the mutators, `begin_turn` or `next_event`.
/// * `LoopError::InvalidState` when no model session is available.
/// * `LoopError::Provider` when the event stream ends without a `Finished` event.
/// * `LoopError::Tool(ToolError::Internal(..))` when notifying the task manager of
///   an interruption fails.
#[tracing::instrument(
    name = "agent.turn",
    skip_all,
    fields(
        otel.name = "invoke_agent",
        gen_ai.operation.name = "invoke_agent",
        gen_ai.conversation.id = %self.session_id,
        gen_ai.provider.name = tracing::field::Empty,
        gen_ai.usage.input_tokens = tracing::field::Empty,
        gen_ai.usage.output_tokens = tracing::field::Empty,
        session.id = %self.session_id,
        turn.id = %turn_id,
        transcript.len = self.transcript.len(),
        saw_tool_call = tracing::field::Empty,
        finish_reason = tracing::field::Empty,
    ),
)]
async fn drive_turn(
    &mut self,
    turn_id: agentkit_core::TurnId,
    emit_started: bool,
) -> Result<LoopStep, LoopError> {
    // Fields declared `Empty` in the attribute above are filled in as values become known.
    if let Some(provider) = &self.provider_name {
        tracing::Span::current().record("gen_ai.provider.name", provider.as_str());
    }
    // One cancellation token per call (when a handle is configured); it is cloned
    // into every cancellable await below.
    let cancellation = self
        .cancellation
        .as_ref()
        .map(CancellationHandle::checkpoint);
    // NOTE(review): `next()` calls `pending_round_resume.take()` before invoking this
    // function, so on that path this condition is already false here — confirm
    // whether `AfterToolResult` is reachable from another caller.
    let mutation_point = if self.pending_round_resume.is_some() {
        MutationPoint::AfterToolResult
    } else {
        MutationPoint::AfterTurnEnded
    };
    // Mutators run before the model is contacted. A cancellation raised by them
    // still closes the turn, with a placeholder assistant item.
    match self
        .run_mutators(mutation_point, Some(&turn_id), cancellation.clone())
        .await
    {
        Ok(()) => {}
        Err(LoopError::Cancelled) => {
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        Err(error) => return Err(error),
    }
    if emit_started {
        self.emit(AgentEvent::TurnStarted {
            session_id: self.session_id.clone(),
            turn_id: turn_id.clone(),
        });
    }
    // Last chance to bail out before building the request.
    if cancellation
        .as_ref()
        .is_some_and(TurnCancellation::is_cancelled)
    {
        return self.finish_cancelled(turn_id, interrupted_assistant_items());
    }

    // Surface any tool catalog changes before the specs are read for the request.
    let catalog_events = self.tool_executor.drain_catalog_events();
    self.emit_tool_catalog_events(catalog_events);

    let request = TurnRequest {
        session_id: self.session_id.clone(),
        turn_id: turn_id.clone(),
        transcript: self.transcript.clone(),
        available_tools: self.tool_executor.specs(),
        // A one-shot override set via `set_next_turn_cache` is consumed here;
        // otherwise the session default applies.
        cache: self
            .next_turn_cache
            .take()
            .or_else(|| self.default_cache.clone()),
        metadata: MetadataMap::new(),
    };

    let session = self
        .session
        .as_mut()
        .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;

    // Child span covering the model call; response fields are recorded once known.
    let chat_span = tracing::info_span!(
        "chat",
        "otel.name" = tracing::field::Empty,
        "otel.kind" = "client",
        "gen_ai.operation.name" = "chat",
        "gen_ai.provider.name" = tracing::field::Empty,
        "gen_ai.conversation.id" = %self.session_id,
        "gen_ai.request.model" = tracing::field::Empty,
        "gen_ai.response.model" = tracing::field::Empty,
        "gen_ai.response.id" = tracing::field::Empty,
        "gen_ai.response.finish_reasons" = tracing::field::Empty,
        "gen_ai.usage.input_tokens" = tracing::field::Empty,
        "gen_ai.usage.output_tokens" = tracing::field::Empty,
    );
    if let Some(provider) = &self.provider_name {
        chat_span.record("gen_ai.provider.name", provider.as_str());
    }
    match session.model_name() {
        Some(model) => {
            chat_span.record("gen_ai.request.model", model);
            chat_span.record("otel.name", format!("chat {model}").as_str());
        }
        None => {
            chat_span.record("otel.name", "chat");
        }
    }

    use tracing::Instrument;
    let mut turn = match session
        .begin_turn(request, cancellation.clone())
        .instrument(chat_span.clone())
        .await
    {
        Ok(turn) => turn,
        Err(LoopError::Cancelled) => {
            // From here on, cancellation also notifies the task manager before
            // the turn is closed.
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        Err(error) => return Err(error),
    };
    let mut saw_tool_call = false;
    let mut finished_result = None;

    // Stream events until the adapter reports `Finished` or the stream ends.
    while let Some(event) = match turn
        .next_event(cancellation.clone())
        .instrument(chat_span.clone())
        .await
    {
        Ok(event) => event,
        Err(LoopError::Cancelled) => {
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        Err(error) => return Err(error),
    } {
        // The adapter may hand back an event even though cancellation was
        // requested meanwhile; the event is discarded in that case.
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            self.task_manager
                .on_turn_interrupted(&turn_id)
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }
        match event {
            ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
            ModelTurnEvent::Usage(usage) => {
                if let Some(tokens) = &usage.tokens {
                    chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
                    chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
                }
                self.emit(AgentEvent::UsageUpdated(usage));
            }
            ModelTurnEvent::ToolCall(call) => {
                // Only the flag is kept here; the calls themselves are read from
                // the finished result's output items further down.
                saw_tool_call = true;
                self.emit(AgentEvent::ToolCallRequested(call.clone()));
            }
            ModelTurnEvent::Finished(result) => {
                finished_result = Some(result);
                break;
            }
        }
    }

    let mut result = finished_result.ok_or_else(|| {
        LoopError::Provider("model turn ended without a Finished event".into())
    })?;
    if let Some(model) = &result.model {
        chat_span.record("gen_ai.response.model", model.as_str());
    }
    if let Some(id) = &result.response_id {
        chat_span.record("gen_ai.response.id", id.as_str());
    }
    if let Some(tokens) = result
        .usage
        .as_ref()
        .and_then(|usage| usage.tokens.as_ref())
    {
        chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
        chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
    }
    chat_span.record(
        "gen_ai.response.finish_reasons",
        tracing::field::debug(&result.finish_reason),
    );
    // Release this handle to the chat span; the remaining records target the
    // enclosing `agent.turn` span.
    drop(chat_span);
    tracing::Span::current().record("saw_tool_call", saw_tool_call);
    tracing::Span::current().record(
        "finish_reason",
        tracing::field::debug(&result.finish_reason),
    );
    if let Some(tokens) = result
        .usage
        .as_ref()
        .and_then(|usage| usage.tokens.as_ref())
    {
        tracing::Span::current().record("gen_ai.usage.input_tokens", tokens.input_tokens);
        tracing::Span::current().record("gen_ai.usage.output_tokens", tokens.output_tokens);
    }
    let now = Timestamp::now();
    let usage = result.usage.clone();
    let finish_reason = result.finish_reason.clone();
    // Backfill fields the adapter left unset: usage and finish reason on assistant
    // items, and a shared creation timestamp on every item.
    let output_items: Vec<Item> = result
        .output_items
        .drain(..)
        .map(|mut item| {
            if matches!(item.kind, ItemKind::Assistant) {
                if item.usage.is_none() {
                    item.usage = usage.clone();
                }
                if item.finish_reason.is_none() {
                    item.finish_reason = Some(finish_reason.clone());
                }
            }
            if item.created_at.is_none() {
                item.created_at = Some(now);
            }
            item
        })
        .collect();
    self.extend_transcript(output_items.clone());

    if saw_tool_call {
        // Pair every tool call found in the output items with the request that
        // will be handed to the tool layer.
        let pending_calls = extract_tool_calls(&output_items)
            .into_iter()
            .map(|call| {
                let tool_request = ToolRequest {
                    call_id: call.id.clone(),
                    tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
                    input: call.input.clone(),
                    session_id: self.session_id.clone(),
                    turn_id: turn_id.clone(),
                    metadata: call.metadata.clone(),
                };
                (call, tool_request)
            })
            .collect();
        self.active_tool_round = Some(ActiveToolRound {
            turn_id: turn_id.clone(),
            pending_calls,
            background_pending: false,
            foreground_progressed: false,
        });
        if let Some(step) = self.continue_active_tool_round().await? {
            return Ok(step);
        }
        // The tool round produced nothing to report yet, so hand control back to
        // the caller.
        return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
            InputRequest {
                session_id: self.session_id.clone(),
                reason: "driver is waiting for input".into(),
            },
        )));
    }

    let turn_result = TurnResult {
        turn_id,
        finish_reason: result.finish_reason,
        items: output_items,
        usage: result.usage,
        metadata: result.metadata,
    };
    self.emit(AgentEvent::TurnFinished(turn_result.clone()));
    Ok(LoopStep::Finished(turn_result))
}
2138
2139 async fn resume_after_approval(
2140 &mut self,
2141 pending: PendingApprovalToolCall,
2142 ) -> Result<LoopStep, LoopError> {
2143 let decision = pending
2144 .decision
2145 .clone()
2146 .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
2147
2148 match decision {
2149 ApprovalDecision::Approve => {
2150 use tracing::Instrument;
2151 let dispatch_span =
2152 self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
2153 match self
2154 .start_task_via_manager(
2155 Some(pending.task_id.clone()),
2156 pending.tool_request.clone(),
2157 TaskLaunchKind::Approved(pending.request.clone()),
2158 self.cancellation
2159 .as_ref()
2160 .map(CancellationHandle::checkpoint),
2161 )
2162 .instrument(dispatch_span.clone())
2163 .await?
2164 {
2165 TaskStartOutcome::Ready(resolution) => {
2166 let resolution = *resolution;
2167 if let TaskResolution::Item(item) = &resolution
2168 && tool_result_is_error(item)
2169 {
2170 dispatch_span.record("error.type", "tool_error");
2171 }
2172 if let Some(step) =
2173 self.queue_resolution_interrupt(&pending.turn_id, resolution)
2174 {
2175 return Ok(step);
2176 }
2177 }
2178 TaskStartOutcome::Pending { .. } => {}
2179 }
2180 }
2181 ApprovalDecision::Deny { reason } => {
2182 self.append_tool_result_item(Item {
2183 id: None,
2184 kind: ItemKind::Tool,
2185 parts: vec![Part::ToolResult(ToolResultPart {
2186 call_id: pending.call.id.clone(),
2187 output: ToolOutput::Text(
2188 reason.unwrap_or_else(|| "approval denied".into()),
2189 ),
2190 is_error: true,
2191 metadata: pending.call.metadata.clone(),
2192 })],
2193 metadata: MetadataMap::new(),
2194 usage: None,
2195 finish_reason: None,
2196 created_at: None,
2197 });
2198 }
2199 }
2200
2201 if let Some(step) = self.continue_active_tool_round().await? {
2202 Ok(step)
2203 } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2204 Ok(step)
2205 } else if let Some(step) = self.next_unresolved_approval_interrupt() {
2206 Ok(step)
2207 } else {
2208 self.drive_turn(pending.turn_id, false).await
2209 }
2210 }
2211
2212 fn finish_cancelled(
2213 &mut self,
2214 turn_id: agentkit_core::TurnId,
2215 items: Vec<Item>,
2216 ) -> Result<LoopStep, LoopError> {
2217 self.extend_transcript(items.clone());
2218 let turn_result = TurnResult {
2219 turn_id,
2220 finish_reason: FinishReason::Cancelled,
2221 items,
2222 usage: None,
2223 metadata: interrupted_metadata("turn"),
2224 };
2225 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
2226 Ok(LoopStep::Finished(turn_result))
2227 }
2228
2229 pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
2239 if self.has_pending_interrupts() {
2240 return Err(LoopError::InvalidState(
2241 "cannot submit input while an interrupt is pending".into(),
2242 ));
2243 }
2244 self.emit(AgentEvent::InputAccepted {
2245 session_id: self.session_id.clone(),
2246 items: input.clone(),
2247 });
2248 self.pending_input.extend(input);
2249 Ok(())
2250 }
2251
2252 pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2257 if self.has_pending_interrupts() {
2258 return Err(LoopError::InvalidState(
2259 "cannot update next-turn cache while an interrupt is pending".into(),
2260 ));
2261 }
2262 self.next_turn_cache = Some(cache);
2263 Ok(())
2264 }
2265
/// Test helper: sets the next-turn cache, then submits `input`.
///
/// The input is not submitted when setting the cache fails.
#[cfg(test)]
pub(crate) fn submit_input_with_cache(
    &mut self,
    input: Vec<Item>,
    cache: PromptCacheRequest,
) -> Result<(), LoopError> {
    self.set_next_turn_cache(cache)
        .and_then(|()| self.submit_input(input))
}
2275
2276 pub fn resolve_approval_for(
2286 &mut self,
2287 call_id: ToolCallId,
2288 decision: ApprovalDecision,
2289 ) -> Result<(), LoopError> {
2290 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2291 return Err(LoopError::InvalidState(format!(
2292 "no approval request is pending for call {}",
2293 call_id.0
2294 )));
2295 };
2296 pending.decision = Some(decision.clone());
2297 self.emit(AgentEvent::ApprovalResolved {
2298 approved: matches!(decision, ApprovalDecision::Approve),
2299 });
2300 Ok(())
2301 }
2302
2303 pub fn resolve_approval_for_with_patched_input(
2316 &mut self,
2317 call_id: ToolCallId,
2318 input: serde_json::Value,
2319 ) -> Result<(), LoopError> {
2320 let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2321 return Err(LoopError::InvalidState(format!(
2322 "no approval request is pending for call {}",
2323 call_id.0
2324 )));
2325 };
2326 pending.tool_request.input = input;
2327 self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2328 }
2329
2330 pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2333 let mut unresolved = self
2334 .pending_approval_order
2335 .iter()
2336 .filter(|call_id| {
2337 self.pending_approvals
2338 .get(*call_id)
2339 .is_some_and(|pending| pending.decision.is_none())
2340 })
2341 .cloned();
2342 let Some(call_id) = unresolved.next() else {
2343 return Err(LoopError::InvalidState(
2344 "no approval request is pending".into(),
2345 ));
2346 };
2347 if unresolved.next().is_some() {
2348 return Err(LoopError::InvalidState(
2349 "multiple approvals are pending; use resolve_approval_for".into(),
2350 ));
2351 }
2352 self.resolve_approval_for(call_id, decision)
2353 }
2354
2355 pub fn snapshot(&self) -> LoopSnapshot {
2357 LoopSnapshot {
2358 session_id: self.session_id.clone(),
2359 transcript: self.transcript.clone(),
2360 pending_input: self.pending_input.clone(),
2361 }
2362 }
2363
2364 pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
2385 if let Some(pending) = self.take_next_resolved_approval() {
2386 return self.resume_after_approval(pending).await;
2387 }
2388
2389 if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2390 return Ok(step);
2391 }
2392
2393 if let Some(step) = self.next_unresolved_approval_interrupt() {
2394 return Ok(step);
2395 }
2396
2397 if let Some(step) = self.continue_active_tool_round().await? {
2398 return Ok(step);
2399 }
2400
2401 let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2402 if let Some(step) = loop_step {
2403 return Ok(step);
2404 }
2405
2406 if let Some(turn_id) = self.pending_round_resume.take() {
2411 let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2412 self.extend_transcript(drained);
2413 return self.drive_turn(turn_id, false).await;
2414 }
2415
2416 if self.pending_input.is_empty() && !had_loop_updates {
2417 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2418 InputRequest {
2419 session_id: self.session_id.clone(),
2420 reason: "driver is waiting for input".into(),
2421 },
2422 )));
2423 }
2424
2425 let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2426 self.next_turn_index += 1;
2427 let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2428 self.extend_transcript(drained);
2429 self.drive_turn(turn_id, true).await
2430 }
2431
2432 fn emit(&self, event: AgentEvent) {
2433 for observer in &self.observers {
2434 observer.handle_event(event.clone());
2435 }
2436 }
2437
2438 fn append_item(&mut self, mut item: Item) {
2443 if item.created_at.is_none() {
2444 item.created_at = Some(Timestamp::now());
2445 }
2446 for observer in &self.transcript_observers {
2447 observer.on_item_appended(&item);
2448 }
2449 self.transcript.push(item);
2450 }
2451
2452 fn append_tool_result_item(&mut self, item: Item) {
2465 for part in &item.parts {
2466 if let Part::ToolResult(result) = part {
2467 self.emit(AgentEvent::ToolResultReceived(result.clone()));
2468 }
2469 }
2470 let item = self.maybe_convert_detached(item);
2471 self.append_item(item);
2472 }
2473
2474 fn maybe_convert_detached(&mut self, item: Item) -> Item {
2475 if !matches!(item.kind, ItemKind::Tool) {
2476 return item;
2477 }
2478 let results: Vec<&ToolResultPart> = item
2479 .parts
2480 .iter()
2481 .filter_map(|p| match p {
2482 Part::ToolResult(r) => Some(r),
2483 _ => None,
2484 })
2485 .collect();
2486 if results.is_empty()
2487 || !results
2488 .iter()
2489 .all(|r| self.detached_call_ids.contains(&r.call_id))
2490 {
2491 return item;
2492 }
2493 let mut text = String::new();
2494 for result in &results {
2495 self.detached_call_ids.remove(&result.call_id);
2496 if !text.is_empty() {
2497 text.push_str("\n\n");
2498 }
2499 let label = if result.is_error {
2500 "failed"
2501 } else {
2502 "completed"
2503 };
2504 let body = render_tool_output_brief(&result.output);
2505 text.push_str(&format!(
2506 "Background tool call {} {}: {body}",
2507 result.call_id.0, label
2508 ));
2509 }
2510 Item::notification(text)
2511 }
2512
2513 fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2517 let now = Timestamp::now();
2518 for mut item in items {
2519 if item.created_at.is_none() {
2520 item.created_at = Some(now);
2521 }
2522 self.append_item(item);
2523 }
2524 }
2525}
2526
2527fn render_tool_output_brief(output: &ToolOutput) -> String {
2528 match output {
2529 ToolOutput::Text(t) => t.clone(),
2530 ToolOutput::Structured(value) => value.to_string(),
2531 ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2532 ToolOutput::Files(files) => format!("[{} files]", files.len()),
2533 }
2534}
2535
2536fn interrupted_metadata(stage: &str) -> MetadataMap {
2537 let mut metadata = MetadataMap::new();
2538 metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2539 metadata.insert(
2540 INTERRUPT_REASON_METADATA_KEY.into(),
2541 USER_CANCELLED_REASON.into(),
2542 );
2543 metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2544 metadata
2545}
2546
2547fn interrupted_assistant_items() -> Vec<Item> {
2548 vec![Item {
2549 id: None,
2550 kind: ItemKind::Assistant,
2551 parts: vec![Part::Text(TextPart {
2552 text: "Previous assistant response was interrupted by the user before completion."
2553 .into(),
2554 metadata: interrupted_metadata("assistant"),
2555 })],
2556 metadata: interrupted_metadata("assistant"),
2557 usage: None,
2558 finish_reason: None,
2559 created_at: None,
2560 }]
2561}
2562
2563fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2564 let mut calls = Vec::new();
2565 for item in items {
2566 for part in &item.parts {
2567 if let Part::ToolCall(call) = part {
2568 calls.push(call.clone());
2569 }
2570 }
2571 }
2572 calls
2573}
2574
2575fn tool_result_is_error(item: &Item) -> bool {
2576 item.parts
2577 .iter()
2578 .any(|part| matches!(part, Part::ToolResult(result) if result.is_error))
2579}
2580
/// Errors produced by the loop driver and by the model adapters it calls.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The requested operation is not valid in the driver's current state, for
    /// example submitting input while an interrupt is pending or driving a turn
    /// without a model session.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The turn was cancelled. The driver matches on this variant to close the
    /// turn as cancelled instead of propagating the error.
    #[error("turn cancelled")]
    Cancelled,
    /// The model provider misbehaved, e.g. its event stream ended without a
    /// `Finished` event.
    #[error("provider error: {0}")]
    Provider(String),
    /// A tool-layer failure. Converts from [`ToolError`] via `From`; the driver
    /// also wraps task-manager failures as `ToolError::Internal`.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// A mutator-related failure; also used for transcript invariant violations.
    #[error("mutator error: {0}")]
    Mutator(String),
    /// The requested operation is not supported.
    // NOTE(review): no producer of this variant is visible in this part of the file.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2603
/// Borrowed view over a set of observers that implements [`EventEmitter`].
// NOTE(review): presumably lets code emit events while other driver fields are
// mutably borrowed — confirm at the construction site.
struct DriverEmitter<'a> {
    /// Observers that each receive a clone of every emitted event.
    observers: &'a [Arc<dyn LoopObserver>],
}
2611
2612impl<'a> EventEmitter for DriverEmitter<'a> {
2613 fn emit(&self, event: AgentEvent) {
2614 for observer in self.observers {
2615 observer.handle_event(event.clone());
2616 }
2617 }
2618}
2619
2620fn validate_transcript_invariants(transcript: &[Item]) -> Result<(), LoopError> {
2625 let mut pending: HashSet<ToolCallId> = HashSet::new();
2626 let mut seen_calls: HashSet<ToolCallId> = HashSet::new();
2627 let mut seen_results: HashSet<ToolCallId> = HashSet::new();
2628 for item in transcript {
2629 for part in &item.parts {
2630 match part {
2631 Part::ToolCall(call) => {
2632 if !seen_calls.insert(call.id.clone()) {
2633 return Err(LoopError::Mutator(format!(
2634 "transcript invariant violation: duplicate tool_use: {}",
2635 call.id.0
2636 )));
2637 }
2638 pending.insert(call.id.clone());
2639 }
2640 Part::ToolResult(result) => {
2641 if !pending.remove(&result.call_id) {
2642 let kind = if seen_results.contains(&result.call_id) {
2643 "duplicate"
2644 } else {
2645 "orphaned"
2646 };
2647 return Err(LoopError::Mutator(format!(
2648 "transcript invariant violation: {kind} tool_result: {}",
2649 result.call_id.0
2650 )));
2651 }
2652 seen_results.insert(result.call_id.clone());
2653 }
2654 _ => {}
2655 }
2656 }
2657 }
2658 if !pending.is_empty() {
2659 let missing: Vec<String> = pending.into_iter().map(|id| id.0).collect();
2660 return Err(LoopError::Mutator(format!(
2661 "transcript invariant violation: tool_use(s) without matching tool_result: {}",
2662 missing.join(", ")
2663 )));
2664 }
2665 Ok(())
2666}
2667
2668#[cfg(test)]
2669mod tests {
2670 use std::collections::VecDeque;
2671 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2672 use std::sync::{Arc as StdArc, Mutex as StdMutex};
2673
2674 use agentkit_core::{
2675 CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolCallPart, ToolOutput,
2676 ToolResultPart,
2677 };
2678 use agentkit_task_manager::{
2679 AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2680 TaskRoutingPolicy,
2681 };
2682 use agentkit_tools_core::{
2683 FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2684 ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2685 ToolResult, ToolSpec,
2686 };
2687 use serde_json::{Value, json};
2688 use tokio::sync::Notify;
2689 use tokio::time::{Duration, timeout};
2690
2691 use super::*;
2692
/// Adapter whose sessions script one tool call followed by a final answer.
struct FakeAdapter;
/// Adapter whose sessions can block in `begin_turn` until the turn is cancelled.
struct SlowAdapter;
/// Adapter whose sessions record what each turn request contained.
struct RecordingAdapter {
    /// Tool descriptions seen per turn; shared with the sessions this adapter starts.
    seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
    /// Prompt-cache request seen per turn; shared with the sessions this adapter starts.
    seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
}
/// Adapter whose sessions request two different tools in a single turn.
struct MultiToolAdapter;
/// Adapter whose sessions request the `echo` tool twice in a single turn.
struct DualApprovalAdapter;
2701
/// Session that asks for tool call `call-1` until the transcript holds a tool
/// result, then answers with `tool said: <text>`.
struct FakeSession;
/// Session that waits for cancellation when the latest user message is
/// `do the long task`, and otherwise completes normally.
struct SlowSession;
/// Session that stores each request's tool descriptions and cache setting.
struct RecordingSession {
    /// Receives the tool descriptions of every turn request.
    seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
    /// Receives the prompt-cache request of every turn request.
    seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
}
/// Session that asks for `foreground-wait` and `background-wait` together until
/// a tool result is present.
struct MultiToolSession;
/// Session that asks for two `echo` calls until two tool results are present.
struct DualApprovalSession;
2710
/// Turn that replays a pre-built queue of events.
struct FakeTurn {
    /// Events still to be handed out, front first.
    events: VecDeque<ModelTurnEvent>,
}

/// Turn that yields one `Finished` event and fails with `Cancelled` once the
/// cancellation token fires.
struct SlowTurn {
    /// Whether the single `Finished` event has been handed out.
    emitted: bool,
}

/// Turn that yields one `Finished` event and then ends.
struct RecordingTurn {
    /// Whether the single `Finished` event has been handed out.
    emitted: bool,
}
/// Turn built by `MultiToolSession`.
// NOTE(review): presumably replayed front-first like `FakeTurn`; its `ModelTurn`
// impl is not visible here.
struct MultiToolTurn {
    /// Scripted events for the turn.
    events: VecDeque<ModelTurnEvent>,
}
/// Turn built by `DualApprovalSession`.
// NOTE(review): presumably replayed front-first like `FakeTurn`; its `ModelTurn`
// impl is not visible here.
struct DualApprovalTurn {
    /// Scripted events for the turn.
    events: VecDeque<ModelTurnEvent>,
}
2728
2729 #[async_trait]
2730 impl ModelAdapter for FakeAdapter {
2731 type Session = FakeSession;
2732
2733 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2734 Ok(FakeSession)
2735 }
2736 }
2737
2738 #[async_trait]
2739 impl ModelAdapter for SlowAdapter {
2740 type Session = SlowSession;
2741
2742 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2743 Ok(SlowSession)
2744 }
2745 }
2746
2747 #[async_trait]
2748 impl ModelAdapter for RecordingAdapter {
2749 type Session = RecordingSession;
2750
2751 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2752 Ok(RecordingSession {
2753 seen_descriptions: self.seen_descriptions.clone(),
2754 seen_caches: self.seen_caches.clone(),
2755 })
2756 }
2757 }
2758
2759 #[async_trait]
2760 impl ModelAdapter for MultiToolAdapter {
2761 type Session = MultiToolSession;
2762
2763 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2764 Ok(MultiToolSession)
2765 }
2766 }
2767
2768 #[async_trait]
2769 impl ModelAdapter for DualApprovalAdapter {
2770 type Session = DualApprovalSession;
2771
2772 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2773 Ok(DualApprovalSession)
2774 }
2775 }
2776
2777 #[async_trait]
2778 impl ModelSession for FakeSession {
2779 type Turn = FakeTurn;
2780
2781 async fn begin_turn(
2782 &mut self,
2783 request: TurnRequest,
2784 _cancellation: Option<TurnCancellation>,
2785 ) -> Result<Self::Turn, LoopError> {
2786 let has_tool_result = request.transcript.iter().any(|item| {
2787 item.kind == ItemKind::Tool
2788 && item
2789 .parts
2790 .iter()
2791 .any(|part| matches!(part, Part::ToolResult(_)))
2792 });
2793 let tool_name = request
2794 .available_tools
2795 .first()
2796 .map(|tool| tool.name.0.clone())
2797 .unwrap_or_else(|| "echo".into());
2798
2799 let events = if has_tool_result {
2800 let result_text = request
2801 .transcript
2802 .iter()
2803 .rev()
2804 .find_map(|item| {
2805 item.parts.iter().find_map(|part| match part {
2806 Part::ToolResult(ToolResultPart {
2807 output: ToolOutput::Text(text),
2808 ..
2809 }) => Some(text.clone()),
2810 _ => None,
2811 })
2812 })
2813 .unwrap_or_else(|| "missing".into());
2814
2815 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2816 model: None,
2817 response_id: None,
2818 finish_reason: FinishReason::Completed,
2819 output_items: vec![Item {
2820 id: None,
2821 kind: ItemKind::Assistant,
2822 parts: vec![Part::Text(TextPart {
2823 text: format!("tool said: {result_text}"),
2824 metadata: MetadataMap::new(),
2825 })],
2826 metadata: MetadataMap::new(),
2827 usage: None,
2828 finish_reason: None,
2829 created_at: None,
2830 }],
2831 usage: None,
2832 metadata: MetadataMap::new(),
2833 })])
2834 } else {
2835 VecDeque::from([
2836 ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2837 id: ToolCallId::new("call-1"),
2838 name: tool_name.clone(),
2839 input: json!({ "value": "pong" }),
2840 metadata: MetadataMap::new(),
2841 }),
2842 ModelTurnEvent::Finished(ModelTurnResult {
2843 model: None,
2844 response_id: None,
2845 finish_reason: FinishReason::ToolCall,
2846 output_items: vec![Item {
2847 id: None,
2848 kind: ItemKind::Assistant,
2849 parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2850 id: ToolCallId::new("call-1"),
2851 name: tool_name,
2852 input: json!({ "value": "pong" }),
2853 metadata: MetadataMap::new(),
2854 })],
2855 metadata: MetadataMap::new(),
2856 usage: None,
2857 finish_reason: None,
2858 created_at: None,
2859 }],
2860 usage: None,
2861 metadata: MetadataMap::new(),
2862 }),
2863 ])
2864 };
2865
2866 Ok(FakeTurn { events })
2867 }
2868 }
2869
2870 #[async_trait]
2871 impl ModelSession for SlowSession {
2872 type Turn = SlowTurn;
2873
2874 async fn begin_turn(
2875 &mut self,
2876 request: TurnRequest,
2877 cancellation: Option<TurnCancellation>,
2878 ) -> Result<Self::Turn, LoopError> {
2879 let should_block = request
2880 .transcript
2881 .iter()
2882 .rev()
2883 .find(|item| item.kind == ItemKind::User)
2884 .is_some_and(|item| {
2885 item.parts.iter().any(|part| match part {
2886 Part::Text(text) => text.text == "do the long task",
2887 _ => false,
2888 })
2889 });
2890
2891 if should_block && let Some(cancellation) = cancellation {
2892 cancellation.cancelled().await;
2893 return Err(LoopError::Cancelled);
2894 }
2895
2896 Ok(SlowTurn { emitted: false })
2897 }
2898 }
2899
2900 #[async_trait]
2901 impl ModelSession for RecordingSession {
2902 type Turn = RecordingTurn;
2903
2904 async fn begin_turn(
2905 &mut self,
2906 request: TurnRequest,
2907 _cancellation: Option<TurnCancellation>,
2908 ) -> Result<Self::Turn, LoopError> {
2909 let descriptions = request
2910 .available_tools
2911 .iter()
2912 .map(|tool| tool.description.clone())
2913 .collect::<Vec<_>>();
2914 self.seen_descriptions.lock().unwrap().push(descriptions);
2915 self.seen_caches.lock().unwrap().push(request.cache.clone());
2916
2917 Ok(RecordingTurn { emitted: false })
2918 }
2919 }
2920
2921 #[async_trait]
2922 impl ModelSession for MultiToolSession {
2923 type Turn = MultiToolTurn;
2924
2925 async fn begin_turn(
2926 &mut self,
2927 request: TurnRequest,
2928 _cancellation: Option<TurnCancellation>,
2929 ) -> Result<Self::Turn, LoopError> {
2930 let has_tool_result = request.transcript.iter().any(|item| {
2931 item.kind == ItemKind::Tool
2932 && item
2933 .parts
2934 .iter()
2935 .any(|part| matches!(part, Part::ToolResult(_)))
2936 });
2937
2938 let events = if has_tool_result {
2939 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2940 model: None,
2941 response_id: None,
2942 finish_reason: FinishReason::Completed,
2943 output_items: vec![Item {
2944 id: None,
2945 kind: ItemKind::Assistant,
2946 parts: vec![Part::Text(TextPart {
2947 text: "mixed tools finished".into(),
2948 metadata: MetadataMap::new(),
2949 })],
2950 metadata: MetadataMap::new(),
2951 usage: None,
2952 finish_reason: None,
2953 created_at: None,
2954 }],
2955 usage: None,
2956 metadata: MetadataMap::new(),
2957 })])
2958 } else {
2959 let foreground = agentkit_core::ToolCallPart {
2960 id: ToolCallId::new("call-foreground"),
2961 name: "foreground-wait".into(),
2962 input: json!({}),
2963 metadata: MetadataMap::new(),
2964 };
2965 let background = agentkit_core::ToolCallPart {
2966 id: ToolCallId::new("call-background"),
2967 name: "background-wait".into(),
2968 input: json!({}),
2969 metadata: MetadataMap::new(),
2970 };
2971 VecDeque::from([
2972 ModelTurnEvent::ToolCall(foreground.clone()),
2973 ModelTurnEvent::ToolCall(background.clone()),
2974 ModelTurnEvent::Finished(ModelTurnResult {
2975 model: None,
2976 response_id: None,
2977 finish_reason: FinishReason::ToolCall,
2978 output_items: vec![Item {
2979 id: None,
2980 kind: ItemKind::Assistant,
2981 parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2982 metadata: MetadataMap::new(),
2983 usage: None,
2984 finish_reason: None,
2985 created_at: None,
2986 }],
2987 usage: None,
2988 metadata: MetadataMap::new(),
2989 }),
2990 ])
2991 };
2992
2993 Ok(MultiToolTurn { events })
2994 }
2995 }
2996
2997 #[async_trait]
2998 impl ModelSession for DualApprovalSession {
2999 type Turn = DualApprovalTurn;
3000
3001 async fn begin_turn(
3002 &mut self,
3003 request: TurnRequest,
3004 _cancellation: Option<TurnCancellation>,
3005 ) -> Result<Self::Turn, LoopError> {
3006 let tool_results = request
3007 .transcript
3008 .iter()
3009 .flat_map(|item| item.parts.iter())
3010 .filter(|part| matches!(part, Part::ToolResult(_)))
3011 .count();
3012
3013 let events = if tool_results >= 2 {
3014 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
3015 model: None,
3016 response_id: None,
3017 finish_reason: FinishReason::Completed,
3018 output_items: vec![Item {
3019 id: None,
3020 kind: ItemKind::Assistant,
3021 parts: vec![Part::Text(TextPart {
3022 text: "both approvals finished".into(),
3023 metadata: MetadataMap::new(),
3024 })],
3025 metadata: MetadataMap::new(),
3026 usage: None,
3027 finish_reason: None,
3028 created_at: None,
3029 }],
3030 usage: None,
3031 metadata: MetadataMap::new(),
3032 })])
3033 } else {
3034 let first = agentkit_core::ToolCallPart {
3035 id: ToolCallId::new("call-1"),
3036 name: "echo".into(),
3037 input: json!({ "value": "first" }),
3038 metadata: MetadataMap::new(),
3039 };
3040 let second = agentkit_core::ToolCallPart {
3041 id: ToolCallId::new("call-2"),
3042 name: "echo".into(),
3043 input: json!({ "value": "second" }),
3044 metadata: MetadataMap::new(),
3045 };
3046 VecDeque::from([
3047 ModelTurnEvent::ToolCall(first.clone()),
3048 ModelTurnEvent::ToolCall(second.clone()),
3049 ModelTurnEvent::Finished(ModelTurnResult {
3050 model: None,
3051 response_id: None,
3052 finish_reason: FinishReason::ToolCall,
3053 output_items: vec![Item {
3054 id: None,
3055 kind: ItemKind::Assistant,
3056 parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
3057 metadata: MetadataMap::new(),
3058 usage: None,
3059 finish_reason: None,
3060 created_at: None,
3061 }],
3062 usage: None,
3063 metadata: MetadataMap::new(),
3064 }),
3065 ])
3066 };
3067
3068 Ok(DualApprovalTurn { events })
3069 }
3070 }
3071
3072 #[async_trait]
3073 impl ModelTurn for FakeTurn {
3074 async fn next_event(
3075 &mut self,
3076 _cancellation: Option<TurnCancellation>,
3077 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3078 Ok(self.events.pop_front())
3079 }
3080 }
3081
3082 #[async_trait]
3083 impl ModelTurn for SlowTurn {
3084 async fn next_event(
3085 &mut self,
3086 cancellation: Option<TurnCancellation>,
3087 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3088 if let Some(cancellation) = cancellation
3089 && cancellation.is_cancelled()
3090 {
3091 return Err(LoopError::Cancelled);
3092 }
3093
3094 if self.emitted {
3095 Ok(None)
3096 } else {
3097 self.emitted = true;
3098 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3099 model: None,
3100 response_id: None,
3101 finish_reason: FinishReason::Completed,
3102 output_items: vec![Item {
3103 id: None,
3104 kind: ItemKind::Assistant,
3105 parts: vec![Part::Text(TextPart {
3106 text: "done".into(),
3107 metadata: MetadataMap::new(),
3108 })],
3109 metadata: MetadataMap::new(),
3110 usage: None,
3111 finish_reason: None,
3112 created_at: None,
3113 }],
3114 usage: None,
3115 metadata: MetadataMap::new(),
3116 })))
3117 }
3118 }
3119 }
3120
3121 #[async_trait]
3122 impl ModelTurn for RecordingTurn {
3123 async fn next_event(
3124 &mut self,
3125 _cancellation: Option<TurnCancellation>,
3126 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3127 if self.emitted {
3128 Ok(None)
3129 } else {
3130 self.emitted = true;
3131 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3132 model: None,
3133 response_id: None,
3134 finish_reason: FinishReason::Completed,
3135 output_items: vec![Item {
3136 id: None,
3137 kind: ItemKind::Assistant,
3138 parts: vec![Part::Text(TextPart {
3139 text: "done".into(),
3140 metadata: MetadataMap::new(),
3141 })],
3142 metadata: MetadataMap::new(),
3143 usage: None,
3144 finish_reason: None,
3145 created_at: None,
3146 }],
3147 usage: None,
3148 metadata: MetadataMap::new(),
3149 })))
3150 }
3151 }
3152 }
3153
3154 #[async_trait]
3155 impl ModelTurn for MultiToolTurn {
3156 async fn next_event(
3157 &mut self,
3158 _cancellation: Option<TurnCancellation>,
3159 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3160 Ok(self.events.pop_front())
3161 }
3162 }
3163
3164 #[async_trait]
3165 impl ModelTurn for DualApprovalTurn {
3166 async fn next_event(
3167 &mut self,
3168 _cancellation: Option<TurnCancellation>,
3169 ) -> Result<Option<ModelTurnEvent>, LoopError> {
3170 Ok(self.events.pop_front())
3171 }
3172 }
3173
3174 #[derive(Clone)]
3175 struct EchoTool {
3176 spec: ToolSpec,
3177 }
3178
3179 impl Default for EchoTool {
3180 fn default() -> Self {
3181 Self {
3182 spec: ToolSpec {
3183 name: ToolName::new("echo"),
3184 description: "Echo back a value".into(),
3185 input_schema: json!({
3186 "type": "object",
3187 "properties": {
3188 "value": { "type": "string" }
3189 },
3190 "required": ["value"],
3191 "additionalProperties": false
3192 }),
3193 output_schema: None,
3194 annotations: ToolAnnotations::default(),
3195 metadata: MetadataMap::new(),
3196 },
3197 }
3198 }
3199 }
3200
3201 #[derive(Clone)]
3202 struct DynamicSpecTool {
3203 spec: ToolSpec,
3204 version: StdArc<AtomicUsize>,
3205 }
3206
3207 impl DynamicSpecTool {
3208 fn new(version: StdArc<AtomicUsize>) -> Self {
3209 Self {
3210 spec: ToolSpec {
3211 name: ToolName::new("dynamic"),
3212 description: "dynamic version 0".into(),
3213 input_schema: json!({
3214 "type": "object",
3215 "properties": {},
3216 "additionalProperties": false
3217 }),
3218 output_schema: None,
3219 annotations: ToolAnnotations::default(),
3220 metadata: MetadataMap::new(),
3221 },
3222 version,
3223 }
3224 }
3225 }
3226
3227 #[async_trait]
3228 impl Tool for EchoTool {
3229 fn spec(&self) -> &ToolSpec {
3230 &self.spec
3231 }
3232
3233 fn proposed_requests(
3234 &self,
3235 request: &agentkit_tools_core::ToolRequest,
3236 ) -> Result<
3237 Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
3238 agentkit_tools_core::ToolError,
3239 > {
3240 Ok(vec![Box::new(FileSystemPermissionRequest::Read {
3241 path: "/tmp/echo".into(),
3242 metadata: request.metadata.clone(),
3243 })])
3244 }
3245
3246 async fn invoke(
3247 &self,
3248 request: agentkit_tools_core::ToolRequest,
3249 _ctx: &mut ToolContext<'_>,
3250 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3251 let value = request
3252 .input
3253 .get("value")
3254 .and_then(Value::as_str)
3255 .ok_or_else(|| {
3256 agentkit_tools_core::ToolError::InvalidInput("missing value".into())
3257 })?;
3258
3259 Ok(ToolResult {
3260 result: ToolResultPart {
3261 call_id: request.call_id,
3262 output: ToolOutput::Text(value.into()),
3263 is_error: false,
3264 metadata: MetadataMap::new(),
3265 },
3266 duration: None,
3267 metadata: MetadataMap::new(),
3268 })
3269 }
3270 }
3271
3272 #[async_trait]
3273 impl Tool for DynamicSpecTool {
3274 fn spec(&self) -> &ToolSpec {
3275 &self.spec
3276 }
3277
3278 fn current_spec(&self) -> Option<ToolSpec> {
3279 let mut spec = self.spec.clone();
3280 spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
3281 Some(spec)
3282 }
3283
3284 async fn invoke(
3285 &self,
3286 request: agentkit_tools_core::ToolRequest,
3287 _ctx: &mut ToolContext<'_>,
3288 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3289 Ok(ToolResult {
3290 result: ToolResultPart {
3291 call_id: request.call_id,
3292 output: ToolOutput::Text("ok".into()),
3293 is_error: false,
3294 metadata: MetadataMap::new(),
3295 },
3296 duration: None,
3297 metadata: MetadataMap::new(),
3298 })
3299 }
3300 }
3301
3302 struct DenyFsReads;
3303
3304 impl PermissionChecker for DenyFsReads {
3305 fn evaluate(
3306 &self,
3307 request: &dyn agentkit_tools_core::PermissionRequest,
3308 ) -> PermissionDecision {
3309 if request.kind() == "filesystem.read" {
3310 return PermissionDecision::Deny(PermissionDenial {
3311 code: PermissionCode::PathNotAllowed,
3312 message: "reads denied in test".into(),
3313 metadata: MetadataMap::new(),
3314 });
3315 }
3316
3317 PermissionDecision::Allow
3318 }
3319 }
3320
3321 struct ApproveFsReads;
3322
3323 impl PermissionChecker for ApproveFsReads {
3324 fn evaluate(
3325 &self,
3326 request: &dyn agentkit_tools_core::PermissionRequest,
3327 ) -> PermissionDecision {
3328 if request.kind() == "filesystem.read" {
3329 return PermissionDecision::RequireApproval(ApprovalRequest {
3330 task_id: None,
3331 call_id: None,
3332 id: "approval:fs-read".into(),
3333 request_kind: request.kind().into(),
3334 reason: agentkit_tools_core::ApprovalReason::SensitivePath,
3335 summary: request.summary(),
3336 metadata: request.metadata().clone(),
3337 });
3338 }
3339
3340 PermissionDecision::Allow
3341 }
3342 }
3343
3344 struct KeepRecentMutator {
3345 keep: usize,
3346 }
3347
3348 #[async_trait]
3349 impl LoopMutator for KeepRecentMutator {
3350 async fn mutate(
3351 &self,
3352 cursor: &mut TranscriptCursor<'_>,
3353 ctx: LoopCtx<'_>,
3354 ) -> Result<(), LoopError> {
3355 if cursor.len() < 2 {
3356 return Ok(());
3357 }
3358 let drop = cursor.len().saturating_sub(self.keep);
3359 ctx.emitter.emit(AgentEvent::MutationStarted {
3360 session_id: ctx.session_id.clone(),
3361 turn_id: ctx.turn_id.cloned(),
3362 mutator: "keep-recent".into(),
3363 point: ctx.point,
3364 });
3365 cursor.drain(..drop);
3366 ctx.emitter.emit(AgentEvent::MutationFinished {
3367 session_id: ctx.session_id.clone(),
3368 turn_id: ctx.turn_id.cloned(),
3369 mutator: "keep-recent".into(),
3370 dirty: true,
3371 metadata: MetadataMap::new(),
3372 });
3373 Ok(())
3374 }
3375 }
3376
3377 struct RecordingObserver {
3378 events: StdArc<StdMutex<Vec<AgentEvent>>>,
3379 }
3380
3381 impl LoopObserver for RecordingObserver {
3382 fn handle_event(&self, event: AgentEvent) {
3383 self.events.lock().unwrap().push(event);
3384 }
3385 }
3386
3387 struct CatalogExecutor {
3388 version: AtomicUsize,
3389 events: StdMutex<Vec<ToolCatalogEvent>>,
3390 }
3391
3392 impl CatalogExecutor {
3393 fn new() -> Self {
3394 Self {
3395 version: AtomicUsize::new(0),
3396 events: StdMutex::new(Vec::new()),
3397 }
3398 }
3399
3400 fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3401 self.version.store(version, Ordering::SeqCst);
3402 self.events.lock().unwrap().push(event);
3403 }
3404 }
3405
3406 #[async_trait]
3407 impl ToolExecutor for CatalogExecutor {
3408 fn specs(&self) -> Vec<ToolSpec> {
3409 vec![ToolSpec {
3410 name: ToolName::new("dynamic"),
3411 description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3412 input_schema: json!({
3413 "type": "object",
3414 "properties": {},
3415 "additionalProperties": false
3416 }),
3417 output_schema: None,
3418 annotations: ToolAnnotations::default(),
3419 metadata: MetadataMap::new(),
3420 }]
3421 }
3422
3423 fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3424 std::mem::take(&mut *self.events.lock().unwrap())
3425 }
3426
3427 async fn execute(
3428 &self,
3429 request: ToolRequest,
3430 _ctx: &mut ToolContext<'_>,
3431 ) -> ToolExecutionOutcome {
3432 ToolExecutionOutcome::Completed(ToolResult {
3433 result: ToolResultPart {
3434 call_id: request.call_id,
3435 output: ToolOutput::Text("dynamic-ok".into()),
3436 is_error: false,
3437 metadata: MetadataMap::new(),
3438 },
3439 duration: None,
3440 metadata: MetadataMap::new(),
3441 })
3442 }
3443 }
3444
3445 #[derive(Clone)]
3446 struct BlockingTool {
3447 spec: ToolSpec,
3448 entered: StdArc<AtomicBool>,
3449 release: StdArc<Notify>,
3450 output: &'static str,
3451 }
3452
3453 impl BlockingTool {
3454 fn new(
3455 name: &str,
3456 entered: StdArc<AtomicBool>,
3457 release: StdArc<Notify>,
3458 output: &'static str,
3459 ) -> Self {
3460 Self {
3461 spec: ToolSpec {
3462 name: ToolName::new(name),
3463 description: format!("blocking tool {name}"),
3464 input_schema: json!({
3465 "type": "object",
3466 "properties": {},
3467 "additionalProperties": false
3468 }),
3469 output_schema: None,
3470 annotations: ToolAnnotations::default(),
3471 metadata: MetadataMap::new(),
3472 },
3473 entered,
3474 release,
3475 output,
3476 }
3477 }
3478 }
3479
3480 #[async_trait]
3481 impl Tool for BlockingTool {
3482 fn spec(&self) -> &ToolSpec {
3483 &self.spec
3484 }
3485
3486 async fn invoke(
3487 &self,
3488 request: agentkit_tools_core::ToolRequest,
3489 _ctx: &mut ToolContext<'_>,
3490 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3491 self.entered.store(true, Ordering::SeqCst);
3492 self.release.notified().await;
3493 Ok(ToolResult {
3494 result: ToolResultPart {
3495 call_id: request.call_id,
3496 output: ToolOutput::Text(self.output.into()),
3497 is_error: false,
3498 metadata: MetadataMap::new(),
3499 },
3500 duration: None,
3501 metadata: MetadataMap::new(),
3502 })
3503 }
3504 }
3505
3506 struct NameRoutingPolicy {
3507 routes: Vec<(String, RoutingDecision)>,
3508 }
3509
3510 impl NameRoutingPolicy {
3511 fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3512 Self {
3513 routes: routes
3514 .into_iter()
3515 .map(|(name, decision)| (name.into(), decision))
3516 .collect(),
3517 }
3518 }
3519 }
3520
3521 impl TaskRoutingPolicy for NameRoutingPolicy {
3522 fn route(&self, request: &ToolRequest) -> RoutingDecision {
3523 self.routes
3524 .iter()
3525 .find(|(name, _)| name == &request.tool_name.0)
3526 .map(|(_, decision)| *decision)
3527 .unwrap_or(RoutingDecision::Foreground)
3528 }
3529 }
3530
3531 async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3532 timeout(Duration::from_secs(1), handle.next_event())
3533 .await
3534 .expect("timed out waiting for task event")
3535 .expect("task event stream ended unexpectedly")
3536 }
3537
3538 async fn wait_until_entered(flag: &AtomicBool) {
3539 timeout(Duration::from_secs(1), async {
3540 while !flag.load(Ordering::SeqCst) {
3541 tokio::task::yield_now().await;
3542 }
3543 })
3544 .await
3545 .expect("task never entered execution");
3546 }
3547
3548 #[tokio::test]
3549 async fn loop_continues_after_completed_tool_call() {
3550 let tools = ToolRegistry::new().with(EchoTool::default());
3551 let agent = Agent::builder()
3552 .model(FakeAdapter)
3553 .add_tool_source(tools)
3554 .permissions(AllowAllPermissions)
3555 .build()
3556 .unwrap();
3557
3558 let mut driver = agent
3559 .start(SessionConfig {
3560 session_id: SessionId::new("session-1"),
3561 metadata: MetadataMap::new(),
3562 cache: None,
3563 })
3564 .await
3565 .unwrap();
3566
3567 driver
3568 .submit_input(vec![Item {
3569 id: None,
3570 kind: ItemKind::User,
3571 parts: vec![Part::Text(TextPart {
3572 text: "ping".into(),
3573 metadata: MetadataMap::new(),
3574 })],
3575 metadata: MetadataMap::new(),
3576 usage: None,
3577 finish_reason: None,
3578 created_at: None,
3579 }])
3580 .unwrap();
3581
3582 let result = run_until_finished(&mut driver).await;
3583
3584 match result {
3585 LoopStep::Finished(turn) => {
3586 assert_eq!(turn.finish_reason, FinishReason::Completed);
3587 assert_eq!(turn.items.len(), 1);
3588 match &turn.items[0].parts[0] {
3589 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3590 other => panic!("unexpected part: {other:?}"),
3591 }
3592 }
3593 other => panic!("unexpected loop step: {other:?}"),
3594 }
3595 }
3596
3597 async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3601 loop {
3602 match driver.next().await.unwrap() {
3603 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3604 step => return step,
3605 }
3606 }
3607 }
3608
3609 #[tokio::test]
3610 async fn loop_uses_injected_permission_checker() {
3611 let tools = ToolRegistry::new().with(EchoTool::default());
3612 let agent = Agent::builder()
3613 .model(FakeAdapter)
3614 .add_tool_source(tools)
3615 .permissions(DenyFsReads)
3616 .build()
3617 .unwrap();
3618
3619 let mut driver = agent
3620 .start(SessionConfig {
3621 session_id: SessionId::new("session-2"),
3622 metadata: MetadataMap::new(),
3623 cache: None,
3624 })
3625 .await
3626 .unwrap();
3627
3628 driver
3629 .submit_input(vec![Item {
3630 id: None,
3631 kind: ItemKind::User,
3632 parts: vec![Part::Text(TextPart {
3633 text: "ping".into(),
3634 metadata: MetadataMap::new(),
3635 })],
3636 metadata: MetadataMap::new(),
3637 usage: None,
3638 finish_reason: None,
3639 created_at: None,
3640 }])
3641 .unwrap();
3642
3643 let result = run_until_finished(&mut driver).await;
3644
3645 match result {
3646 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3647 Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3648 other => panic!("unexpected part: {other:?}"),
3649 },
3650 other => panic!("unexpected loop step: {other:?}"),
3651 }
3652 }
3653
3654 #[tokio::test]
3655 async fn async_task_manager_background_round_requires_explicit_continue() {
3656 let entered = StdArc::new(AtomicBool::new(false));
3657 let release = StdArc::new(Notify::new());
3658 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3659 "background-wait",
3660 RoutingDecision::Background,
3661 )]));
3662 let handle = task_manager.handle();
3663 let tools = ToolRegistry::new().with(BlockingTool::new(
3664 "background-wait",
3665 entered.clone(),
3666 release.clone(),
3667 "background-done",
3668 ));
3669 let agent = Agent::builder()
3670 .model(FakeAdapter)
3671 .add_tool_source(tools)
3672 .permissions(AllowAllPermissions)
3673 .task_manager(task_manager)
3674 .build()
3675 .unwrap();
3676
3677 let mut driver = agent
3678 .start(SessionConfig {
3679 session_id: SessionId::new("session-background"),
3680 metadata: MetadataMap::new(),
3681 cache: None,
3682 })
3683 .await
3684 .unwrap();
3685
3686 driver
3687 .submit_input(vec![Item {
3688 id: None,
3689 kind: ItemKind::User,
3690 parts: vec![Part::Text(TextPart {
3691 text: "ping".into(),
3692 metadata: MetadataMap::new(),
3693 })],
3694 metadata: MetadataMap::new(),
3695 usage: None,
3696 finish_reason: None,
3697 created_at: None,
3698 }])
3699 .unwrap();
3700
3701 let first = driver.next().await.unwrap();
3702 match first {
3703 LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3704 other => panic!("unexpected first loop step: {other:?}"),
3705 }
3706
3707 match wait_for_task_event(&handle).await {
3708 TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3709 other => panic!("unexpected task event: {other:?}"),
3710 }
3711 wait_until_entered(entered.as_ref()).await;
3712 release.notify_waiters();
3713
3714 match wait_for_task_event(&handle).await {
3715 TaskEvent::Completed(_, result) => {
3716 assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3717 }
3718 other => panic!("unexpected completion event: {other:?}"),
3719 }
3720
3721 let resumed = driver.next().await.unwrap();
3722 match resumed {
3723 LoopStep::Finished(turn) => {
3724 assert_eq!(turn.finish_reason, FinishReason::Completed);
3725 match &turn.items[0].parts[0] {
3726 Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3727 other => panic!("unexpected part after resume: {other:?}"),
3728 }
3729 }
3730 other => panic!("unexpected resumed step: {other:?}"),
3731 }
3732 }
3733
3734 #[tokio::test]
3735 async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3736 let controller = CancellationController::new();
3737 let agent = Agent::builder()
3738 .model(SlowAdapter)
3739 .cancellation(controller.handle())
3740 .build()
3741 .unwrap();
3742
3743 let mut driver = agent
3744 .start(SessionConfig {
3745 session_id: SessionId::new("session-cancel"),
3746 metadata: MetadataMap::new(),
3747 cache: None,
3748 })
3749 .await
3750 .unwrap();
3751
3752 driver
3753 .submit_input(vec![Item {
3754 id: None,
3755 kind: ItemKind::User,
3756 parts: vec![Part::Text(TextPart {
3757 text: "do the long task".into(),
3758 metadata: MetadataMap::new(),
3759 })],
3760 metadata: MetadataMap::new(),
3761 usage: None,
3762 finish_reason: None,
3763 created_at: None,
3764 }])
3765 .unwrap();
3766
3767 let cancelled = tokio::join!(async { driver.next().await }, async {
3768 tokio::task::yield_now().await;
3769 controller.interrupt();
3770 })
3771 .0
3772 .unwrap();
3773
3774 match cancelled {
3775 LoopStep::Finished(turn) => {
3776 assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3777 assert_eq!(turn.items.len(), 1);
3778 assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3779 assert_eq!(
3780 turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3781 Some(&Value::Bool(true))
3782 );
3783 }
3784 other => panic!("unexpected loop step: {other:?}"),
3785 }
3786
3787 driver
3788 .submit_input(vec![Item {
3789 id: None,
3790 kind: ItemKind::User,
3791 parts: vec![Part::Text(TextPart {
3792 text: "try again".into(),
3793 metadata: MetadataMap::new(),
3794 })],
3795 metadata: MetadataMap::new(),
3796 usage: None,
3797 finish_reason: None,
3798 created_at: None,
3799 }])
3800 .unwrap();
3801
3802 let result = driver.next().await.unwrap();
3803 match result {
3804 LoopStep::Finished(turn) => {
3805 assert_eq!(turn.finish_reason, FinishReason::Completed);
3806 }
3807 other => panic!("unexpected loop step after retry: {other:?}"),
3808 }
3809 }
3810
3811 #[tokio::test]
3812 async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3813 let controller = CancellationController::new();
3814 let fg_entered = StdArc::new(AtomicBool::new(false));
3815 let fg_release = StdArc::new(Notify::new());
3816 let bg_entered = StdArc::new(AtomicBool::new(false));
3817 let bg_release = StdArc::new(Notify::new());
3818 let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3819 ("foreground-wait", RoutingDecision::Foreground),
3820 ("background-wait", RoutingDecision::Background),
3821 ]));
3822 let handle = task_manager.handle();
3823 let tools = ToolRegistry::new()
3824 .with(BlockingTool::new(
3825 "foreground-wait",
3826 fg_entered.clone(),
3827 fg_release,
3828 "foreground-done",
3829 ))
3830 .with(BlockingTool::new(
3831 "background-wait",
3832 bg_entered.clone(),
3833 bg_release.clone(),
3834 "background-done",
3835 ));
3836 let agent = Agent::builder()
3837 .model(MultiToolAdapter)
3838 .add_tool_source(tools)
3839 .permissions(AllowAllPermissions)
3840 .cancellation(controller.handle())
3841 .task_manager(task_manager)
3842 .build()
3843 .unwrap();
3844
3845 let mut driver = agent
3846 .start(SessionConfig {
3847 session_id: SessionId::new("session-mixed-cancel"),
3848 metadata: MetadataMap::new(),
3849 cache: None,
3850 })
3851 .await
3852 .unwrap();
3853
3854 driver
3855 .submit_input(vec![Item {
3856 id: None,
3857 kind: ItemKind::User,
3858 parts: vec![Part::Text(TextPart {
3859 text: "run both".into(),
3860 metadata: MetadataMap::new(),
3861 })],
3862 metadata: MetadataMap::new(),
3863 usage: None,
3864 finish_reason: None,
3865 created_at: None,
3866 }])
3867 .unwrap();
3868
3869 let cancelled = tokio::join!(async { driver.next().await }, async {
3870 let _ = wait_for_task_event(&handle).await;
3871 let _ = wait_for_task_event(&handle).await;
3872 wait_until_entered(fg_entered.as_ref()).await;
3873 wait_until_entered(bg_entered.as_ref()).await;
3874 controller.interrupt();
3875 })
3876 .0
3877 .unwrap();
3878
3879 match cancelled {
3880 LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3881 other => panic!("unexpected loop step after interrupt: {other:?}"),
3882 }
3883
3884 match wait_for_task_event(&handle).await {
3885 TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3886 other => panic!("unexpected post-interrupt event: {other:?}"),
3887 }
3888
3889 let running = handle.list_running().await;
3890 assert_eq!(running.len(), 1);
3891 assert_eq!(running[0].tool_name, "background-wait");
3892
3893 bg_release.notify_waiters();
3894 match wait_for_task_event(&handle).await {
3895 TaskEvent::Completed(snapshot, result) => {
3896 assert_eq!(snapshot.tool_name, "background-wait");
3897 assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3898 }
3899 other => panic!("unexpected background completion event: {other:?}"),
3900 }
3901 }
3902
3903 #[tokio::test]
3904 async fn loop_resumes_after_approved_tool_request() {
3905 let tools = ToolRegistry::new().with(EchoTool::default());
3906 let agent = Agent::builder()
3907 .model(FakeAdapter)
3908 .add_tool_source(tools)
3909 .permissions(ApproveFsReads)
3910 .build()
3911 .unwrap();
3912
3913 let mut driver = agent
3914 .start(SessionConfig {
3915 session_id: SessionId::new("session-approval"),
3916 metadata: MetadataMap::new(),
3917 cache: None,
3918 })
3919 .await
3920 .unwrap();
3921
3922 driver
3923 .submit_input(vec![Item {
3924 id: None,
3925 kind: ItemKind::User,
3926 parts: vec![Part::Text(TextPart {
3927 text: "ping".into(),
3928 metadata: MetadataMap::new(),
3929 })],
3930 metadata: MetadataMap::new(),
3931 usage: None,
3932 finish_reason: None,
3933 created_at: None,
3934 }])
3935 .unwrap();
3936
3937 let first = driver.next().await.unwrap();
3938 match first {
3939 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3940 assert!(pending.request.task_id.is_some());
3941 assert_eq!(pending.request.id.0, "approval:fs-read");
3942 pending.approve(&mut driver).unwrap();
3943 }
3944 other => panic!("unexpected loop step: {other:?}"),
3945 }
3946 let second = driver.next().await.unwrap();
3947 match second {
3948 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3949 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3950 other => panic!("unexpected part: {other:?}"),
3951 },
3952 other => panic!("unexpected loop step after approval: {other:?}"),
3953 }
3954 }
3955
3956 #[tokio::test]
3957 async fn loop_resumes_with_patched_input_on_approval() {
3958 let tools = ToolRegistry::new().with(EchoTool::default());
3959 let agent = Agent::builder()
3960 .model(FakeAdapter)
3961 .add_tool_source(tools)
3962 .permissions(ApproveFsReads)
3963 .build()
3964 .unwrap();
3965
3966 let mut driver = agent
3967 .start(SessionConfig {
3968 session_id: SessionId::new("session-approval-patched"),
3969 metadata: MetadataMap::new(),
3970 cache: None,
3971 })
3972 .await
3973 .unwrap();
3974
3975 driver
3976 .submit_input(vec![Item {
3977 id: None,
3978 kind: ItemKind::User,
3979 parts: vec![Part::Text(TextPart {
3980 text: "ping".into(),
3981 metadata: MetadataMap::new(),
3982 })],
3983 metadata: MetadataMap::new(),
3984 usage: None,
3985 finish_reason: None,
3986 created_at: None,
3987 }])
3988 .unwrap();
3989
3990 match driver.next().await.unwrap() {
3991 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3992 pending
3993 .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
3994 .unwrap();
3995 }
3996 other => panic!("unexpected loop step: {other:?}"),
3997 }
3998 match driver.next().await.unwrap() {
3999 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
4000 Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
4001 other => panic!("unexpected part: {other:?}"),
4002 },
4003 other => panic!("unexpected loop step after approval: {other:?}"),
4004 }
4005 }
4006
4007 #[tokio::test]
4008 async fn loop_tracks_multiple_pending_approvals_by_call_id() {
4009 let tools = ToolRegistry::new().with(EchoTool::default());
4010 let agent = Agent::builder()
4011 .model(DualApprovalAdapter)
4012 .add_tool_source(tools)
4013 .permissions(ApproveFsReads)
4014 .build()
4015 .unwrap();
4016
4017 let mut driver = agent
4018 .start(SessionConfig {
4019 session_id: SessionId::new("session-dual-approval"),
4020 metadata: MetadataMap::new(),
4021 cache: None,
4022 })
4023 .await
4024 .unwrap();
4025
4026 driver
4027 .submit_input(vec![Item {
4028 id: None,
4029 kind: ItemKind::User,
4030 parts: vec![Part::Text(TextPart {
4031 text: "run both approvals".into(),
4032 metadata: MetadataMap::new(),
4033 })],
4034 metadata: MetadataMap::new(),
4035 usage: None,
4036 finish_reason: None,
4037 created_at: None,
4038 }])
4039 .unwrap();
4040
4041 let pending_first = match driver.next().await.unwrap() {
4042 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4043 assert_eq!(
4044 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4045 Some("call-1")
4046 );
4047 pending
4048 }
4049 other => panic!("unexpected first loop step: {other:?}"),
4050 };
4051
4052 let pending_second = match driver.next().await.unwrap() {
4053 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4054 assert_eq!(
4055 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4056 Some("call-2")
4057 );
4058 pending
4059 }
4060 other => panic!("unexpected second loop step: {other:?}"),
4061 };
4062
4063 pending_second.approve(&mut driver).unwrap();
4064 match driver.next().await.unwrap() {
4065 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4066 assert_eq!(
4067 pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4068 Some("call-1")
4069 );
4070 }
4071 other => panic!("unexpected step after approving second request: {other:?}"),
4072 }
4073
4074 pending_first.approve(&mut driver).unwrap();
4075 match driver.next().await.unwrap() {
4076 LoopStep::Finished(turn) => {
4077 assert_eq!(turn.finish_reason, FinishReason::Completed);
4078 match &turn.items[0].parts[0] {
4079 Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
4080 other => panic!("unexpected final part: {other:?}"),
4081 }
4082 }
4083 other => panic!("unexpected final loop step: {other:?}"),
4084 }
4085 }
4086
4087 #[tokio::test]
4088 async fn loop_compacts_transcript_before_new_turns() {
4089 let events = StdArc::new(StdMutex::new(Vec::new()));
4090 let agent = Agent::builder()
4091 .model(FakeAdapter)
4092 .mutator(KeepRecentMutator { keep: 1 })
4093 .observer(RecordingObserver {
4094 events: events.clone(),
4095 })
4096 .build()
4097 .unwrap();
4098
4099 let mut driver = agent
4100 .start(SessionConfig {
4101 session_id: SessionId::new("session-4"),
4102 metadata: MetadataMap::new(),
4103 cache: None,
4104 })
4105 .await
4106 .unwrap();
4107
4108 for text in ["first", "second"] {
4109 driver
4110 .submit_input(vec![Item {
4111 id: None,
4112 kind: ItemKind::User,
4113 parts: vec![Part::Text(TextPart {
4114 text: text.into(),
4115 metadata: MetadataMap::new(),
4116 })],
4117 metadata: MetadataMap::new(),
4118 usage: None,
4119 finish_reason: None,
4120 created_at: None,
4121 }])
4122 .unwrap();
4123 let _ = driver.next().await.unwrap();
4124 }
4125
4126 let events = events.lock().unwrap();
4127 assert!(
4128 events
4129 .iter()
4130 .any(|event| matches!(event, AgentEvent::MutationFinished { dirty: true, .. }))
4131 );
4132 }
4133
/// A tool result with no preceding tool call is rejected as orphaned.
#[test]
fn transcript_validation_rejects_orphaned_tool_result() {
    let orphan_result = Part::ToolResult(ToolResultPart {
        call_id: "call-1".into(),
        output: ToolOutput::Text("result".into()),
        is_error: false,
        metadata: MetadataMap::new(),
    });
    let lone_tool_item = Item {
        id: None,
        kind: ItemKind::Tool,
        parts: vec![orphan_result],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    };
    let transcript = vec![lone_tool_item];

    let error = validate_transcript_invariants(&transcript).unwrap_err();
    assert!(error.to_string().contains("orphaned tool_result"));
}
4154
/// Two tool results answering the same call id are rejected as a duplicate.
#[test]
fn transcript_validation_rejects_duplicate_tool_result() {
    // Builds a tool item answering `call-1` with the given text output.
    let tool_result_item = |text: &'static str| Item {
        id: None,
        kind: ItemKind::Tool,
        parts: vec![Part::ToolResult(ToolResultPart {
            call_id: "call-1".into(),
            output: ToolOutput::Text(text.into()),
            is_error: false,
            metadata: MetadataMap::new(),
        })],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    };

    let call_item = Item {
        id: None,
        kind: ItemKind::Assistant,
        parts: vec![Part::ToolCall(ToolCallPart {
            id: "call-1".into(),
            name: "lookup".into(),
            input: serde_json::json!({}),
            metadata: MetadataMap::new(),
        })],
        metadata: MetadataMap::new(),
        usage: None,
        finish_reason: None,
        created_at: None,
    };

    let transcript = vec![
        call_item,
        tool_result_item("result"),
        tool_result_item("again"),
    ];

    let error = validate_transcript_invariants(&transcript).unwrap_err();
    assert!(error.to_string().contains("duplicate tool_result"));
}
4205
4206 #[tokio::test]
4207 async fn loop_refreshes_tool_specs_each_turn() {
4208 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4209 let version = StdArc::new(AtomicUsize::new(1));
4210 let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
4211 let agent = Agent::builder()
4212 .model(RecordingAdapter {
4213 seen_descriptions: seen_descriptions.clone(),
4214 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4215 })
4216 .add_tool_source(tools)
4217 .permissions(AllowAllPermissions)
4218 .build()
4219 .unwrap();
4220
4221 let mut driver = agent
4222 .start(SessionConfig {
4223 session_id: SessionId::new("session-dynamic-tools"),
4224 metadata: MetadataMap::new(),
4225 cache: None,
4226 })
4227 .await
4228 .unwrap();
4229
4230 for text in ["first", "second"] {
4231 driver
4232 .submit_input(vec![Item {
4233 id: None,
4234 kind: ItemKind::User,
4235 parts: vec![Part::Text(TextPart {
4236 text: text.into(),
4237 metadata: MetadataMap::new(),
4238 })],
4239 metadata: MetadataMap::new(),
4240 usage: None,
4241 finish_reason: None,
4242 created_at: None,
4243 }])
4244 .unwrap();
4245
4246 let _ = driver.next().await.unwrap();
4247 if text == "first" {
4248 version.store(2, Ordering::SeqCst);
4249 }
4250 }
4251
4252 let seen_descriptions = seen_descriptions.lock().unwrap();
4253 assert_eq!(seen_descriptions.len(), 2);
4254 assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
4255 assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
4256 }
4257
4258 #[tokio::test]
4259 async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
4260 let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4261 let events = StdArc::new(StdMutex::new(Vec::new()));
4262 let executor = StdArc::new(CatalogExecutor::new());
4263 let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
4264 let agent = Agent::builder()
4265 .model(RecordingAdapter {
4266 seen_descriptions: seen_descriptions.clone(),
4267 seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4268 })
4269 .tool_executor(executor_for_agent)
4270 .permissions(AllowAllPermissions)
4271 .observer(RecordingObserver {
4272 events: events.clone(),
4273 })
4274 .build()
4275 .unwrap();
4276
4277 let mut driver = agent
4278 .start(SessionConfig {
4279 session_id: SessionId::new("session-catalog-events"),
4280 metadata: MetadataMap::new(),
4281 cache: None,
4282 })
4283 .await
4284 .unwrap();
4285
4286 driver
4287 .submit_input(vec![Item::text(ItemKind::User, "first")])
4288 .unwrap();
4289 let _ = driver.next().await.unwrap();
4290
4291 executor.publish_change(
4292 1,
4293 ToolCatalogEvent {
4294 source: "mcp:mock".into(),
4295 added: vec!["dynamic".into()],
4296 removed: Vec::new(),
4297 changed: Vec::new(),
4298 },
4299 );
4300
4301 driver
4302 .submit_input(vec![Item::text(ItemKind::User, "second")])
4303 .unwrap();
4304 let _ = driver.next().await.unwrap();
4305
4306 let seen_descriptions = seen_descriptions.lock().unwrap();
4307 assert_eq!(seen_descriptions.len(), 2);
4308 assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
4309 assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
4310
4311 let events = events.lock().unwrap();
4312 assert!(events.iter().any(|event| matches!(
4313 event,
4314 AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
4315 source,
4316 added,
4317 removed,
4318 changed,
4319 }) if source == "mcp:mock"
4320 && added == &vec!["dynamic".to_string()]
4321 && removed.is_empty()
4322 && changed.is_empty()
4323 )));
4324 }
4325
4326 #[tokio::test]
4327 async fn loop_passes_session_default_and_next_turn_cache_requests() {
4328 let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
4329 let agent = Agent::builder()
4330 .model(RecordingAdapter {
4331 seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
4332 seen_caches: seen_caches.clone(),
4333 })
4334 .permissions(AllowAllPermissions)
4335 .build()
4336 .unwrap();
4337
4338 let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
4339 .with_retention(PromptCacheRetention::Short);
4340 let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
4341 breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
4342 });
4343
4344 let mut driver = agent
4345 .start(SessionConfig {
4346 session_id: SessionId::new("session-cache"),
4347 metadata: MetadataMap::new(),
4348 cache: Some(default_cache.clone()),
4349 })
4350 .await
4351 .unwrap();
4352
4353 driver
4354 .submit_input(vec![Item {
4355 id: None,
4356 kind: ItemKind::User,
4357 parts: vec![Part::Text(TextPart {
4358 text: "first".into(),
4359 metadata: MetadataMap::new(),
4360 })],
4361 metadata: MetadataMap::new(),
4362 usage: None,
4363 finish_reason: None,
4364 created_at: None,
4365 }])
4366 .unwrap();
4367 let _ = driver.next().await.unwrap();
4368
4369 driver
4370 .submit_input_with_cache(
4371 vec![Item {
4372 id: None,
4373 kind: ItemKind::User,
4374 parts: vec![Part::Text(TextPart {
4375 text: "second".into(),
4376 metadata: MetadataMap::new(),
4377 })],
4378 metadata: MetadataMap::new(),
4379 usage: None,
4380 finish_reason: None,
4381 created_at: None,
4382 }],
4383 override_cache.clone(),
4384 )
4385 .unwrap();
4386 let _ = driver.next().await.unwrap();
4387
4388 let seen = seen_caches.lock().unwrap();
4389 assert_eq!(seen.len(), 2);
4390 assert_eq!(seen[0], Some(default_cache));
4391 assert_eq!(seen[1], Some(override_cache));
4392 }
4393
4394 #[tokio::test]
4395 async fn loop_yields_after_tool_result_between_rounds() {
4396 let tools = ToolRegistry::new().with(EchoTool::default());
4397 let agent = Agent::builder()
4398 .model(FakeAdapter)
4399 .add_tool_source(tools)
4400 .permissions(AllowAllPermissions)
4401 .build()
4402 .unwrap();
4403
4404 let mut driver = agent
4405 .start(SessionConfig {
4406 session_id: SessionId::new("yield-session"),
4407 metadata: MetadataMap::new(),
4408 cache: None,
4409 })
4410 .await
4411 .unwrap();
4412
4413 driver
4414 .submit_input(vec![Item::text(ItemKind::User, "ping")])
4415 .unwrap();
4416
4417 let step = driver.next().await.unwrap();
4420 let info = match step {
4421 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
4422 other => panic!("expected AfterToolResult, got {other:?}"),
4423 };
4424 assert_eq!(info.session_id, SessionId::new("yield-session"));
4425 assert_eq!(info.transcript_len, 3);
4427
4428 let interrupt = LoopInterrupt::AfterToolResult(info.clone());
4430 assert!(!interrupt.is_blocking());
4431
4432 driver
4434 .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
4435 .unwrap();
4436
4437 let step = driver.next().await.unwrap();
4440 match step {
4441 LoopStep::Finished(turn) => {
4442 assert_eq!(turn.finish_reason, FinishReason::Completed);
4443 }
4444 other => panic!("expected Finished, got {other:?}"),
4445 }
4446
4447 let snapshot = driver.snapshot();
4449 let has_injected_message = snapshot.transcript.iter().any(|item| {
4450 item.kind == ItemKind::User
4451 && item.parts.iter().any(|part| match part {
4452 Part::Text(text) => text.text == "also: report back",
4453 _ => false,
4454 })
4455 });
4456 assert!(
4457 has_injected_message,
4458 "injected user message should be in transcript, got: {:?}",
4459 snapshot.transcript
4460 );
4461 }
4462
4463 struct RecordingTranscriptObserver {
4464 items: StdArc<StdMutex<Vec<Item>>>,
4465 }
4466
4467 impl TranscriptObserver for RecordingTranscriptObserver {
4468 fn on_item_appended(&self, item: &Item) {
4469 self.items.lock().unwrap().push(item.clone());
4470 }
4471 }
4472
4473 #[tokio::test]
4474 async fn observers_see_full_tool_round() {
4475 let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
4481 let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
4482 let agent = Agent::builder()
4483 .model(FakeAdapter)
4484 .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
4485 .permissions(AllowAllPermissions)
4486 .observer(RecordingObserver {
4487 events: events.clone(),
4488 })
4489 .transcript_observer(RecordingTranscriptObserver {
4490 items: items.clone(),
4491 })
4492 .build()
4493 .unwrap();
4494
4495 let mut driver = agent
4496 .start(SessionConfig {
4497 session_id: SessionId::new("observer-session"),
4498 metadata: MetadataMap::new(),
4499 cache: None,
4500 })
4501 .await
4502 .unwrap();
4503
4504 driver
4505 .submit_input(vec![Item {
4506 id: None,
4507 kind: ItemKind::User,
4508 parts: vec![Part::Text(TextPart {
4509 text: "ping".into(),
4510 metadata: MetadataMap::new(),
4511 })],
4512 metadata: MetadataMap::new(),
4513 usage: None,
4514 finish_reason: None,
4515 created_at: None,
4516 }])
4517 .unwrap();
4518
4519 let result = run_until_finished(&mut driver).await;
4520 assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4521
4522 let events = events.lock().unwrap().clone();
4525 let tool_call_id = events.iter().find_map(|e| match e {
4526 AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4527 _ => None,
4528 });
4529 let tool_results: Vec<_> = events
4530 .iter()
4531 .filter_map(|e| match e {
4532 AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4533 _ => None,
4534 })
4535 .collect();
4536 assert_eq!(tool_results.len(), 1, "events: {events:?}");
4537 assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4538 assert!(!tool_results[0].is_error);
4539
4540 let items = items.lock().unwrap().clone();
4544 assert_eq!(items.len(), 4, "items: {items:?}");
4545 assert_eq!(items[0].kind, ItemKind::User);
4546 assert_eq!(items[1].kind, ItemKind::Assistant);
4547 assert!(
4548 items[1]
4549 .parts
4550 .iter()
4551 .any(|p| matches!(p, Part::ToolCall(_)))
4552 );
4553 assert_eq!(items[2].kind, ItemKind::Tool);
4554 assert!(
4555 items[2]
4556 .parts
4557 .iter()
4558 .any(|p| matches!(p, Part::ToolResult(_)))
4559 );
4560 assert_eq!(items[3].kind, ItemKind::Assistant);
4561 }
4562
/// Checks the shorthand constructors: `SessionConfig::new(..).with_cache(..)` stores the
/// id and cache request it was given, and `PromptCacheRequest::explicit` yields a
/// best-effort request whose breakpoints match the enum variants written out by hand.
#[test]
fn convenience_cache_builders_construct_expected_defaults() {
    let cache = PromptCacheRequest::automatic()
        .with_retention(PromptCacheRetention::Short)
        .with_key("workspace:demo");

    let session = SessionConfig::new("demo").with_cache(cache.clone());
    assert_eq!(session.session_id, SessionId::new("demo"));
    assert_eq!(session.cache, Some(cache));

    // Same breakpoints spelled out as enum variants instead of helper constructors.
    let expected_strategy = PromptCacheStrategy::Explicit {
        breakpoints: vec![
            PromptCacheBreakpoint::ToolsEnd,
            PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
            PromptCacheBreakpoint::TranscriptPartEnd {
                item_index: 3,
                part_index: 1,
            },
        ],
    };

    let explicit = PromptCacheRequest::explicit([
        PromptCacheBreakpoint::tools_end(),
        PromptCacheBreakpoint::transcript_item_end(2),
        PromptCacheBreakpoint::transcript_part_end(3, 1),
    ]);
    assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
    assert_eq!(explicit.strategy, expected_strategy);
}
4594}