1use crate::context::{CompactionConfig, Compactor, TokenTracker};
2use crate::events::{AgentMessage, UserMessage};
3use crate::mcp::run_mcp_task::{McpCommand, ToolExecutionEvent};
4use futures::Stream;
5use llm::types::IsoString;
6use llm::{
7 AssistantReasoning, ChatMessage, Context, EncryptedReasoningContent, LlmError, LlmResponse,
8 StopReason, StreamingModelProvider, ToolCallError, ToolCallRequest, ToolCallResult,
9};
10use std::collections::{HashMap, HashSet};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tokio_stream::StreamExt;
16use tokio_stream::StreamMap;
17use tokio_stream::wrappers::ReceiverStream;
18
/// Internal multiplexed event type: everything the agent's select loop can
/// receive, regardless of which stream it arrived on.
#[derive(Debug)]
enum StreamEvent {
    /// A chunk/result from the in-flight LLM response stream.
    Llm(Result<LlmResponse, LlmError>),
    /// Progress or completion from an executing MCP tool.
    ToolExecution(ToolExecutionEvent),
    /// Input or control command from the user-facing channel.
    UserMessage(UserMessage),
}
26
/// Boxed, pinned event stream as stored in the agent's `StreamMap`.
type EventStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;
28
/// Construction-time configuration for [`Agent`].
pub(crate) struct AgentConfig {
    /// Streaming LLM backend used for responses (and by the compactor).
    pub llm: Arc<dyn StreamingModelProvider>,
    /// Initial conversation context (messages and tools).
    pub context: Context,
    /// Channel to the MCP task for tool execution; `None` disables dispatch.
    pub mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
    /// Timeout applied to each tool execution.
    pub tool_timeout: Duration,
    /// Context-compaction settings; `None` disables compaction.
    pub compaction_config: Option<CompactionConfig>,
    /// Retry policy for resuming length-truncated LLM responses.
    pub auto_continue: AutoContinue,
}
37
/// Long-running agent task: owns the conversation context, multiplexes
/// user/LLM/tool event streams, and reports progress over `message_tx`.
pub struct Agent {
    /// Active streaming LLM backend (swappable at runtime via SwitchModel).
    llm: Arc<dyn StreamingModelProvider>,
    /// Conversation context sent with every LLM request.
    context: Context,
    /// Channel to the MCP task; `None` disables tool execution.
    mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
    /// Outgoing progress/result messages to the consumer.
    message_tx: mpsc::Sender<AgentMessage>,
    /// Multiplexed sources keyed "user", "llm", and one entry per tool id.
    streams: StreamMap<String, EventStream>,
    /// Timeout applied to each tool execution.
    tool_timeout: Duration,
    /// Token usage accounting and context-limit tracking.
    token_tracker: TokenTracker,
    /// Compaction settings; `None` disables compaction.
    compaction_config: Option<CompactionConfig>,
    /// Retry policy for length-truncated responses.
    auto_continue: AutoContinue,
    /// Tool-call requests currently streaming arguments or executing, by id.
    active_requests: HashMap<String, ToolCallRequest>,
}
50
51impl Agent {
52 pub(crate) fn new(
53 config: AgentConfig,
54 user_message_rx: mpsc::Receiver<UserMessage>,
55 message_tx: mpsc::Sender<AgentMessage>,
56 ) -> Self {
57 let mut streams: StreamMap<String, EventStream> = StreamMap::new();
58 streams.insert(
59 "user".to_string(),
60 Box::pin(ReceiverStream::new(user_message_rx).map(StreamEvent::UserMessage)),
61 );
62
63 let context_limit = config.llm.context_window();
64
65 Self {
66 llm: config.llm,
67 context: config.context,
68 mcp_command_tx: config.mcp_command_tx,
69 message_tx,
70 streams,
71 tool_timeout: config.tool_timeout,
72 token_tracker: TokenTracker::new(context_limit),
73 compaction_config: config.compaction_config,
74 auto_continue: config.auto_continue,
75 active_requests: HashMap::new(),
76 }
77 }
78
    /// Human-readable name of the currently active model.
    pub fn current_model_display_name(&self) -> String {
        self.llm.display_name()
    }
82
    /// Read-only access to token usage accounting.
    pub fn token_tracker(&self) -> &TokenTracker {
        &self.token_tracker
    }
87
    /// Main event loop: multiplexes user input, LLM responses, and tool
    /// execution events until the user-input channel closes.
    pub async fn run(mut self) {
        let mut state = IterationState::new();

        // `StreamMap` yields from whichever source is ready; the loop exits
        // once every inserted stream is exhausted (notably "user").
        while let Some((_, event)) = self.streams.next().await {
            use UserMessage::{
                Cancel, ClearContext, SetReasoningEffort, SwitchModel, Text, UpdateTools,
            };
            match event {
                StreamEvent::UserMessage(Cancel) => {
                    self.on_user_cancel(&mut state).await;
                }

                StreamEvent::UserMessage(ClearContext) => {
                    self.on_user_clear_context(&mut state).await;
                }

                StreamEvent::UserMessage(Text { content }) => {
                    // A new user message starts a fresh iteration.
                    state = IterationState::new();
                    self.on_user_text(content);
                }

                StreamEvent::UserMessage(SwitchModel(new_provider)) => {
                    self.on_switch_model(new_provider).await;
                }

                StreamEvent::UserMessage(UpdateTools(tools)) => {
                    self.context.set_tools(tools);
                }

                StreamEvent::UserMessage(SetReasoningEffort(effort)) => {
                    self.context.set_reasoning_effort(effort);
                }

                StreamEvent::Llm(llm_event) => {
                    // After a cancel, late LLM chunks are dropped.
                    if !state.cancelled {
                        self.on_llm_event(llm_event, &mut state).await;
                    }
                }

                StreamEvent::ToolExecution(tool_event) => {
                    // Likewise drop stale tool events after a cancel.
                    if !state.cancelled {
                        self.on_tool_execution_event(tool_event, &mut state).await;
                    }
                }
            }

            // A turn finishes when the LLM reported Done and no tool calls
            // remain pending.
            if state.is_complete() {
                let Some(id) = state.current_message_id.take() else {
                    continue;
                };
                // Swap in fresh state, then finalize the completed iteration.
                let iteration = std::mem::replace(&mut state, IterationState::new());
                self.on_iteration_complete(id, iteration).await;
            }
        }

        tracing::debug!("Agent task shutting down - input channel closed");
    }
145
    /// Finalize a completed iteration: persist the assistant turn, emit the
    /// final Text/Thought messages, then either loop back into the LLM (tool
    /// results pending or auto-continue) or signal `Done`.
    async fn on_iteration_complete(&mut self, id: String, iteration: IterationState) {
        let IterationState {
            message_content,
            reasoning_summary_text,
            encrypted_reasoning,
            completed_tool_calls,
            stop_reason,
            ..
        } = iteration;
        let has_tool_calls = !completed_tool_calls.is_empty();
        let has_content = !message_content.is_empty() || has_tool_calls;

        // Nothing produced and no reason to retry: end the turn quietly.
        if !has_content && !self.auto_continue.should_continue(stop_reason.as_ref()) {
            let _ = self.message_tx.send(AgentMessage::Done).await;
            return;
        }

        let reasoning =
            AssistantReasoning::from_parts(reasoning_summary_text.clone(), encrypted_reasoning);
        self.update_context(&message_content, reasoning, completed_tool_calls);

        // Final (is_complete = true) Text message for this assistant turn.
        let _ = self
            .message_tx
            .send(AgentMessage::Text {
                message_id: id.clone(),
                chunk: message_content.clone(),
                is_complete: true,
                model_name: self.llm.display_name(),
            })
            .await;

        if !reasoning_summary_text.is_empty() {
            let _ = self
                .message_tx
                .send(AgentMessage::Thought {
                    message_id: id.clone(),
                    chunk: reasoning_summary_text,
                    is_complete: true,
                    model_name: self.llm.display_name(),
                })
                .await;
        }

        if has_tool_calls {
            // Tool results are already in the context; immediately ask the
            // LLM for the next step. Tool activity resets the retry budget.
            self.auto_continue.on_tool_calls();
            self.maybe_preflight_compact().await;
            self.start_llm_stream();
        } else if self.auto_continue.should_continue(stop_reason.as_ref()) {
            self.auto_continue.advance();
            tracing::info!(
                "LLM stopped with {:?}, auto-continuing (attempt {}/{})",
                stop_reason,
                self.auto_continue.count(),
                self.auto_continue.max()
            );

            let _ = self
                .message_tx
                .send(AgentMessage::AutoContinue {
                    attempt: self.auto_continue.count(),
                    max_attempts: self.auto_continue.max(),
                })
                .await;

            // NOTE(review): `update_context` above already recorded
            // `message_content` as an assistant turn, and
            // `inject_continuation_prompt` re-adds the same text when
            // non-empty — confirm push_assistant_turn/inject interplay does
            // not duplicate the assistant message in the context.
            self.inject_continuation_prompt(&message_content, stop_reason.as_ref());
            self.maybe_preflight_compact().await;
            self.start_llm_stream();
        } else {
            tracing::debug!("LLM completed turn with stop reason: {:?}", stop_reason);
            self.auto_continue.on_completion();
            if let Err(e) = self.message_tx.send(AgentMessage::Done).await {
                tracing::warn!("Failed to send Done message: {:?}", e);
            }
        }
    }
222
223 async fn on_user_cancel(&mut self, state: &mut IterationState) {
224 state.cancelled = true;
225 self.streams.remove("llm");
226 let _ = self
227 .message_tx
228 .send(AgentMessage::Cancelled {
229 message: "Processing cancelled".to_string(),
230 })
231 .await;
232 let _ = self.message_tx.send(AgentMessage::Done).await;
233 }
234
235 async fn on_user_clear_context(&mut self, state: &mut IterationState) {
236 self.clear_active_streams();
237 self.active_requests.clear();
238 self.context.clear_conversation();
239 self.token_tracker.reset_current_usage();
240 self.auto_continue.on_completion();
241 *state = IterationState::new();
242
243 let _ = self.message_tx.send(AgentMessage::ContextCleared).await;
244 }
245
246 fn on_user_text(&mut self, content: Vec<llm::ContentBlock>) {
247 self.context.add_message(ChatMessage::User {
248 content,
249 timestamp: IsoString::now(),
250 });
251
252 self.start_llm_stream();
253 }
254
255 async fn on_switch_model(&mut self, new_provider: Box<dyn StreamingModelProvider>) {
256 let previous = self.llm.display_name();
257 let new_context_limit = new_provider.context_window();
258 self.llm = Arc::from(new_provider);
259 self.token_tracker.reset_current_usage();
260 self.token_tracker.set_context_limit(new_context_limit);
261 let new = self.llm.display_name();
262 let _ = self
263 .message_tx
264 .send(AgentMessage::ModelSwitched { previous, new })
265 .await;
266
267 let _ = self
268 .message_tx
269 .send(AgentMessage::ContextUsageUpdate {
270 usage_ratio: self.token_tracker.usage_ratio(),
271 tokens_used: self.token_tracker.last_input_tokens(),
272 context_limit: self.token_tracker.context_limit(),
273 })
274 .await;
275 }
276
277 fn start_llm_stream(&mut self) {
278 self.streams.remove("llm");
279 let llm_stream = self
280 .llm
281 .stream_response(&self.context)
282 .map(StreamEvent::Llm);
283 self.streams.insert("llm".to_string(), Box::pin(llm_stream));
284 }
285
286 fn clear_active_streams(&mut self) {
287 self.streams.remove("llm");
288 for stream_key in self.active_requests.keys().cloned().collect::<Vec<_>>() {
289 self.streams.remove(&stream_key);
290 }
291 }
292
293 fn inject_continuation_prompt(
295 &mut self,
296 previous_response: &str,
297 stop_reason: Option<&StopReason>,
298 ) {
299 if !previous_response.is_empty() {
300 self.context.add_message(ChatMessage::Assistant {
301 content: previous_response.to_string(),
302 reasoning: AssistantReasoning::default(),
303 timestamp: IsoString::now(),
304 tool_calls: Vec::new(),
305 });
306 }
307
308 let reason =
309 stop_reason.map_or_else(|| "Unknown".to_string(), |reason| format!("{reason:?}"));
310
311 self.context.add_message(ChatMessage::User {
312 content: vec![llm::ContentBlock::text(format!(
313 "<system-notification>The LLM API stopped with reason '{reason}'. Continue from where you left off and finish your task.</system-notification>"
314 ))],
315 timestamp: IsoString::now(),
316 });
317 }
318
    /// Dispatch one event from the LLM response stream into iteration state
    /// and outgoing agent messages.
    async fn on_llm_event(
        &mut self,
        result: Result<LlmResponse, LlmError>,
        state: &mut IterationState,
    ) {
        use LlmResponse::{
            Done, EncryptedReasoning, Error, Reasoning, Start, Text, ToolRequestArg,
            ToolRequestComplete, ToolRequestStart, Usage,
        };

        // Transport-level failure: surface it and stop handling this event.
        let response = match result {
            Ok(response) => response,
            Err(e) => {
                let _ = self
                    .message_tx
                    .send(AgentMessage::Error {
                        message: e.to_string(),
                    })
                    .await;
                return;
            }
        };

        match response {
            Start { message_id } => {
                state.on_llm_start(message_id);
            }

            Text { chunk } => {
                self.handle_llm_text(chunk, state).await;
            }

            Reasoning { chunk } => {
                // Accumulate the summary and stream it out incrementally —
                // but only once Start has provided a message id.
                state.reasoning_summary_text.push_str(&chunk);
                if let Some(id) = &state.current_message_id {
                    let _ = self
                        .message_tx
                        .send(AgentMessage::Thought {
                            message_id: id.clone(),
                            chunk,
                            is_complete: false,
                            model_name: self.llm.display_name(),
                        })
                        .await;
                }
            }

            EncryptedReasoning { id, content } => {
                // Only stored when the provider reports a model id; the
                // encrypted payload is recorded together with that model.
                if let Some(model) = self.llm.model() {
                    state.encrypted_reasoning =
                        Some(EncryptedReasoningContent { id, model, content });
                }
            }

            ToolRequestStart { id, name } => {
                self.handle_tool_request_start(id, name).await;
            }

            ToolRequestArg { id, chunk } => {
                self.handle_tool_request_arg(id, chunk).await;
            }

            ToolRequestComplete { tool_call } => {
                self.handle_tool_completion(tool_call, state).await;
            }

            Done { stop_reason } => {
                state.llm_done = true;
                state.stop_reason = stop_reason;
            }

            Error { message } => {
                // Provider-reported (in-stream) error, as opposed to the
                // transport error handled above.
                let _ = self.message_tx.send(AgentMessage::Error { message }).await;
            }

            Usage {
                input_tokens,
                output_tokens,
                cached_input_tokens,
            } => {
                self.handle_llm_usage(input_tokens, output_tokens, cached_input_tokens)
                    .await;
            }
        }
    }
404
405 async fn handle_llm_text(&mut self, chunk: String, state: &mut IterationState) {
406 state.message_content.push_str(&chunk);
407
408 if let Some(id) = &state.current_message_id {
409 let _ = self
410 .message_tx
411 .send(AgentMessage::Text {
412 message_id: id.clone(),
413 chunk,
414 is_complete: false,
415 model_name: self.llm.display_name(),
416 })
417 .await;
418 }
419 }
420
421 async fn handle_tool_request_start(&mut self, id: String, name: String) {
422 let request = ToolCallRequest {
423 id: id.clone(),
424 name,
425 arguments: String::new(),
426 };
427 self.active_requests.insert(id, request.clone());
428
429 let _ = self
430 .message_tx
431 .send(AgentMessage::ToolCall {
432 request,
433 model_name: self.llm.display_name(),
434 })
435 .await;
436 }
437
438 async fn handle_tool_request_arg(&mut self, id: String, chunk: String) {
439 let Some(request) = self.active_requests.get_mut(&id) else {
440 return;
441 };
442 request.arguments.push_str(&chunk);
443
444 let _ = self
445 .message_tx
446 .send(AgentMessage::ToolCallUpdate {
447 tool_call_id: id,
448 chunk,
449 model_name: self.llm.display_name(),
450 })
451 .await;
452 }
453
    /// Register a fully-parsed tool call and dispatch it to the MCP task,
    /// wiring its event channel into the agent's stream map.
    async fn handle_tool_completion(
        &mut self,
        tool_call: ToolCallRequest,
        state: &mut IterationState,
    ) {
        state.pending_tool_ids.insert(tool_call.id.clone());
        debug_assert!(
            self.active_requests.contains_key(&tool_call.id),
            "tool call {} should already be in active_requests from handle_tool_request_start",
            tool_call.id
        );

        // Events from this execution flow back through the stream map,
        // keyed by the tool-call id.
        let (tx, rx) = mpsc::channel(100);
        let stream = ReceiverStream::new(rx).map(StreamEvent::ToolExecution);
        let stream_key = tool_call.id.clone();
        self.streams.insert(stream_key, Box::pin(stream));

        // NOTE(review): when `mcp_command_tx` is None (or the send fails),
        // `tx` is dropped with no Complete event ever produced, yet the id
        // stays in `pending_tool_ids`, so `is_complete()` can never become
        // true for this turn — confirm this path is unreachable outside
        // tests.
        if let Some(ref mcp_command_tx) = self.mcp_command_tx {
            let mcp_future = mcp_command_tx.send(McpCommand::ExecuteTool {
                request: tool_call,
                timeout: self.tool_timeout,
                tx,
            });
            if let Err(e) = mcp_future.await {
                tracing::warn!("Failed to send tool request to MCP task: {:?}", e);
            }
        }
    }
482
483 async fn handle_llm_usage(
484 &mut self,
485 input_tokens: u32,
486 output_tokens: u32,
487 cached_input_tokens: Option<u32>,
488 ) {
489 self.token_tracker
490 .record_usage(input_tokens, output_tokens, cached_input_tokens);
491 match (
492 self.token_tracker.usage_ratio(),
493 self.token_tracker.tokens_remaining(),
494 ) {
495 (Some(usage_ratio), Some(tokens_remaining)) => {
496 tracing::debug!(
497 "Token usage - input: {}, output: {}, ratio: {:.2}%, remaining: {}",
498 input_tokens,
499 output_tokens,
500 usage_ratio * 100.0,
501 tokens_remaining
502 );
503 }
504 _ => {
505 tracing::debug!(
506 "Token usage - input: {}, output: {}, ratio: unknown (context limit unavailable)",
507 input_tokens,
508 output_tokens
509 );
510 }
511 }
512
513 let _ = self
514 .message_tx
515 .send(AgentMessage::ContextUsageUpdate {
516 usage_ratio: self.token_tracker.usage_ratio(),
517 tokens_used: self.token_tracker.last_input_tokens(),
518 context_limit: self.token_tracker.context_limit(),
519 })
520 .await;
521
522 self.maybe_compact_context().await;
523 }
524
525 async fn maybe_preflight_compact(&mut self) {
529 let Some(context_limit) = self.token_tracker.context_limit() else {
530 return;
531 };
532 let Some(config) = self.compaction_config.as_ref() else {
533 return;
534 };
535 let estimated = self.context.estimated_token_count();
536 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
537 let threshold = (f64::from(context_limit) * config.threshold).ceil() as u32;
538 if estimated >= threshold {
539 tracing::info!(
540 "Pre-flight compaction triggered: estimated {estimated} tokens >= {:.1}% of {context_limit} limit",
541 config.threshold * 100.0
542 );
543 if let CompactionOutcome::Failed(e) = self.compact_context().await {
544 tracing::warn!("Pre-flight compaction failed: {e}");
545 }
546 }
547 }
548
549 async fn maybe_compact_context(&mut self) {
551 if !self
552 .compaction_config
553 .as_ref()
554 .is_some_and(|config| self.token_tracker.should_compact(config.threshold))
555 {
556 return;
557 }
558
559 if let CompactionOutcome::Failed(error_message) = self.compact_context().await {
560 tracing::warn!("Context compaction failed: {}", error_message);
561 }
562 }
563
    /// Run the compactor over the current context, replacing it with the
    /// summarized version on success. Emits start/result messages around the
    /// attempt.
    async fn compact_context(&mut self) -> CompactionOutcome {
        // Guard: compaction must be explicitly configured.
        let Some(ref _config) = self.compaction_config else {
            tracing::warn!("Context compaction requested but compaction is disabled");
            return CompactionOutcome::SkippedDisabled;
        };

        match self.token_tracker.usage_ratio() {
            Some(usage_ratio) => {
                tracing::info!(
                    "Starting context compaction - {} messages, {:.1}% of context limit",
                    self.context.message_count(),
                    usage_ratio * 100.0
                );
            }
            None => {
                tracing::info!(
                    "Starting context compaction - {} messages (context limit unknown)",
                    self.context.message_count(),
                );
            }
        }

        let _ = self
            .message_tx
            .send(AgentMessage::ContextCompactionStarted {
                message_count: self.context.message_count(),
            })
            .await;

        let compactor = Compactor::new(self.llm.clone());

        match compactor.compact(&self.context).await {
            Ok(result) => {
                tracing::info!(
                    "Context compacted: {} messages removed",
                    result.messages_removed
                );

                // Adopt the compacted context and restart usage accounting —
                // previous input-token counts no longer describe it.
                self.context = result.context;
                self.token_tracker.reset_current_usage();

                let _ = self
                    .message_tx
                    .send(AgentMessage::ContextCompactionResult {
                        summary: result.summary,
                        messages_removed: result.messages_removed,
                    })
                    .await;
                CompactionOutcome::Compacted
            }
            Err(e) => CompactionOutcome::Failed(e.to_string()),
        }
    }
617
    /// Handle start/progress/completion events from a running tool
    /// execution, forwarding them to the consumer and recording results.
    async fn on_tool_execution_event(
        &mut self,
        event: ToolExecutionEvent,
        state: &mut IterationState,
    ) {
        match event {
            ToolExecutionEvent::Started { tool_id, tool_name } => {
                tracing::debug!("Tool execution started: {} ({})", tool_name, tool_id);
            }

            ToolExecutionEvent::Progress { tool_id, progress } => {
                tracing::debug!(
                    "Tool progress for {}: {}/{}",
                    tool_id,
                    progress.progress,
                    progress.total.unwrap_or(0.0)
                );

                // Only forward progress for a request we still track.
                if let Some(request) = self.active_requests.get(&tool_id) {
                    let _ = self
                        .message_tx
                        .send(AgentMessage::ToolProgress {
                            request: request.clone(),
                            progress: progress.progress,
                            total: progress.total,
                            message: progress.message.clone(),
                        })
                        .await;
                }
            }

            ToolExecutionEvent::Complete {
                tool_id: _,
                result,
                result_meta,
            } => match result {
                Ok(tool_result) => {
                    tracing::debug!(
                        "Tool result received: {} -> {}",
                        tool_result.name,
                        tool_result.result.len()
                    );

                    // `remove` returning false means the id is no longer
                    // pending (e.g. state reset by cancel/clear) — the
                    // result is stale and is dropped.
                    if state.pending_tool_ids.remove(&tool_result.id) {
                        self.active_requests.remove(&tool_result.id);
                        state.completed_tool_calls.push(Ok(tool_result.clone()));

                        let msg = AgentMessage::ToolResult {
                            result: tool_result,
                            result_meta,
                            model_name: self.llm.display_name(),
                        };

                        if let Err(e) = self.message_tx.send(msg).await {
                            tracing::warn!("Failed to send ToolCall completion message: {:?}", e);
                        }
                    } else {
                        tracing::debug!("Ignoring stale tool result for id: {}", tool_result.id);
                    }
                }

                Err(tool_error) => {
                    // Same staleness check as the success path.
                    if state.pending_tool_ids.remove(&tool_error.id) {
                        self.active_requests.remove(&tool_error.id);
                        state.completed_tool_calls.push(Err(tool_error.clone()));

                        let _ = self
                            .message_tx
                            .send(AgentMessage::ToolError {
                                error: tool_error,
                                model_name: self.llm.display_name(),
                            })
                            .await;
                    }
                }
            },
        }
    }
696
    /// Record the finished assistant turn (text, reasoning, tool results)
    /// in the conversation context.
    fn update_context(
        &mut self,
        message_content: &str,
        reasoning: AssistantReasoning,
        completed_tools: Vec<Result<ToolCallResult, ToolCallError>>,
    ) {
        self.context
            .push_assistant_turn(message_content, reasoning, completed_tools);
    }
706}
707
/// Result of a compaction attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CompactionOutcome {
    /// Context was successfully summarized and shrunk.
    Compacted,
    /// Compaction was requested but no config is present.
    SkippedDisabled,
    /// The compactor returned an error (stringified message).
    Failed(String),
}
714
/// Budgeted retry policy for resuming length-truncated LLM turns.
pub(crate) struct AutoContinue {
    /// Maximum consecutive auto-continue attempts.
    max: u32,
    /// Attempts used since the last reset.
    count: u32,
}
719
720impl AutoContinue {
721 pub(crate) fn new(max: u32) -> Self {
722 Self { max, count: 0 }
723 }
724
725 fn on_tool_calls(&mut self) {
726 self.count = 0;
727 }
728
729 fn on_completion(&mut self) {
730 self.count = 0;
731 }
732
733 fn should_continue(&self, stop_reason: Option<&StopReason>) -> bool {
734 matches!(stop_reason, Some(StopReason::Length)) && self.count < self.max
735 }
736
737 fn advance(&mut self) {
738 self.count += 1;
739 }
740
741 fn count(&self) -> u32 {
742 self.count
743 }
744
745 fn max(&self) -> u32 {
746 self.max
747 }
748}
749
/// Mutable per-turn state accumulated while one assistant turn streams in.
#[derive(Debug)]
struct IterationState {
    /// Message id from the LLM `Start` event; `None` until a stream starts.
    current_message_id: Option<String>,
    /// Assistant text accumulated from `Text` chunks.
    message_content: String,
    /// Reasoning summary accumulated from `Reasoning` chunks.
    reasoning_summary_text: String,
    /// Provider-encrypted reasoning blob, if one was emitted.
    encrypted_reasoning: Option<EncryptedReasoningContent>,
    /// Ids of tool calls dispatched but not yet completed.
    pending_tool_ids: HashSet<String>,
    /// Results (ok or error) of tool calls finished this turn.
    completed_tool_calls: Vec<Result<ToolCallResult, ToolCallError>>,
    /// True once the LLM stream reported `Done`.
    llm_done: bool,
    /// Stop reason from the final `Done` event, if reported.
    stop_reason: Option<StopReason>,
    /// True after a user cancel; further LLM/tool events are ignored.
    cancelled: bool,
}
762
763impl IterationState {
764 fn new() -> Self {
765 Self {
766 current_message_id: None,
767 message_content: String::new(),
768 reasoning_summary_text: String::new(),
769 encrypted_reasoning: None,
770 pending_tool_ids: HashSet::new(),
771 completed_tool_calls: Vec::new(),
772 llm_done: false,
773 stop_reason: None,
774 cancelled: false,
775 }
776 }
777
778 fn on_llm_start(&mut self, message_id: String) {
779 self.current_message_id = Some(message_id);
780 self.message_content.clear();
781 self.reasoning_summary_text.clear();
782 self.encrypted_reasoning = None;
783 self.stop_reason = None;
784 }
785
786 fn is_complete(&self) -> bool {
787 self.llm_done && self.pending_tool_ids.is_empty()
788 }
789}
790
#[cfg(test)]
mod tests {
    use super::*;
    use llm::testing::FakeLlmProvider;
    use tokio::sync::mpsc;

