1use std::sync::Arc;
2
3use agentkit_capabilities::CapabilityContext;
4use agentkit_compaction::{
5 CompactionConfig, CompactionContext, CompactionReason, CompactionResult,
6};
7use agentkit_core::{
8 CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId,
9 TextPart, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation, Usage,
10};
11use agentkit_tools_core::{
12 ApprovalDecision, ApprovalRequest, AuthOperation, AuthRequest, AuthResolution,
13 BasicToolExecutor, PermissionChecker, ToolContext, ToolError, ToolExecutionOutcome,
14 ToolExecutor, ToolRegistry, ToolRequest, ToolResources, ToolSpec,
15};
16use async_trait::async_trait;
17use serde::{Deserialize, Serialize};
18use thiserror::Error;
19
20const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
21const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
22const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
23const USER_CANCELLED_REASON: &str = "user_cancelled";
24
25#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
26pub struct SessionConfig {
27 pub session_id: SessionId,
28 pub metadata: MetadataMap,
29}
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32pub struct TurnRequest {
33 pub session_id: SessionId,
34 pub turn_id: agentkit_core::TurnId,
35 pub transcript: Vec<Item>,
36 pub available_tools: Vec<ToolSpec>,
37 pub metadata: MetadataMap,
38}
39
40#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
41pub struct ModelTurnResult {
42 pub finish_reason: FinishReason,
43 pub output_items: Vec<Item>,
44 pub usage: Option<Usage>,
45 pub metadata: MetadataMap,
46}
47
48#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
49pub enum ModelTurnEvent {
50 Delta(Delta),
51 ToolCall(ToolCallPart),
52 Usage(Usage),
53 Finished(ModelTurnResult),
54}
55
56#[async_trait]
57pub trait ModelAdapter: Send + Sync {
58 type Session: ModelSession;
59
60 async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
61}
62
63#[async_trait]
64pub trait ModelSession: Send {
65 type Turn: ModelTurn;
66
67 async fn begin_turn(
68 &mut self,
69 request: TurnRequest,
70 cancellation: Option<TurnCancellation>,
71 ) -> Result<Self::Turn, LoopError>;
72}
73
74#[async_trait]
75pub trait ModelTurn: Send {
76 async fn next_event(
77 &mut self,
78 cancellation: Option<TurnCancellation>,
79 ) -> Result<Option<ModelTurnEvent>, LoopError>;
80}
81
82pub trait LoopObserver: Send {
83 fn handle_event(&mut self, event: AgentEvent);
84}
85
86#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
87pub enum AgentEvent {
88 RunStarted {
89 session_id: SessionId,
90 },
91 TurnStarted {
92 session_id: SessionId,
93 turn_id: agentkit_core::TurnId,
94 },
95 InputAccepted {
96 session_id: SessionId,
97 items: Vec<Item>,
98 },
99 ContentDelta(Delta),
100 ToolCallRequested(ToolCallPart),
101 ApprovalRequired(ApprovalRequest),
102 AuthRequired(AuthRequest),
103 ApprovalResolved {
104 approved: bool,
105 },
106 AuthResolved {
107 provided: bool,
108 },
109 CompactionStarted {
110 session_id: SessionId,
111 turn_id: Option<agentkit_core::TurnId>,
112 reason: CompactionReason,
113 },
114 CompactionFinished {
115 session_id: SessionId,
116 turn_id: Option<agentkit_core::TurnId>,
117 replaced_items: usize,
118 transcript_len: usize,
119 metadata: MetadataMap,
120 },
121 UsageUpdated(Usage),
122 Warning {
123 message: String,
124 },
125 RunFailed {
126 message: String,
127 },
128 TurnFinished(TurnResult),
129}
130
131#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
132pub struct InputRequest {
133 pub session_id: SessionId,
134 pub reason: String,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138pub struct TurnResult {
139 pub turn_id: agentkit_core::TurnId,
140 pub finish_reason: FinishReason,
141 pub items: Vec<Item>,
142 pub usage: Option<Usage>,
143 pub metadata: MetadataMap,
144}
145
146#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
147pub enum LoopInterrupt {
148 ApprovalRequest(ApprovalRequest),
149 AuthRequest(AuthRequest),
150 AwaitingInput(InputRequest),
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub enum LoopStep {
155 Interrupt(LoopInterrupt),
156 Finished(TurnResult),
157}
158
159#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
160pub struct LoopSnapshot {
161 pub session_id: SessionId,
162 pub transcript: Vec<Item>,
163 pub pending_input: Vec<Item>,
164}
165
166#[derive(Clone, Debug, PartialEq, Eq)]
167enum DriverState {
168 Idle,
169 AwaitingApproval,
170 AwaitingAuth,
171}
172
173#[derive(Clone, Debug)]
174struct PendingApprovalToolCall {
175 request: ApprovalRequest,
176 decision: Option<ApprovalDecision>,
177 turn_id: agentkit_core::TurnId,
178 call: ToolCallPart,
179 tool_request: ToolRequest,
180}
181
182#[derive(Clone, Debug)]
183struct PendingAuthToolCall {
184 request: AuthRequest,
185 resolution: Option<AuthResolution>,
186 turn_id: agentkit_core::TurnId,
187 call: ToolCallPart,
188 tool_request: ToolRequest,
189}
190
191pub struct Agent<M>
192where
193 M: ModelAdapter,
194{
195 model: M,
196 tools: ToolRegistry,
197 permissions: Arc<dyn PermissionChecker>,
198 resources: Arc<dyn ToolResources>,
199 cancellation: Option<CancellationHandle>,
200 compaction: Option<CompactionConfig>,
201 observers: Vec<Box<dyn LoopObserver>>,
202}
203
204impl<M> Agent<M>
205where
206 M: ModelAdapter,
207{
208 pub fn builder() -> AgentBuilder<M> {
209 AgentBuilder::default()
210 }
211
212 pub async fn start(self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
213 let session_id = config.session_id.clone();
214 let session = self.model.start_session(config).await?;
215 let tool_executor = Arc::new(BasicToolExecutor::new(self.tools.clone()));
216 let tool_specs = tool_executor.specs();
217 let mut driver = LoopDriver {
218 session_id: session_id.clone(),
219 session: Some(session),
220 tool_executor,
221 tool_specs,
222 permissions: self.permissions,
223 resources: self.resources,
224 cancellation: self.cancellation,
225 compaction: self.compaction,
226 observers: self.observers,
227 transcript: Vec::new(),
228 pending_input: Vec::new(),
229 pending_approval: None,
230 pending_auth: None,
231 next_turn_index: 1,
232 state: DriverState::Idle,
233 };
234 driver.emit(AgentEvent::RunStarted { session_id });
235 Ok(driver)
236 }
237}
238
239pub struct AgentBuilder<M>
240where
241 M: ModelAdapter,
242{
243 model: Option<M>,
244 tools: ToolRegistry,
245 permissions: Arc<dyn PermissionChecker>,
246 resources: Arc<dyn ToolResources>,
247 cancellation: Option<CancellationHandle>,
248 compaction: Option<CompactionConfig>,
249 observers: Vec<Box<dyn LoopObserver>>,
250}
251
252impl<M> Default for AgentBuilder<M>
253where
254 M: ModelAdapter,
255{
256 fn default() -> Self {
257 Self {
258 model: None,
259 tools: ToolRegistry::new(),
260 permissions: Arc::new(AllowAllPermissions),
261 resources: Arc::new(()),
262 cancellation: None,
263 compaction: None,
264 observers: Vec::new(),
265 }
266 }
267}
268
269impl<M> AgentBuilder<M>
270where
271 M: ModelAdapter,
272{
273 pub fn model(mut self, model: M) -> Self {
274 self.model = Some(model);
275 self
276 }
277
278 pub fn tools(mut self, tools: ToolRegistry) -> Self {
279 self.tools = tools;
280 self
281 }
282
283 pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
284 self.permissions = Arc::new(permissions);
285 self
286 }
287
288 pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
289 self.resources = Arc::new(resources);
290 self
291 }
292
293 pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
294 self.cancellation = Some(handle);
295 self
296 }
297
298 pub fn compaction(mut self, config: CompactionConfig) -> Self {
299 self.compaction = Some(config);
300 self
301 }
302
303 pub fn observer(mut self, observer: impl LoopObserver + 'static) -> Self {
304 self.observers.push(Box::new(observer));
305 self
306 }
307
308 pub fn build(self) -> Result<Agent<M>, LoopError> {
309 let model = self
310 .model
311 .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
312 Ok(Agent {
313 model,
314 tools: self.tools,
315 permissions: self.permissions,
316 resources: self.resources,
317 cancellation: self.cancellation,
318 compaction: self.compaction,
319 observers: self.observers,
320 })
321 }
322}
323
324pub struct LoopDriver<S>
325where
326 S: ModelSession,
327{
328 session_id: SessionId,
329 session: Option<S>,
330 tool_executor: Arc<dyn ToolExecutor>,
331 tool_specs: Vec<ToolSpec>,
332 permissions: Arc<dyn PermissionChecker>,
333 resources: Arc<dyn ToolResources>,
334 cancellation: Option<CancellationHandle>,
335 compaction: Option<CompactionConfig>,
336 observers: Vec<Box<dyn LoopObserver>>,
337 transcript: Vec<Item>,
338 pending_input: Vec<Item>,
339 pending_approval: Option<PendingApprovalToolCall>,
340 pending_auth: Option<PendingAuthToolCall>,
341 next_turn_index: u64,
342 state: DriverState,
343}
344
345impl<S> LoopDriver<S>
346where
347 S: ModelSession,
348{
349 async fn maybe_compact(
350 &mut self,
351 turn_id: Option<&agentkit_core::TurnId>,
352 cancellation: Option<TurnCancellation>,
353 ) -> Result<(), LoopError> {
354 let Some(compaction) = self.compaction.as_ref().cloned() else {
355 return Ok(());
356 };
357 if cancellation
358 .as_ref()
359 .is_some_and(TurnCancellation::is_cancelled)
360 {
361 return Err(LoopError::Cancelled);
362 }
363 let Some(reason) =
364 compaction
365 .trigger
366 .should_compact(&self.session_id, turn_id, &self.transcript)
367 else {
368 return Ok(());
369 };
370
371 self.emit(AgentEvent::CompactionStarted {
372 session_id: self.session_id.clone(),
373 turn_id: turn_id.cloned(),
374 reason: reason.clone(),
375 });
376
377 let CompactionResult {
378 transcript,
379 replaced_items,
380 metadata,
381 } = compaction
382 .strategy
383 .apply(
384 agentkit_compaction::CompactionRequest {
385 session_id: self.session_id.clone(),
386 turn_id: turn_id.cloned(),
387 transcript: self.transcript.clone(),
388 reason,
389 metadata: compaction.metadata.clone(),
390 },
391 &mut CompactionContext {
392 backend: compaction.backend.as_deref(),
393 metadata: &compaction.metadata,
394 cancellation,
395 },
396 )
397 .await
398 .map_err(|error| match error {
399 agentkit_compaction::CompactionError::Cancelled => LoopError::Cancelled,
400 other => LoopError::Compaction(other.to_string()),
401 })?;
402
403 self.transcript = transcript;
404 self.emit(AgentEvent::CompactionFinished {
405 session_id: self.session_id.clone(),
406 turn_id: turn_id.cloned(),
407 replaced_items,
408 transcript_len: self.transcript.len(),
409 metadata,
410 });
411 Ok(())
412 }
413
414 async fn drive_turn(
415 &mut self,
416 turn_id: agentkit_core::TurnId,
417 emit_started: bool,
418 ) -> Result<LoopStep, LoopError> {
419 let cancellation = self
420 .cancellation
421 .as_ref()
422 .map(CancellationHandle::checkpoint);
423 match self
424 .maybe_compact(Some(&turn_id), cancellation.clone())
425 .await
426 {
427 Ok(()) => {}
428 Err(LoopError::Cancelled) => {
429 return self.finish_cancelled(turn_id, interrupted_assistant_items());
430 }
431 Err(error) => return Err(error),
432 }
433 if emit_started {
434 self.emit(AgentEvent::TurnStarted {
435 session_id: self.session_id.clone(),
436 turn_id: turn_id.clone(),
437 });
438 }
439 if cancellation
440 .as_ref()
441 .is_some_and(TurnCancellation::is_cancelled)
442 {
443 return self.finish_cancelled(turn_id, interrupted_assistant_items());
444 }
445
446 loop {
447 let request = TurnRequest {
448 session_id: self.session_id.clone(),
449 turn_id: turn_id.clone(),
450 transcript: self.transcript.clone(),
451 available_tools: self.tool_specs.clone(),
452 metadata: MetadataMap::new(),
453 };
454
455 let session = self
456 .session
457 .as_mut()
458 .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
459 let mut turn = match session.begin_turn(request, cancellation.clone()).await {
460 Ok(turn) => turn,
461 Err(LoopError::Cancelled) => {
462 return self.finish_cancelled(turn_id, interrupted_assistant_items());
463 }
464 Err(error) => return Err(error),
465 };
466 let mut saw_tool_call = false;
467 let mut tool_results = Vec::new();
468
469 while let Some(event) = match turn.next_event(cancellation.clone()).await {
470 Ok(event) => event,
471 Err(LoopError::Cancelled) => {
472 return self.finish_cancelled(turn_id, interrupted_assistant_items());
473 }
474 Err(error) => return Err(error),
475 } {
476 if cancellation
477 .as_ref()
478 .is_some_and(TurnCancellation::is_cancelled)
479 {
480 return self.finish_cancelled(turn_id, interrupted_assistant_items());
481 }
482 match event {
483 ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
484 ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
485 ModelTurnEvent::ToolCall(call) => {
486 saw_tool_call = true;
487 self.emit(AgentEvent::ToolCallRequested(call.clone()));
488 if cancellation
489 .as_ref()
490 .is_some_and(TurnCancellation::is_cancelled)
491 {
492 let mut items = tool_results;
493 items.extend(interrupted_tool_items(&call));
494 return self.finish_cancelled(turn_id, items);
495 }
496
497 let tool_request = ToolRequest {
498 call_id: call.id.clone(),
499 tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
500 input: call.input.clone(),
501 session_id: self.session_id.clone(),
502 turn_id: turn_id.clone(),
503 metadata: call.metadata.clone(),
504 };
505 let tool_metadata = tool_request.metadata.clone();
506 let mut tool_ctx = ToolContext {
507 capability: CapabilityContext {
508 session_id: Some(&self.session_id),
509 turn_id: Some(&turn_id),
510 metadata: &tool_metadata,
511 },
512 permissions: self.permissions.as_ref(),
513 resources: self.resources.as_ref(),
514 cancellation: cancellation.clone(),
515 };
516
517 match self
518 .tool_executor
519 .execute(tool_request.clone(), &mut tool_ctx)
520 .await
521 {
522 ToolExecutionOutcome::Completed(result) => {
523 tool_results.push(Item {
524 id: None,
525 kind: ItemKind::Tool,
526 parts: vec![Part::ToolResult(result.result)],
527 metadata: result.metadata,
528 });
529 }
530 ToolExecutionOutcome::Interrupted(
531 agentkit_tools_core::ToolInterruption::ApprovalRequired(request),
532 ) => {
533 self.pending_approval = Some(PendingApprovalToolCall {
534 request: request.clone(),
535 decision: None,
536 turn_id: turn_id.clone(),
537 call,
538 tool_request,
539 });
540 self.state = DriverState::AwaitingApproval;
541 self.emit(AgentEvent::ApprovalRequired(request.clone()));
542 return Ok(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
543 request,
544 )));
545 }
546 ToolExecutionOutcome::Interrupted(
547 agentkit_tools_core::ToolInterruption::AuthRequired(request),
548 ) => {
549 let request = upgrade_auth_request(request, &tool_request, &call);
550 self.pending_auth = Some(PendingAuthToolCall {
551 request: request.clone(),
552 resolution: None,
553 turn_id: turn_id.clone(),
554 call,
555 tool_request,
556 });
557 self.state = DriverState::AwaitingAuth;
558 self.emit(AgentEvent::AuthRequired(request.clone()));
559 return Ok(LoopStep::Interrupt(LoopInterrupt::AuthRequest(
560 request,
561 )));
562 }
563 ToolExecutionOutcome::Failed(error) => {
564 if matches!(error, ToolError::Cancelled) {
565 let mut items = tool_results;
566 items.extend(interrupted_tool_items(&call));
567 return self.finish_cancelled(turn_id, items);
568 }
569 self.emit(AgentEvent::Warning {
570 message: format!("tool {} failed: {}", call.name, error),
571 });
572 tool_results.push(Item {
573 id: None,
574 kind: ItemKind::Tool,
575 parts: vec![Part::ToolResult(ToolResultPart {
576 call_id: call.id.clone(),
577 output: ToolOutput::Text(error.to_string()),
578 is_error: true,
579 metadata: call.metadata.clone(),
580 })],
581 metadata: MetadataMap::new(),
582 });
583 }
584 }
585 }
586 ModelTurnEvent::Finished(result) => {
587 self.transcript.extend(result.output_items.clone());
588
589 if saw_tool_call {
590 self.transcript.append(&mut tool_results);
591 break;
592 }
593
594 let turn_result = TurnResult {
595 turn_id,
596 finish_reason: result.finish_reason,
597 items: result.output_items,
598 usage: result.usage,
599 metadata: result.metadata,
600 };
601 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
602 return Ok(LoopStep::Finished(turn_result));
603 }
604 }
605 }
606
607 if saw_tool_call {
608 continue;
609 }
610
611 return Err(LoopError::Provider(
612 "model turn ended without a Finished event".into(),
613 ));
614 }
615 }
616
617 async fn resume_after_auth(
618 &mut self,
619 pending: PendingAuthToolCall,
620 ) -> Result<LoopStep, LoopError> {
621 let resolution = pending
622 .resolution
623 .clone()
624 .ok_or_else(|| LoopError::InvalidState("pending auth has no resolution".into()))?;
625
626 self.transcript.push(Item {
627 id: None,
628 kind: ItemKind::Assistant,
629 parts: vec![Part::ToolCall(pending.call.clone())],
630 metadata: MetadataMap::new(),
631 });
632
633 let tool_item = match resolution {
634 AuthResolution::Provided { .. } => {
635 let tool_metadata = pending.tool_request.metadata.clone();
636 let mut tool_ctx = ToolContext {
637 capability: CapabilityContext {
638 session_id: Some(&self.session_id),
639 turn_id: Some(&pending.turn_id),
640 metadata: &tool_metadata,
641 },
642 permissions: self.permissions.as_ref(),
643 resources: self.resources.as_ref(),
644 cancellation: self
645 .cancellation
646 .as_ref()
647 .map(CancellationHandle::checkpoint),
648 };
649
650 match self
651 .tool_executor
652 .execute(pending.tool_request.clone(), &mut tool_ctx)
653 .await
654 {
655 ToolExecutionOutcome::Completed(result) => Item {
656 id: None,
657 kind: ItemKind::Tool,
658 parts: vec![Part::ToolResult(result.result)],
659 metadata: result.metadata,
660 },
661 ToolExecutionOutcome::Interrupted(
662 agentkit_tools_core::ToolInterruption::AuthRequired(request),
663 ) => {
664 let request =
665 upgrade_auth_request(request, &pending.tool_request, &pending.call);
666 self.pending_auth = Some(PendingAuthToolCall {
667 request,
668 resolution: None,
669 turn_id: pending.turn_id,
670 call: pending.call,
671 tool_request: pending.tool_request,
672 });
673 self.state = DriverState::AwaitingAuth;
674 let request = self
675 .pending_auth
676 .as_ref()
677 .map(|pending| pending.request.clone())
678 .ok_or_else(|| {
679 LoopError::InvalidState("missing pending auth request".into())
680 })?;
681 self.emit(AgentEvent::AuthRequired(request.clone()));
682 return Ok(LoopStep::Interrupt(LoopInterrupt::AuthRequest(request)));
683 }
684 ToolExecutionOutcome::Interrupted(
685 agentkit_tools_core::ToolInterruption::ApprovalRequired(request),
686 ) => {
687 self.pending_approval = Some(PendingApprovalToolCall {
688 request: request.clone(),
689 decision: None,
690 turn_id: pending.turn_id,
691 call: pending.call,
692 tool_request: pending.tool_request,
693 });
694 self.state = DriverState::AwaitingApproval;
695 self.emit(AgentEvent::ApprovalRequired(request.clone()));
696 return Ok(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(request)));
697 }
698 ToolExecutionOutcome::Failed(error) => {
699 if matches!(error, ToolError::Cancelled) {
700 let items = interrupted_tool_items(&pending.call);
701 return self.finish_cancelled(pending.turn_id, items);
702 }
703 Item {
704 id: None,
705 kind: ItemKind::Tool,
706 parts: vec![Part::ToolResult(ToolResultPart {
707 call_id: pending.call.id.clone(),
708 output: ToolOutput::Text(error.to_string()),
709 is_error: true,
710 metadata: pending.call.metadata.clone(),
711 })],
712 metadata: MetadataMap::new(),
713 }
714 }
715 }
716 }
717 AuthResolution::Cancelled { .. } => Item {
718 id: None,
719 kind: ItemKind::Tool,
720 parts: vec![Part::ToolResult(ToolResultPart {
721 call_id: pending.call.id.clone(),
722 output: ToolOutput::Text("auth cancelled".into()),
723 is_error: true,
724 metadata: pending.call.metadata.clone(),
725 })],
726 metadata: MetadataMap::new(),
727 },
728 };
729
730 self.transcript.push(tool_item);
731 self.drive_turn(pending.turn_id, false).await
732 }
733
734 async fn resume_after_approval(
735 &mut self,
736 pending: PendingApprovalToolCall,
737 ) -> Result<LoopStep, LoopError> {
738 let decision = pending
739 .decision
740 .clone()
741 .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
742
743 self.transcript.push(Item {
744 id: None,
745 kind: ItemKind::Assistant,
746 parts: vec![Part::ToolCall(pending.call.clone())],
747 metadata: MetadataMap::new(),
748 });
749
750 let tool_item = match decision {
751 ApprovalDecision::Approve => {
752 let tool_metadata = pending.tool_request.metadata.clone();
753 let mut tool_ctx = ToolContext {
754 capability: CapabilityContext {
755 session_id: Some(&self.session_id),
756 turn_id: Some(&pending.turn_id),
757 metadata: &tool_metadata,
758 },
759 permissions: self.permissions.as_ref(),
760 resources: self.resources.as_ref(),
761 cancellation: self
762 .cancellation
763 .as_ref()
764 .map(CancellationHandle::checkpoint),
765 };
766
767 match self
768 .tool_executor
769 .execute_approved(
770 pending.tool_request.clone(),
771 &pending.request,
772 &mut tool_ctx,
773 )
774 .await
775 {
776 ToolExecutionOutcome::Completed(result) => Item {
777 id: None,
778 kind: ItemKind::Tool,
779 parts: vec![Part::ToolResult(result.result)],
780 metadata: result.metadata,
781 },
782 ToolExecutionOutcome::Interrupted(
783 agentkit_tools_core::ToolInterruption::ApprovalRequired(request),
784 ) => {
785 self.pending_approval = Some(PendingApprovalToolCall {
786 request: request.clone(),
787 decision: None,
788 turn_id: pending.turn_id,
789 call: pending.call,
790 tool_request: pending.tool_request,
791 });
792 self.state = DriverState::AwaitingApproval;
793 self.emit(AgentEvent::ApprovalRequired(request.clone()));
794 return Ok(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(request)));
795 }
796 ToolExecutionOutcome::Interrupted(
797 agentkit_tools_core::ToolInterruption::AuthRequired(request),
798 ) => {
799 let request =
800 upgrade_auth_request(request, &pending.tool_request, &pending.call);
801 self.pending_auth = Some(PendingAuthToolCall {
802 request: request.clone(),
803 resolution: None,
804 turn_id: pending.turn_id,
805 call: pending.call,
806 tool_request: pending.tool_request,
807 });
808 self.state = DriverState::AwaitingAuth;
809 self.emit(AgentEvent::AuthRequired(request.clone()));
810 return Ok(LoopStep::Interrupt(LoopInterrupt::AuthRequest(request)));
811 }
812 ToolExecutionOutcome::Failed(error) => {
813 if matches!(error, ToolError::Cancelled) {
814 let items = interrupted_tool_items(&pending.call);
815 return self.finish_cancelled(pending.turn_id, items);
816 }
817 Item {
818 id: None,
819 kind: ItemKind::Tool,
820 parts: vec![Part::ToolResult(ToolResultPart {
821 call_id: pending.call.id.clone(),
822 output: ToolOutput::Text(error.to_string()),
823 is_error: true,
824 metadata: pending.call.metadata.clone(),
825 })],
826 metadata: MetadataMap::new(),
827 }
828 }
829 }
830 }
831 ApprovalDecision::Deny { reason } => Item {
832 id: None,
833 kind: ItemKind::Tool,
834 parts: vec![Part::ToolResult(ToolResultPart {
835 call_id: pending.call.id.clone(),
836 output: ToolOutput::Text(reason.unwrap_or_else(|| "approval denied".into())),
837 is_error: true,
838 metadata: pending.call.metadata.clone(),
839 })],
840 metadata: MetadataMap::new(),
841 },
842 };
843
844 self.transcript.push(tool_item);
845 self.drive_turn(pending.turn_id, false).await
846 }
847
848 fn finish_cancelled(
849 &mut self,
850 turn_id: agentkit_core::TurnId,
851 items: Vec<Item>,
852 ) -> Result<LoopStep, LoopError> {
853 self.transcript.extend(items.clone());
854 let turn_result = TurnResult {
855 turn_id,
856 finish_reason: FinishReason::Cancelled,
857 items,
858 usage: None,
859 metadata: interrupted_metadata("turn"),
860 };
861 self.emit(AgentEvent::TurnFinished(turn_result.clone()));
862 Ok(LoopStep::Finished(turn_result))
863 }
864
865 pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
866 if self.state != DriverState::Idle {
867 return Err(LoopError::InvalidState(
868 "cannot submit input while an interrupt is pending".into(),
869 ));
870 }
871 self.emit(AgentEvent::InputAccepted {
872 session_id: self.session_id.clone(),
873 items: input.clone(),
874 });
875 self.pending_input.extend(input);
876 Ok(())
877 }
878
879 pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
880 let Some(pending) = self.pending_approval.as_mut() else {
881 return Err(LoopError::InvalidState(
882 "no approval request is pending".into(),
883 ));
884 };
885 pending.decision = Some(decision.clone());
886 self.state = DriverState::Idle;
887 self.emit(AgentEvent::ApprovalResolved {
888 approved: matches!(decision, ApprovalDecision::Approve),
889 });
890 Ok(())
891 }
892
893 pub fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), LoopError> {
894 let Some(pending) = self.pending_auth.as_mut() else {
895 return Err(LoopError::InvalidState("no auth request is pending".into()));
896 };
897 if pending.request.id != resolution.request().id {
898 return Err(LoopError::InvalidState(
899 "auth resolution does not match the pending request".into(),
900 ));
901 }
902 pending.resolution = Some(resolution.clone());
903 self.state = DriverState::Idle;
904 self.emit(AgentEvent::AuthResolved {
905 provided: matches!(resolution, AuthResolution::Provided { .. }),
906 });
907 Ok(())
908 }
909
910 pub fn snapshot(&self) -> LoopSnapshot {
911 LoopSnapshot {
912 session_id: self.session_id.clone(),
913 transcript: self.transcript.clone(),
914 pending_input: self.pending_input.clone(),
915 }
916 }
917
918 pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
919 if self.state != DriverState::Idle {
920 return Err(LoopError::InvalidState(
921 "cannot advance while an interrupt is pending".into(),
922 ));
923 }
924
925 if self
926 .pending_approval
927 .as_ref()
928 .is_some_and(|pending| pending.decision.is_some())
929 {
930 let pending = self
931 .pending_approval
932 .take()
933 .ok_or_else(|| LoopError::InvalidState("missing pending approval state".into()))?;
934 return self.resume_after_approval(pending).await;
935 }
936
937 if self
938 .pending_auth
939 .as_ref()
940 .is_some_and(|pending| pending.resolution.is_some())
941 {
942 let pending = self
943 .pending_auth
944 .take()
945 .ok_or_else(|| LoopError::InvalidState("missing pending auth state".into()))?;
946 return self.resume_after_auth(pending).await;
947 }
948
949 if self.pending_input.is_empty() {
950 return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
951 InputRequest {
952 session_id: self.session_id.clone(),
953 reason: "driver is waiting for input".into(),
954 },
955 )));
956 }
957
958 let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
959 self.next_turn_index += 1;
960 self.transcript.append(&mut self.pending_input);
961 self.drive_turn(turn_id, true).await
962 }
963
964 fn emit(&mut self, event: AgentEvent) {
965 for observer in &mut self.observers {
966 observer.handle_event(event.clone());
967 }
968 }
969}
970
971fn interrupted_metadata(stage: &str) -> MetadataMap {
972 let mut metadata = MetadataMap::new();
973 metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
974 metadata.insert(
975 INTERRUPT_REASON_METADATA_KEY.into(),
976 USER_CANCELLED_REASON.into(),
977 );
978 metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
979 metadata
980}
981
982fn interrupted_assistant_items() -> Vec<Item> {
983 vec![Item {
984 id: None,
985 kind: ItemKind::Assistant,
986 parts: vec![Part::Text(TextPart {
987 text: "Previous assistant response was interrupted by the user before completion."
988 .into(),
989 metadata: interrupted_metadata("assistant"),
990 })],
991 metadata: interrupted_metadata("assistant"),
992 }]
993}
994
995fn interrupted_tool_items(call: &ToolCallPart) -> Vec<Item> {
996 vec![
997 Item {
998 id: None,
999 kind: ItemKind::Assistant,
1000 parts: vec![Part::ToolCall(call.clone())],
1001 metadata: interrupted_metadata("tool_call"),
1002 },
1003 Item {
1004 id: None,
1005 kind: ItemKind::Tool,
1006 parts: vec![Part::ToolResult(ToolResultPart {
1007 call_id: call.id.clone(),
1008 output: ToolOutput::Text("tool execution interrupted by user".into()),
1009 is_error: true,
1010 metadata: interrupted_metadata("tool_result"),
1011 })],
1012 metadata: interrupted_metadata("tool_result"),
1013 },
1014 ]
1015}
1016
1017fn upgrade_auth_request(
1018 mut request: AuthRequest,
1019 tool_request: &ToolRequest,
1020 _call: &ToolCallPart,
1021) -> AuthRequest {
1022 if matches!(request.operation, AuthOperation::ToolCall { .. }) {
1023 return request;
1024 }
1025
1026 let prior_server_id = request.challenge.get("server_id").cloned();
1027 let mut metadata = tool_request.metadata.clone();
1028 if let Some(server_id) = prior_server_id {
1029 metadata.entry("server_id".into()).or_insert(server_id);
1030 }
1031 request.operation = AuthOperation::ToolCall {
1032 tool_name: tool_request.tool_name.0.clone(),
1033 input: tool_request.input.clone(),
1034 call_id: Some(tool_request.call_id.clone()),
1035 session_id: Some(tool_request.session_id.clone()),
1036 turn_id: Some(tool_request.turn_id.clone()),
1037 metadata,
1038 };
1039 request
1040}
1041
1042struct AllowAllPermissions;
1043
1044impl PermissionChecker for AllowAllPermissions {
1045 fn evaluate(
1046 &self,
1047 _request: &dyn agentkit_tools_core::PermissionRequest,
1048 ) -> agentkit_tools_core::PermissionDecision {
1049 agentkit_tools_core::PermissionDecision::Allow
1050 }
1051}
1052
1053#[derive(Debug, Error)]
1054pub enum LoopError {
1055 #[error("invalid driver state: {0}")]
1056 InvalidState(String),
1057 #[error("turn cancelled")]
1058 Cancelled,
1059 #[error("provider error: {0}")]
1060 Provider(String),
1061 #[error("tool error: {0}")]
1062 Tool(#[from] ToolError),
1063 #[error("compaction error: {0}")]
1064 Compaction(String),
1065 #[error("unsupported operation: {0}")]
1066 Unsupported(String),
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071 use std::collections::VecDeque;
1072 use std::sync::{Arc as StdArc, Mutex as StdMutex};
1073
1074 use agentkit_compaction::{CompactionPipeline, CompactionTrigger, KeepRecentStrategy};
1075 use agentkit_core::{
1076 CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolOutput, ToolResultPart,
1077 };
1078 use agentkit_tools_core::{
1079 FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
1080 ToolAnnotations, ToolName, ToolResult, ToolSpec,
1081 };
1082 use serde_json::{Value, json};
1083
1084 use super::*;
1085
1086 struct FakeAdapter;
1087 struct SlowAdapter;
1088
1089 struct FakeSession;
1090 struct SlowSession;
1091
1092 struct FakeTurn {
1093 events: VecDeque<ModelTurnEvent>,
1094 }
1095
1096 struct SlowTurn {
1097 emitted: bool,
1098 }
1099
1100 #[async_trait]
1101 impl ModelAdapter for FakeAdapter {
1102 type Session = FakeSession;
1103
1104 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
1105 Ok(FakeSession)
1106 }
1107 }
1108
1109 #[async_trait]
1110 impl ModelAdapter for SlowAdapter {
1111 type Session = SlowSession;
1112
1113 async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
1114 Ok(SlowSession)
1115 }
1116 }
1117
1118 #[async_trait]
1119 impl ModelSession for FakeSession {
1120 type Turn = FakeTurn;
1121
1122 async fn begin_turn(
1123 &mut self,
1124 request: TurnRequest,
1125 _cancellation: Option<TurnCancellation>,
1126 ) -> Result<Self::Turn, LoopError> {
1127 let has_tool_result = request.transcript.iter().any(|item| {
1128 item.kind == ItemKind::Tool
1129 && item
1130 .parts
1131 .iter()
1132 .any(|part| matches!(part, Part::ToolResult(_)))
1133 });
1134 let tool_name = request
1135 .available_tools
1136 .first()
1137 .map(|tool| tool.name.0.clone())
1138 .unwrap_or_else(|| "echo".into());
1139
1140 let events = if has_tool_result {
1141 let result_text = request
1142 .transcript
1143 .iter()
1144 .rev()
1145 .find_map(|item| {
1146 item.parts.iter().find_map(|part| match part {
1147 Part::ToolResult(ToolResultPart {
1148 output: ToolOutput::Text(text),
1149 ..
1150 }) => Some(text.clone()),
1151 _ => None,
1152 })
1153 })
1154 .unwrap_or_else(|| "missing".into());
1155
1156 VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
1157 finish_reason: FinishReason::Completed,
1158 output_items: vec![Item {
1159 id: None,
1160 kind: ItemKind::Assistant,
1161 parts: vec![Part::Text(TextPart {
1162 text: format!("tool said: {result_text}"),
1163 metadata: MetadataMap::new(),
1164 })],
1165 metadata: MetadataMap::new(),
1166 }],
1167 usage: None,
1168 metadata: MetadataMap::new(),
1169 })])
1170 } else {
1171 VecDeque::from([
1172 ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
1173 id: ToolCallId::new("call-1"),
1174 name: tool_name.clone(),
1175 input: json!({ "value": "pong" }),
1176 metadata: MetadataMap::new(),
1177 }),
1178 ModelTurnEvent::Finished(ModelTurnResult {
1179 finish_reason: FinishReason::ToolCall,
1180 output_items: vec![Item {
1181 id: None,
1182 kind: ItemKind::Assistant,
1183 parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
1184 id: ToolCallId::new("call-1"),
1185 name: tool_name,
1186 input: json!({ "value": "pong" }),
1187 metadata: MetadataMap::new(),
1188 })],
1189 metadata: MetadataMap::new(),
1190 }],
1191 usage: None,
1192 metadata: MetadataMap::new(),
1193 }),
1194 ])
1195 };
1196
1197 Ok(FakeTurn { events })
1198 }
1199 }
1200
1201 #[async_trait]
1202 impl ModelSession for SlowSession {
1203 type Turn = SlowTurn;
1204
1205 async fn begin_turn(
1206 &mut self,
1207 request: TurnRequest,
1208 cancellation: Option<TurnCancellation>,
1209 ) -> Result<Self::Turn, LoopError> {
1210 let should_block = request
1211 .transcript
1212 .iter()
1213 .rev()
1214 .find(|item| item.kind == ItemKind::User)
1215 .is_some_and(|item| {
1216 item.parts.iter().any(|part| match part {
1217 Part::Text(text) => text.text == "do the long task",
1218 _ => false,
1219 })
1220 });
1221
1222 if should_block && let Some(cancellation) = cancellation {
1223 cancellation.cancelled().await;
1224 return Err(LoopError::Cancelled);
1225 }
1226
1227 Ok(SlowTurn { emitted: false })
1228 }
1229 }
1230
1231 #[async_trait]
1232 impl ModelTurn for FakeTurn {
1233 async fn next_event(
1234 &mut self,
1235 _cancellation: Option<TurnCancellation>,
1236 ) -> Result<Option<ModelTurnEvent>, LoopError> {
1237 Ok(self.events.pop_front())
1238 }
1239 }
1240
1241 #[async_trait]
1242 impl ModelTurn for SlowTurn {
1243 async fn next_event(
1244 &mut self,
1245 cancellation: Option<TurnCancellation>,
1246 ) -> Result<Option<ModelTurnEvent>, LoopError> {
1247 if let Some(cancellation) = cancellation
1248 && cancellation.is_cancelled()
1249 {
1250 return Err(LoopError::Cancelled);
1251 }
1252
1253 if self.emitted {
1254 Ok(None)
1255 } else {
1256 self.emitted = true;
1257 Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
1258 finish_reason: FinishReason::Completed,
1259 output_items: vec![Item {
1260 id: None,
1261 kind: ItemKind::Assistant,
1262 parts: vec![Part::Text(TextPart {
1263 text: "done".into(),
1264 metadata: MetadataMap::new(),
1265 })],
1266 metadata: MetadataMap::new(),
1267 }],
1268 usage: None,
1269 metadata: MetadataMap::new(),
1270 })))
1271 }
1272 }
1273 }
1274
1275 #[derive(Clone)]
1276 struct EchoTool {
1277 spec: ToolSpec,
1278 }
1279
1280 impl Default for EchoTool {
1281 fn default() -> Self {
1282 Self {
1283 spec: ToolSpec {
1284 name: ToolName::new("echo"),
1285 description: "Echo back a value".into(),
1286 input_schema: json!({
1287 "type": "object",
1288 "properties": {
1289 "value": { "type": "string" }
1290 },
1291 "required": ["value"],
1292 "additionalProperties": false
1293 }),
1294 annotations: ToolAnnotations::default(),
1295 metadata: MetadataMap::new(),
1296 },
1297 }
1298 }
1299 }
1300
1301 #[async_trait]
1302 impl Tool for EchoTool {
1303 fn spec(&self) -> &ToolSpec {
1304 &self.spec
1305 }
1306
1307 fn proposed_requests(
1308 &self,
1309 request: &agentkit_tools_core::ToolRequest,
1310 ) -> Result<
1311 Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
1312 agentkit_tools_core::ToolError,
1313 > {
1314 Ok(vec![Box::new(FileSystemPermissionRequest::Read {
1315 path: "/tmp/echo".into(),
1316 metadata: request.metadata.clone(),
1317 })])
1318 }
1319
1320 async fn invoke(
1321 &self,
1322 request: agentkit_tools_core::ToolRequest,
1323 _ctx: &mut ToolContext<'_>,
1324 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
1325 let value = request
1326 .input
1327 .get("value")
1328 .and_then(Value::as_str)
1329 .ok_or_else(|| {
1330 agentkit_tools_core::ToolError::InvalidInput("missing value".into())
1331 })?;
1332
1333 Ok(ToolResult {
1334 result: ToolResultPart {
1335 call_id: request.call_id,
1336 output: ToolOutput::Text(value.into()),
1337 is_error: false,
1338 metadata: MetadataMap::new(),
1339 },
1340 duration: None,
1341 metadata: MetadataMap::new(),
1342 })
1343 }
1344 }
1345
1346 struct DenyFsReads;
1347
1348 impl PermissionChecker for DenyFsReads {
1349 fn evaluate(
1350 &self,
1351 request: &dyn agentkit_tools_core::PermissionRequest,
1352 ) -> PermissionDecision {
1353 if request.kind() == "filesystem.read" {
1354 return PermissionDecision::Deny(PermissionDenial {
1355 code: PermissionCode::PathNotAllowed,
1356 message: "reads denied in test".into(),
1357 metadata: MetadataMap::new(),
1358 });
1359 }
1360
1361 PermissionDecision::Allow
1362 }
1363 }
1364
1365 struct ApproveFsReads;
1366
1367 impl PermissionChecker for ApproveFsReads {
1368 fn evaluate(
1369 &self,
1370 request: &dyn agentkit_tools_core::PermissionRequest,
1371 ) -> PermissionDecision {
1372 if request.kind() == "filesystem.read" {
1373 return PermissionDecision::RequireApproval(ApprovalRequest {
1374 id: "approval:fs-read".into(),
1375 request_kind: request.kind().into(),
1376 reason: agentkit_tools_core::ApprovalReason::SensitivePath,
1377 summary: request.summary(),
1378 metadata: request.metadata().clone(),
1379 });
1380 }
1381
1382 PermissionDecision::Allow
1383 }
1384 }
1385
1386 struct CountTrigger;
1387
1388 impl CompactionTrigger for CountTrigger {
1389 fn should_compact(
1390 &self,
1391 _session_id: &SessionId,
1392 _turn_id: Option<&agentkit_core::TurnId>,
1393 transcript: &[Item],
1394 ) -> Option<agentkit_compaction::CompactionReason> {
1395 (transcript.len() >= 2)
1396 .then_some(agentkit_compaction::CompactionReason::TranscriptTooLong)
1397 }
1398 }
1399
1400 struct RecordingObserver {
1401 events: StdArc<StdMutex<Vec<AgentEvent>>>,
1402 }
1403
1404 impl LoopObserver for RecordingObserver {
1405 fn handle_event(&mut self, event: AgentEvent) {
1406 self.events.lock().unwrap().push(event);
1407 }
1408 }
1409
1410 #[derive(Clone)]
1411 struct AuthTool {
1412 spec: ToolSpec,
1413 }
1414
1415 impl Default for AuthTool {
1416 fn default() -> Self {
1417 Self {
1418 spec: ToolSpec {
1419 name: ToolName::new("auth-tool"),
1420 description: "Always requires auth".into(),
1421 input_schema: json!({
1422 "type": "object",
1423 "properties": {},
1424 "additionalProperties": false
1425 }),
1426 annotations: ToolAnnotations::default(),
1427 metadata: MetadataMap::new(),
1428 },
1429 }
1430 }
1431 }
1432
1433 #[async_trait]
1434 impl Tool for AuthTool {
1435 fn spec(&self) -> &ToolSpec {
1436 &self.spec
1437 }
1438
1439 async fn invoke(
1440 &self,
1441 request: agentkit_tools_core::ToolRequest,
1442 _ctx: &mut ToolContext<'_>,
1443 ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
1444 let mut challenge = MetadataMap::new();
1445 challenge.insert("server_id".into(), json!("mock"));
1446 challenge.insert("scope".into(), json!("secret.read"));
1447
1448 Err(agentkit_tools_core::ToolError::AuthRequired(Box::new(
1449 AuthRequest {
1450 id: "auth-1".into(),
1451 provider: "mcp.mock".into(),
1452 operation: AuthOperation::ToolCall {
1453 tool_name: request.tool_name.0,
1454 input: request.input,
1455 call_id: Some(request.call_id),
1456 session_id: Some(request.session_id),
1457 turn_id: Some(request.turn_id),
1458 metadata: request.metadata,
1459 },
1460 challenge,
1461 },
1462 )))
1463 }
1464 }
1465
1466 #[tokio::test]
1467 async fn loop_continues_after_completed_tool_call() {
1468 let tools = ToolRegistry::new().with(EchoTool::default());
1469 let agent = Agent::builder()
1470 .model(FakeAdapter)
1471 .tools(tools)
1472 .permissions(AllowAllPermissions)
1473 .build()
1474 .unwrap();
1475
1476 let mut driver = agent
1477 .start(SessionConfig {
1478 session_id: SessionId::new("session-1"),
1479 metadata: MetadataMap::new(),
1480 })
1481 .await
1482 .unwrap();
1483
1484 driver
1485 .submit_input(vec![Item {
1486 id: None,
1487 kind: ItemKind::User,
1488 parts: vec![Part::Text(TextPart {
1489 text: "ping".into(),
1490 metadata: MetadataMap::new(),
1491 })],
1492 metadata: MetadataMap::new(),
1493 }])
1494 .unwrap();
1495
1496 let result = driver.next().await.unwrap();
1497
1498 match result {
1499 LoopStep::Finished(turn) => {
1500 assert_eq!(turn.finish_reason, FinishReason::Completed);
1501 assert_eq!(turn.items.len(), 1);
1502 match &turn.items[0].parts[0] {
1503 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
1504 other => panic!("unexpected part: {other:?}"),
1505 }
1506 }
1507 other => panic!("unexpected loop step: {other:?}"),
1508 }
1509 }
1510
1511 #[tokio::test]
1512 async fn loop_uses_injected_permission_checker() {
1513 let tools = ToolRegistry::new().with(EchoTool::default());
1514 let agent = Agent::builder()
1515 .model(FakeAdapter)
1516 .tools(tools)
1517 .permissions(DenyFsReads)
1518 .build()
1519 .unwrap();
1520
1521 let mut driver = agent
1522 .start(SessionConfig {
1523 session_id: SessionId::new("session-2"),
1524 metadata: MetadataMap::new(),
1525 })
1526 .await
1527 .unwrap();
1528
1529 driver
1530 .submit_input(vec![Item {
1531 id: None,
1532 kind: ItemKind::User,
1533 parts: vec![Part::Text(TextPart {
1534 text: "ping".into(),
1535 metadata: MetadataMap::new(),
1536 })],
1537 metadata: MetadataMap::new(),
1538 }])
1539 .unwrap();
1540
1541 let result = driver.next().await.unwrap();
1542
1543 match result {
1544 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
1545 Part::Text(text) => assert!(text.text.contains("tool permission denied")),
1546 other => panic!("unexpected part: {other:?}"),
1547 },
1548 other => panic!("unexpected loop step: {other:?}"),
1549 }
1550 }
1551
1552 #[tokio::test]
1553 async fn loop_surfaces_auth_interruptions_from_tools() {
1554 let tools = ToolRegistry::new().with(AuthTool::default());
1555 let agent = Agent::builder()
1556 .model(FakeAdapter)
1557 .tools(tools)
1558 .permissions(AllowAllPermissions)
1559 .build()
1560 .unwrap();
1561
1562 let mut driver = agent
1563 .start(SessionConfig {
1564 session_id: SessionId::new("session-3"),
1565 metadata: MetadataMap::new(),
1566 })
1567 .await
1568 .unwrap();
1569
1570 driver
1571 .submit_input(vec![Item {
1572 id: None,
1573 kind: ItemKind::User,
1574 parts: vec![Part::Text(TextPart {
1575 text: "ping".into(),
1576 metadata: MetadataMap::new(),
1577 })],
1578 metadata: MetadataMap::new(),
1579 }])
1580 .unwrap();
1581
1582 let result = driver.next().await.unwrap();
1583
1584 match result {
1585 LoopStep::Interrupt(LoopInterrupt::AuthRequest(request)) => {
1586 assert_eq!(request.provider, "mcp.mock");
1587 assert_eq!(request.challenge.get("scope"), Some(&json!("secret.read")));
1588 match request.operation {
1589 AuthOperation::ToolCall { tool_name, .. } => {
1590 assert_eq!(tool_name, "auth-tool");
1591 }
1592 other => panic!("unexpected auth operation: {other:?}"),
1593 }
1594 }
1595 other => panic!("unexpected loop step: {other:?}"),
1596 }
1597 }
1598
1599 #[tokio::test]
1600 async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
1601 let controller = CancellationController::new();
1602 let agent = Agent::builder()
1603 .model(SlowAdapter)
1604 .cancellation(controller.handle())
1605 .build()
1606 .unwrap();
1607
1608 let mut driver = agent
1609 .start(SessionConfig {
1610 session_id: SessionId::new("session-cancel"),
1611 metadata: MetadataMap::new(),
1612 })
1613 .await
1614 .unwrap();
1615
1616 driver
1617 .submit_input(vec![Item {
1618 id: None,
1619 kind: ItemKind::User,
1620 parts: vec![Part::Text(TextPart {
1621 text: "do the long task".into(),
1622 metadata: MetadataMap::new(),
1623 })],
1624 metadata: MetadataMap::new(),
1625 }])
1626 .unwrap();
1627
1628 let task = tokio::spawn(async move {
1629 let result = driver.next().await;
1630 (driver, result)
1631 });
1632 tokio::task::yield_now().await;
1633 controller.interrupt();
1634 let (mut driver, cancelled) = task.await.unwrap();
1635 let cancelled = cancelled.unwrap();
1636
1637 match cancelled {
1638 LoopStep::Finished(turn) => {
1639 assert_eq!(turn.finish_reason, FinishReason::Cancelled);
1640 assert_eq!(turn.items.len(), 1);
1641 assert_eq!(turn.items[0].kind, ItemKind::Assistant);
1642 assert_eq!(
1643 turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
1644 Some(&Value::Bool(true))
1645 );
1646 }
1647 other => panic!("unexpected loop step: {other:?}"),
1648 }
1649
1650 driver
1651 .submit_input(vec![Item {
1652 id: None,
1653 kind: ItemKind::User,
1654 parts: vec![Part::Text(TextPart {
1655 text: "try again".into(),
1656 metadata: MetadataMap::new(),
1657 })],
1658 metadata: MetadataMap::new(),
1659 }])
1660 .unwrap();
1661
1662 let result = driver.next().await.unwrap();
1663 match result {
1664 LoopStep::Finished(turn) => {
1665 assert_eq!(turn.finish_reason, FinishReason::Completed);
1666 }
1667 other => panic!("unexpected loop step after retry: {other:?}"),
1668 }
1669 }
1670
1671 #[tokio::test]
1672 async fn loop_resumes_after_approved_tool_request() {
1673 let tools = ToolRegistry::new().with(EchoTool::default());
1674 let agent = Agent::builder()
1675 .model(FakeAdapter)
1676 .tools(tools)
1677 .permissions(ApproveFsReads)
1678 .build()
1679 .unwrap();
1680
1681 let mut driver = agent
1682 .start(SessionConfig {
1683 session_id: SessionId::new("session-approval"),
1684 metadata: MetadataMap::new(),
1685 })
1686 .await
1687 .unwrap();
1688
1689 driver
1690 .submit_input(vec![Item {
1691 id: None,
1692 kind: ItemKind::User,
1693 parts: vec![Part::Text(TextPart {
1694 text: "ping".into(),
1695 metadata: MetadataMap::new(),
1696 })],
1697 metadata: MetadataMap::new(),
1698 }])
1699 .unwrap();
1700
1701 let first = driver.next().await.unwrap();
1702 match first {
1703 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(request)) => {
1704 assert_eq!(request.id.0, "approval:fs-read");
1705 }
1706 other => panic!("unexpected loop step: {other:?}"),
1707 }
1708
1709 driver.resolve_approval(ApprovalDecision::Approve).unwrap();
1710 let second = driver.next().await.unwrap();
1711 match second {
1712 LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
1713 Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
1714 other => panic!("unexpected part: {other:?}"),
1715 },
1716 other => panic!("unexpected loop step after approval: {other:?}"),
1717 }
1718 }
1719
1720 #[tokio::test]
1721 async fn loop_compacts_transcript_before_new_turns() {
1722 let events = StdArc::new(StdMutex::new(Vec::new()));
1723 let agent = Agent::builder()
1724 .model(FakeAdapter)
1725 .compaction(CompactionConfig::new(
1726 CountTrigger,
1727 CompactionPipeline::new().with_strategy(KeepRecentStrategy::new(1)),
1728 ))
1729 .observer(RecordingObserver {
1730 events: events.clone(),
1731 })
1732 .build()
1733 .unwrap();
1734
1735 let mut driver = agent
1736 .start(SessionConfig {
1737 session_id: SessionId::new("session-4"),
1738 metadata: MetadataMap::new(),
1739 })
1740 .await
1741 .unwrap();
1742
1743 for text in ["first", "second"] {
1744 driver
1745 .submit_input(vec![Item {
1746 id: None,
1747 kind: ItemKind::User,
1748 parts: vec![Part::Text(TextPart {
1749 text: text.into(),
1750 metadata: MetadataMap::new(),
1751 })],
1752 metadata: MetadataMap::new(),
1753 }])
1754 .unwrap();
1755 let _ = driver.next().await.unwrap();
1756 }
1757
1758 let events = events.lock().unwrap();
1759 assert!(events.iter().any(|event| matches!(
1760 event,
1761 AgentEvent::CompactionFinished {
1762 replaced_items,
1763 ..
1764 } if *replaced_items > 0
1765 )));
1766 }
1767}