use crate::context::{CompactionConfig, Compactor, TokenTracker};
use crate::events::{AgentMessage, UserMessage};
use crate::mcp::run_mcp_task::{McpCommand, ToolExecutionEvent};
use futures::Stream;
use llm::types::IsoString;
use llm::{
    AssistantReasoning, ChatMessage, Context, EncryptedReasoningContent, LlmError, LlmResponse, StopReason,
    StreamingModelProvider, TokenUsage, ToolCallError, ToolCallRequest, ToolCallResult,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_stream::StreamMap;
use tokio_stream::wrappers::ReceiverStream;

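/// A single event drawn from one of the agent's merged input streams:
/// the LLM response stream, a running tool's execution stream, or the
/// user message channel.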
#[derive(Debug)]
enum StreamEvent {
    Llm(Result<LlmResponse, LlmError>),
    ToolExecution(ToolExecutionEvent),
    UserMessage(UserMessage),
}

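/// A pinned, boxed stream of [`StreamEvent`]s, so heterogeneous sources can
/// share one `StreamMap`.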
type EventStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;

const USER_STREAM_KEY: &str = "user";
const LLM_STREAM_KEY: &str = "llm";

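/// Everything needed to construct an [`Agent`]; built by the caller and
/// consumed by [`Agent::new`].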
pub(crate) struct AgentConfig {
    pub llm: Arc<dyn StreamingModelProvider>,
    pub context: Context,
    pub mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
    pub tool_timeout: Duration,
    pub compaction_config: Option<CompactionConfig>,
    pub auto_continue: AutoContinue,
}

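/// The core agent event loop: multiplexes user input, streamed LLM
/// responses, and tool executions over a single `StreamMap`, maintains the
/// conversation context, and emits [`AgentMessage`]s to the output channel.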
pub struct Agent {
    llm: Arc<dyn StreamingModelProvider>,
    context: Context,
    mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
    message_tx: mpsc::Sender<AgentMessage>,
    streams: StreamMap<String, EventStream>,
    tool_timeout: Duration,
    token_tracker: TokenTracker,
    compaction_config: Option<CompactionConfig>,
    auto_continue: AutoContinue,
    active_requests: HashMap<String, ToolCallRequest>,
    queued_user_messages: VecDeque<Vec<llm::ContentBlock>>,
}

impl Agent {
    pub(crate) fn new(
        config: AgentConfig,
        user_message_rx: mpsc::Receiver<UserMessage>,
        message_tx: mpsc::Sender<AgentMessage>,
    ) -> Self {
        let mut streams: StreamMap<String, EventStream> = StreamMap::new();
        streams.insert(
            USER_STREAM_KEY.to_string(),
            Box::pin(ReceiverStream::new(user_message_rx).map(StreamEvent::UserMessage)),
        );

        let context_limit = config.llm.context_window();

        Self {
            llm: config.llm,
            context: config.context,
            mcp_command_tx: config.mcp_command_tx,
            message_tx,
            streams,
            tool_timeout: config.tool_timeout,
            token_tracker: TokenTracker::new(context_limit),
            compaction_config: config.compaction_config,
            auto_continue: config.auto_continue,
            active_requests: HashMap::new(),
            queued_user_messages: VecDeque::new(),
        }
    }

    pub fn current_model_display_name(&self) -> String {
        self.llm.display_name()
    }

    pub fn token_tracker(&self) -> &TokenTracker {
        &self.token_tracker
    }

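    /// Main event loop. Runs until the input streams are exhausted (in
    /// practice, when the user channel closes), dispatching each event to its
    /// handler and finalizing an iteration once the LLM is done and no tool
    /// calls remain in flight.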
    pub async fn run(mut self) {
        let mut state = IterationState::new();

        while let Some((_, event)) = self.streams.next().await {
            use UserMessage::{Cancel, ClearContext, SetReasoningEffort, SwitchModel, Text, UpdateTools};
            match event {
                StreamEvent::UserMessage(Cancel) => {
                    self.on_user_cancel(&mut state).await;
                }

                StreamEvent::UserMessage(ClearContext) => {
                    self.on_user_clear_context(&mut state).await;
                }

                StreamEvent::UserMessage(Text { content }) => {
                    if self.is_busy() {
                        self.queued_user_messages.push_back(content);
                    } else {
                        state = IterationState::new();
                        self.on_user_text(content);
                    }
                }

                StreamEvent::UserMessage(SwitchModel(new_provider)) => {
                    self.on_switch_model(new_provider).await;
                }

                StreamEvent::UserMessage(UpdateTools(tools)) => {
                    self.context.set_tools(tools);
                }

                StreamEvent::UserMessage(SetReasoningEffort(effort)) => {
                    self.context.set_reasoning_effort(effort);
                }

                StreamEvent::Llm(llm_event) => {
                    self.on_llm_event(llm_event, &mut state).await;
                }

                StreamEvent::ToolExecution(tool_event) => {
                    self.on_tool_execution_event(tool_event, &mut state).await;
                }
            }

            if state.is_complete() {
                let Some(id) = state.current_message_id.take() else {
                    continue;
                };
                let iteration = std::mem::replace(&mut state, IterationState::new());
                self.on_iteration_complete(id, iteration).await;
            }
        }

        tracing::debug!("Agent task shutting down - input channel closed");
    }

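    /// Finalizes a completed LLM iteration: records the assistant turn in the
    /// context, flushes queued user messages, and decides whether to start
    /// another turn (tool results, queued input, or auto-continue) or report
    /// `Done`.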
    async fn on_iteration_complete(&mut self, id: String, iteration: IterationState) {
        let IterationState {
            message_content,
            reasoning_summary_text,
            encrypted_reasoning,
            completed_tool_calls,
            stop_reason,
            ..
        } = iteration;
        let has_tool_calls = !completed_tool_calls.is_empty();
        let has_content = !message_content.is_empty() || has_tool_calls;
        let should_auto_continue = self.auto_continue.should_continue(stop_reason.as_ref());

        if has_content {
            let reasoning = AssistantReasoning::from_parts(reasoning_summary_text.clone(), encrypted_reasoning);
            self.update_context(&message_content, reasoning, completed_tool_calls);

            let _ = self
                .message_tx
                .send(AgentMessage::Text {
                    message_id: id.clone(),
                    chunk: message_content.clone(),
                    is_complete: true,
                    model_name: self.llm.display_name(),
                })
                .await;

            if !reasoning_summary_text.is_empty() {
                let _ = self
                    .message_tx
                    .send(AgentMessage::Thought {
                        message_id: id.clone(),
                        chunk: reasoning_summary_text,
                        is_complete: true,
                        model_name: self.llm.display_name(),
                    })
                    .await;
            }
        }

        let has_queued_text = !self.queued_user_messages.is_empty();
        if has_queued_text {
            let content: Vec<_> = self.queued_user_messages.drain(..).flatten().collect();
            self.context.add_message(ChatMessage::User { content, timestamp: IsoString::now() });
        }

        if has_queued_text || has_tool_calls {
            self.auto_continue.reset();
            self.start_next_turn().await;
        } else if should_auto_continue {
            self.auto_continue.advance();
            tracing::info!(
                "LLM stopped with {:?}, auto-continuing (attempt {}/{})",
                stop_reason,
                self.auto_continue.count(),
                self.auto_continue.max()
            );

            let _ = self
                .message_tx
                .send(AgentMessage::AutoContinue {
                    attempt: self.auto_continue.count(),
                    max_attempts: self.auto_continue.max(),
                })
                .await;

            self.inject_continuation_prompt(&message_content, stop_reason.as_ref());
            self.start_next_turn().await;
        } else {
            tracing::debug!("LLM completed turn with stop reason: {:?}", stop_reason);
            self.auto_continue.reset();
            if let Err(e) = self.message_tx.send(AgentMessage::Done).await {
                tracing::warn!("Failed to send Done message: {:?}", e);
            }
        }
    }

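    /// Starts the next LLM turn, compacting the context first if its
    /// estimated token count already exceeds the configured threshold.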
    async fn start_next_turn(&mut self) {
        self.maybe_preflight_compact().await;
        self.start_llm_stream();
    }

    async fn on_user_cancel(&mut self, state: &mut IterationState) {
        self.abort_in_flight_work();
        *state = IterationState::new();
        let _ = self.message_tx.send(AgentMessage::Cancelled { message: "Processing cancelled".to_string() }).await;
        let _ = self.message_tx.send(AgentMessage::Done).await;
    }

    async fn on_user_clear_context(&mut self, state: &mut IterationState) {
        self.abort_in_flight_work();
        self.context.clear_conversation();
        self.token_tracker.reset_current_usage();
        self.auto_continue.reset();
        *state = IterationState::new();

        let _ = self.message_tx.send(AgentMessage::ContextCleared).await;
    }

    fn on_user_text(&mut self, content: Vec<llm::ContentBlock>) {
        self.context.add_message(ChatMessage::User { content, timestamp: IsoString::now() });
        self.auto_continue.reset();
        self.start_llm_stream();
    }

    async fn on_switch_model(&mut self, new_provider: Box<dyn StreamingModelProvider>) {
        let previous = self.llm.display_name();
        let new_context_limit = new_provider.context_window();
        self.llm = Arc::from(new_provider);
        self.token_tracker.reset_current_usage();
        self.token_tracker.set_context_limit(new_context_limit);
        let new = self.llm.display_name();
        let _ = self.message_tx.send(AgentMessage::ModelSwitched { previous, new }).await;

        let _ = self.message_tx.send(self.context_usage_message()).await;
    }

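    /// (Re)starts the LLM response stream for the current context, replacing
    /// any stream already registered under [`LLM_STREAM_KEY`].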
    fn start_llm_stream(&mut self) {
        self.streams.remove(LLM_STREAM_KEY);
        let llm_stream = self.llm.stream_response(&self.context).map(StreamEvent::Llm);
        self.streams.insert(LLM_STREAM_KEY.to_string(), Box::pin(llm_stream));
    }

    fn is_busy(&self) -> bool {
        self.streams.contains_key(LLM_STREAM_KEY) || !self.active_requests.is_empty()
    }

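    /// Drops the LLM stream, every active tool execution stream, and any
    /// queued user messages. Used by cancel and clear-context.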
    fn abort_in_flight_work(&mut self) {
        self.streams.remove(LLM_STREAM_KEY);
        for stream_key in self.active_requests.keys().cloned().collect::<Vec<_>>() {
            self.streams.remove(&stream_key);
        }
        self.active_requests.clear();
        self.queued_user_messages.clear();
    }

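    /// Appends the truncated assistant response (if any) plus a synthetic
    /// user message asking the model to pick up where it left off.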
    fn inject_continuation_prompt(&mut self, previous_response: &str, stop_reason: Option<&StopReason>) {
        if !previous_response.is_empty() {
            self.context.add_message(ChatMessage::Assistant {
                content: previous_response.to_string(),
                reasoning: AssistantReasoning::default(),
                timestamp: IsoString::now(),
                tool_calls: Vec::new(),
            });
        }

        let reason = stop_reason.map_or_else(|| "Unknown".to_string(), |reason| format!("{reason:?}"));

        self.context.add_message(ChatMessage::User {
            content: vec![llm::ContentBlock::text(format!(
                "<system-notification>The LLM API stopped with reason '{reason}'. Continue from where you left off and finish your task.</system-notification>"
            ))],
            timestamp: IsoString::now(),
        });
    }

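    /// Dispatches one streamed LLM response event, accumulating text and
    /// reasoning into `state` and forwarding incremental chunks downstream.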
    async fn on_llm_event(&mut self, result: Result<LlmResponse, LlmError>, state: &mut IterationState) {
        use LlmResponse::{
            Done, EncryptedReasoning, Error, Reasoning, Start, Text, ToolRequestArg, ToolRequestComplete,
            ToolRequestStart, Usage,
        };

        let response = match result {
            Ok(response) => response,
            Err(e) => {
                let _ = self.message_tx.send(AgentMessage::Error { message: e.to_string() }).await;
                return;
            }
        };

        match response {
            Start { message_id } => {
                state.on_llm_start(message_id);
            }

            Text { chunk } => {
                self.handle_llm_text(chunk, state).await;
            }

            Reasoning { chunk } => {
                state.reasoning_summary_text.push_str(&chunk);
                if let Some(id) = &state.current_message_id {
                    let _ = self
                        .message_tx
                        .send(AgentMessage::Thought {
                            message_id: id.clone(),
                            chunk,
                            is_complete: false,
                            model_name: self.llm.display_name(),
                        })
                        .await;
                }
            }

            EncryptedReasoning { id, content } => {
                if let Some(model) = self.llm.model() {
                    state.encrypted_reasoning = Some(EncryptedReasoningContent { id, model, content });
                }
            }

            ToolRequestStart { id, name } => {
                self.handle_tool_request_start(id, name).await;
            }

            ToolRequestArg { id, chunk } => {
                self.handle_tool_request_arg(id, chunk).await;
            }

            ToolRequestComplete { tool_call } => {
                self.handle_tool_completion(tool_call, state).await;
            }

            Done { stop_reason } => {
                state.llm_done = true;
                state.stop_reason = stop_reason;
            }

            Error { message } => {
                let _ = self.message_tx.send(AgentMessage::Error { message }).await;
            }

            Usage { tokens: sample } => {
                self.handle_llm_usage(sample).await;
            }
        }
    }

    async fn handle_llm_text(&mut self, chunk: String, state: &mut IterationState) {
        state.message_content.push_str(&chunk);

        if let Some(id) = &state.current_message_id {
            let _ = self
                .message_tx
                .send(AgentMessage::Text {
                    message_id: id.clone(),
                    chunk,
                    is_complete: false,
                    model_name: self.llm.display_name(),
                })
                .await;
        }
    }

    async fn handle_tool_request_start(&mut self, id: String, name: String) {
        let request = ToolCallRequest { id: id.clone(), name, arguments: String::new() };
        self.active_requests.insert(id, request.clone());

        let _ = self.message_tx.send(AgentMessage::ToolCall { request, model_name: self.llm.display_name() }).await;
    }

    async fn handle_tool_request_arg(&mut self, id: String, chunk: String) {
        let Some(request) = self.active_requests.get_mut(&id) else {
            return;
        };
        request.arguments.push_str(&chunk);

        let _ = self
            .message_tx
            .send(AgentMessage::ToolCallUpdate { tool_call_id: id, chunk, model_name: self.llm.display_name() })
            .await;
    }

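    /// Registers the finished tool call as pending and hands it to the MCP
    /// task, wiring the execution events back into the agent's `StreamMap`
    /// under the tool call id.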
    async fn handle_tool_completion(&mut self, tool_call: ToolCallRequest, state: &mut IterationState) {
        state.pending_tool_ids.insert(tool_call.id.clone());
        debug_assert!(
            self.active_requests.contains_key(&tool_call.id),
            "tool call {} should already be in active_requests from handle_tool_request_start",
            tool_call.id
        );

        let (tx, rx) = mpsc::channel(100);
        let stream = ReceiverStream::new(rx).map(StreamEvent::ToolExecution);
        let stream_key = tool_call.id.clone();
        self.streams.insert(stream_key, Box::pin(stream));

        if let Some(ref mcp_command_tx) = self.mcp_command_tx {
            let mcp_future =
                mcp_command_tx.send(McpCommand::ExecuteTool { request: tool_call, timeout: self.tool_timeout, tx });
            if let Err(e) = mcp_future.await {
                tracing::warn!("Failed to send tool request to MCP task: {:?}", e);
            }
        }
    }

    async fn handle_llm_usage(&mut self, sample: TokenUsage) {
        self.token_tracker.record_usage(sample);
        let ratio_pct = self.token_tracker.usage_ratio().map(|r| r * 100.0);
        let remaining = self.token_tracker.tokens_remaining();
        tracing::debug!(?sample, ?ratio_pct, ?remaining, "Token usage");

        let _ = self.message_tx.send(self.context_usage_message()).await;

        self.maybe_compact_context().await;
    }

    fn context_usage_message(&self) -> AgentMessage {
        let last = self.token_tracker.last_usage();
        AgentMessage::ContextUsageUpdate {
            usage_ratio: self.token_tracker.usage_ratio(),
            context_limit: self.token_tracker.context_limit(),
            input_tokens: last.input_tokens,
            output_tokens: last.output_tokens,
            cache_read_tokens: last.cache_read_tokens,
            cache_creation_tokens: last.cache_creation_tokens,
            reasoning_tokens: last.reasoning_tokens,
            total_input_tokens: self.token_tracker.total_input_tokens(),
            total_output_tokens: self.token_tracker.total_output_tokens(),
            total_cache_read_tokens: self.token_tracker.total_cache_read_tokens(),
            total_cache_creation_tokens: self.token_tracker.total_cache_creation_tokens(),
            total_reasoning_tokens: self.token_tracker.total_reasoning_tokens(),
        }
    }

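    /// Compacts before sending a request when the *estimated* context size
    /// crosses the threshold, unlike [`Self::maybe_compact_context`], which
    /// reacts to usage reported after a response.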
    async fn maybe_preflight_compact(&mut self) {
        let Some(context_limit) = self.token_tracker.context_limit() else {
            return;
        };
        let Some(config) = self.compaction_config.as_ref() else {
            return;
        };
        let estimated = self.context.estimated_token_count();
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let threshold = (f64::from(context_limit) * config.threshold).ceil() as u32;
        if estimated >= threshold {
            tracing::info!(
                "Pre-flight compaction triggered: estimated {estimated} tokens >= {:.1}% of {context_limit} limit",
                config.threshold * 100.0
            );
            if let CompactionOutcome::Failed(e) = self.compact_context().await {
                tracing::warn!("Pre-flight compaction failed: {e}");
            }
        }
    }

    async fn maybe_compact_context(&mut self) {
        if !self.compaction_config.as_ref().is_some_and(|config| self.token_tracker.should_compact(config.threshold)) {
            return;
        }

        if let CompactionOutcome::Failed(error_message) = self.compact_context().await {
            tracing::warn!("Context compaction failed: {}", error_message);
        }
    }

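    /// Summarizes and shrinks the conversation via [`Compactor`], replacing
    /// the context and resetting current usage on success.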
    async fn compact_context(&mut self) -> CompactionOutcome {
        let Some(ref _config) = self.compaction_config else {
            tracing::warn!("Context compaction requested but compaction is disabled");
            return CompactionOutcome::SkippedDisabled;
        };

        match self.token_tracker.usage_ratio() {
            Some(usage_ratio) => {
                tracing::info!(
                    "Starting context compaction - {} messages, {:.1}% of context limit",
                    self.context.message_count(),
                    usage_ratio * 100.0
                );
            }
            None => {
                tracing::info!(
                    "Starting context compaction - {} messages (context limit unknown)",
                    self.context.message_count(),
                );
            }
        }

        let _ = self
            .message_tx
            .send(AgentMessage::ContextCompactionStarted { message_count: self.context.message_count() })
            .await;

        let compactor = Compactor::new(self.llm.clone());

        match compactor.compact(&self.context).await {
            Ok(result) => {
                tracing::info!("Context compacted: {} messages removed", result.messages_removed);

                self.context = result.context;
                self.token_tracker.reset_current_usage();

                let _ = self
                    .message_tx
                    .send(AgentMessage::ContextCompactionResult {
                        summary: result.summary,
                        messages_removed: result.messages_removed,
                    })
                    .await;
                CompactionOutcome::Compacted
            }
            Err(e) => CompactionOutcome::Failed(e.to_string()),
        }
    }

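    /// Handles progress and completion events from a running tool, ignoring
    /// results whose ids are no longer pending (e.g. after a cancel).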
    async fn on_tool_execution_event(&mut self, event: ToolExecutionEvent, state: &mut IterationState) {
        match event {
            ToolExecutionEvent::Started { tool_id, tool_name } => {
                tracing::debug!("Tool execution started: {} ({})", tool_name, tool_id);
            }

            ToolExecutionEvent::Progress { tool_id, progress } => {
                tracing::debug!(
                    "Tool progress for {}: {}/{}",
                    tool_id,
                    progress.progress,
                    progress.total.unwrap_or(0.0)
                );

                if let Some(request) = self.active_requests.get(&tool_id) {
                    let _ = self
                        .message_tx
                        .send(AgentMessage::ToolProgress {
                            request: request.clone(),
                            progress: progress.progress,
                            total: progress.total,
                            message: progress.message.clone(),
                        })
                        .await;
                }
            }

            ToolExecutionEvent::Complete { tool_id: _, result, result_meta } => match result {
                Ok(tool_result) => {
                    tracing::debug!("Tool result received: {} -> {} bytes", tool_result.name, tool_result.result.len());

                    if state.pending_tool_ids.remove(&tool_result.id) {
                        self.active_requests.remove(&tool_result.id);
                        state.completed_tool_calls.push(Ok(tool_result.clone()));

                        let msg = AgentMessage::ToolResult {
                            result: tool_result,
                            result_meta,
                            model_name: self.llm.display_name(),
                        };

                        if let Err(e) = self.message_tx.send(msg).await {
                            tracing::warn!("Failed to send ToolResult message: {:?}", e);
                        }
                    } else {
                        tracing::debug!("Ignoring stale tool result for id: {}", tool_result.id);
                    }
                }

                Err(tool_error) => {
                    if state.pending_tool_ids.remove(&tool_error.id) {
                        self.active_requests.remove(&tool_error.id);
                        state.completed_tool_calls.push(Err(tool_error.clone()));

                        let _ = self
                            .message_tx
                            .send(AgentMessage::ToolError { error: tool_error, model_name: self.llm.display_name() })
                            .await;
                    }
                }
            },
        }
    }

    fn update_context(
        &mut self,
        message_content: &str,
        reasoning: AssistantReasoning,
        completed_tools: Vec<Result<ToolCallResult, ToolCallError>>,
    ) {
        self.context.push_assistant_turn(message_content, reasoning, completed_tools);
    }
}

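/// Result of a compaction attempt, distinguishing success from a disabled
/// config and from an error worth logging.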
#[derive(Debug, Clone, PartialEq, Eq)]
enum CompactionOutcome {
    Compacted,
    SkippedDisabled,
    Failed(String),
}

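/// Bounded retry counter for automatically continuing after the model stops
/// with `StopReason::Length`; reset whenever the conversation makes normal
/// progress.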
pub(crate) struct AutoContinue {
    max: u32,
    count: u32,
}

impl AutoContinue {
    pub(crate) fn new(max: u32) -> Self {
        Self { max, count: 0 }
    }

    fn reset(&mut self) {
        self.count = 0;
    }

    fn should_continue(&self, stop_reason: Option<&StopReason>) -> bool {
        matches!(stop_reason, Some(StopReason::Length)) && self.count < self.max
    }

    fn advance(&mut self) {
        self.count += 1;
    }

    fn count(&self) -> u32 {
        self.count
    }

    fn max(&self) -> u32 {
        self.max
    }
}

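/// Accumulated state for one LLM turn: streamed content, reasoning,
/// outstanding tool calls, and the eventual stop reason. A turn is complete
/// once the LLM reports `Done` and no tool calls are pending.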
#[derive(Debug)]
struct IterationState {
    current_message_id: Option<String>,
    message_content: String,
    reasoning_summary_text: String,
    encrypted_reasoning: Option<EncryptedReasoningContent>,
    pending_tool_ids: HashSet<String>,
    completed_tool_calls: Vec<Result<ToolCallResult, ToolCallError>>,
    llm_done: bool,
    stop_reason: Option<StopReason>,
}

impl IterationState {
    fn new() -> Self {
        Self {
            current_message_id: None,
            message_content: String::new(),
            reasoning_summary_text: String::new(),
            encrypted_reasoning: None,
            pending_tool_ids: HashSet::new(),
            completed_tool_calls: Vec::new(),
            llm_done: false,
            stop_reason: None,
        }
    }

    fn on_llm_start(&mut self, message_id: String) {
        self.current_message_id = Some(message_id);
        self.message_content.clear();
        self.reasoning_summary_text.clear();
        self.encrypted_reasoning = None;
        self.stop_reason = None;
    }

    fn is_complete(&self) -> bool {
        self.llm_done && self.pending_tool_ids.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use llm::testing::FakeLlmProvider;
    use tokio::sync::mpsc;

    #[tokio::test]
    async fn test_preflight_compaction_uses_configured_threshold() {
        let llm = Arc::new(
            FakeLlmProvider::with_single_response(vec![
                LlmResponse::start("summary"),
                LlmResponse::text("summary"),
                LlmResponse::done(),
            ])
            .with_context_window(Some(100)),
        );
        let context = Context::new(
            vec![ChatMessage::User {
                content: vec![llm::ContentBlock::text("x".repeat(344))],
                timestamp: IsoString::now(),
            }],
            vec![],
        );
        let (user_tx, user_rx) = mpsc::channel(1);
        let (message_tx, _message_rx) = mpsc::channel(8);
        drop(user_tx);

        let mut agent = Agent::new(
            AgentConfig {
                llm,
                context,
                mcp_command_tx: None,
                tool_timeout: Duration::from_secs(1),
                compaction_config: Some(CompactionConfig::with_threshold(0.85)),
                auto_continue: AutoContinue::new(0),
            },
            user_rx,
            message_tx,
        );

        agent.maybe_preflight_compact().await;

        assert!(
            matches!(
                agent.context.messages().as_slice(),
                [ChatMessage::Summary { content, .. }] if content == "summary"
            ),
            "expected context to be compacted, got {:?}",
            agent.context.messages()
        );
    }
}
749}