    /// Pre-flight compaction should fire when the estimated token count of
    /// the context crosses the configured threshold fraction of the model's
    /// context window.
    #[tokio::test]
    async fn test_preflight_compaction_uses_configured_threshold() {
        // Fake model with a 100-token window whose next (and only) response
        // is the summary the compactor will produce.
        let llm = Arc::new(
            FakeLlmProvider::with_single_response(vec![
                LlmResponse::start("summary"),
                LlmResponse::text("summary"),
                LlmResponse::done(),
            ])
            .with_context_window(Some(100)),
        );
        // 344 chars is sized so the estimate exceeds 85% of the 100-token
        // limit used below.
        let context = Context::new(
            vec![ChatMessage::User {
                content: vec![llm::ContentBlock::text("x".repeat(344))],
                timestamp: IsoString::now(),
            }],
            vec![],
        );
        let (user_tx, user_rx) = mpsc::channel(1);
        let (message_tx, _message_rx) = mpsc::channel(8);
        drop(user_tx);

        let mut agent = Agent::new(
            AgentConfig {
                llm,
                context,
                mcp_command_tx: None,
                tool_timeout: Duration::from_secs(1),
                compaction_config: Some(CompactionConfig::with_threshold(0.85)),
                auto_continue: AutoContinue::new(0),
            },
            user_rx,
            message_tx,
        );

        agent.maybe_preflight_compact().await;

        // The whole conversation should be replaced by a single summary.
        assert!(
            matches!(
                agent.context.messages().as_slice(),
                [ChatMessage::Summary { content, .. }] if content == "summary"
            ),
            "expected context to be compacted, got {:?}",
            agent.context.messages()
        );
    }
}
842}