aether_core/core/agent.rs

use crate::context::{CompactionConfig, Compactor, TokenTracker};
use crate::events::{AgentMessage, UserMessage};
use crate::mcp::run_mcp_task::{McpCommand, ToolExecutionEvent};
use futures::Stream;
use llm::types::IsoString;
use llm::{
    AssistantReasoning, ChatMessage, Context, EncryptedReasoningContent, LlmError, LlmResponse, StopReason,
    StreamingModelProvider, TokenUsage, ToolCallError, ToolCallRequest, ToolCallResult,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_stream::StreamMap;
use tokio_stream::wrappers::ReceiverStream;

/// Internal event type for merging the LLM, tool-execution, and user-input streams
#[derive(Debug)]
enum StreamEvent {
    Llm(Result<LlmResponse, LlmError>),
    ToolExecution(ToolExecutionEvent),
    UserMessage(UserMessage),
}

type EventStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;

const USER_STREAM_KEY: &str = "user";
const LLM_STREAM_KEY: &str = "llm";

pub(crate) struct AgentConfig {
    pub llm: Arc<dyn StreamingModelProvider>,
    pub context: Context,
    pub mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
    pub tool_timeout: Duration,
    pub compaction_config: Option<CompactionConfig>,
    pub auto_continue: AutoContinue,
}

pub struct Agent {
    llm: Arc<dyn StreamingModelProvider>,
    context: Context,
    mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
    message_tx: mpsc::Sender<AgentMessage>,
    streams: StreamMap<String, EventStream>,
    tool_timeout: Duration,
    token_tracker: TokenTracker,
    compaction_config: Option<CompactionConfig>,
    auto_continue: AutoContinue,
    active_requests: HashMap<String, ToolCallRequest>,
    queued_user_messages: VecDeque<Vec<llm::ContentBlock>>,
}

impl Agent {
    pub(crate) fn new(
        config: AgentConfig,
        user_message_rx: mpsc::Receiver<UserMessage>,
        message_tx: mpsc::Sender<AgentMessage>,
    ) -> Self {
        let mut streams: StreamMap<String, EventStream> = StreamMap::new();
        streams.insert(
            USER_STREAM_KEY.to_string(),
            Box::pin(ReceiverStream::new(user_message_rx).map(StreamEvent::UserMessage)),
        );

        let context_limit = config.llm.context_window();

        Self {
            llm: config.llm,
            context: config.context,
            mcp_command_tx: config.mcp_command_tx,
            message_tx,
            streams,
            tool_timeout: config.tool_timeout,
            token_tracker: TokenTracker::new(context_limit),
            compaction_config: config.compaction_config,
            auto_continue: config.auto_continue,
            active_requests: HashMap::new(),
            queued_user_messages: VecDeque::new(),
        }
    }

    pub fn current_model_display_name(&self) -> String {
        self.llm.display_name()
    }

    /// Get a reference to the token tracker
    pub fn token_tracker(&self) -> &TokenTracker {
        &self.token_tracker
    }

    pub async fn run(mut self) {
        let mut state = IterationState::new();

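        // The StreamMap merges user input, the LLM response stream, and per-tool
        // result streams into one event loop; the map key is discarded because
        // each StreamEvent variant already identifies its source.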
        while let Some((_, event)) = self.streams.next().await {
            use UserMessage::{Cancel, ClearContext, SetReasoningEffort, SwitchModel, Text, UpdateTools};
            match event {
                StreamEvent::UserMessage(Cancel) => {
                    self.on_user_cancel(&mut state).await;
                }

                StreamEvent::UserMessage(ClearContext) => {
                    self.on_user_clear_context(&mut state).await;
                }

                StreamEvent::UserMessage(Text { content }) => {
                    if self.is_busy() {
                        self.queued_user_messages.push_back(content);
                    } else {
                        state = IterationState::new();
                        self.on_user_text(content);
                    }
                }

                StreamEvent::UserMessage(SwitchModel(new_provider)) => {
                    self.on_switch_model(new_provider).await;
                }

                StreamEvent::UserMessage(UpdateTools(tools)) => {
                    self.context.set_tools(tools);
                }

                StreamEvent::UserMessage(SetReasoningEffort(effort)) => {
                    self.context.set_reasoning_effort(effort);
                }

                StreamEvent::Llm(llm_event) => {
                    self.on_llm_event(llm_event, &mut state).await;
                }

                StreamEvent::ToolExecution(tool_event) => {
                    self.on_tool_execution_event(tool_event, &mut state).await;
                }
            }

            if state.is_complete() {
                let Some(id) = state.current_message_id.take() else {
                    continue;
                };
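                // Swap in a fresh state so the next turn starts clean while this
                // one is consumed by on_iteration_complete below.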
                let iteration = std::mem::replace(&mut state, IterationState::new());
                self.on_iteration_complete(id, iteration).await;
            }
        }

        tracing::debug!("Agent task shutting down - input channel closed");
    }

    async fn on_iteration_complete(&mut self, id: String, iteration: IterationState) {
        let IterationState {
            message_content,
            reasoning_summary_text,
            encrypted_reasoning,
            completed_tool_calls,
            stop_reason,
            ..
        } = iteration;
        let has_tool_calls = !completed_tool_calls.is_empty();
        let has_content = !message_content.is_empty() || has_tool_calls;
        let should_auto_continue = self.auto_continue.should_continue(stop_reason.as_ref());

        if has_content {
            let reasoning = AssistantReasoning::from_parts(reasoning_summary_text.clone(), encrypted_reasoning);
            self.update_context(&message_content, reasoning, completed_tool_calls);

            let _ = self
                .message_tx
                .send(AgentMessage::Text {
                    message_id: id.clone(),
                    chunk: message_content.clone(),
                    is_complete: true,
                    model_name: self.llm.display_name(),
                })
                .await;

            if !reasoning_summary_text.is_empty() {
                let _ = self
                    .message_tx
                    .send(AgentMessage::Thought {
                        message_id: id.clone(),
                        chunk: reasoning_summary_text,
                        is_complete: true,
                        model_name: self.llm.display_name(),
                    })
                    .await;
            }
        }

        let has_queued_text = !self.queued_user_messages.is_empty();
        if has_queued_text {
            let content: Vec<_> = self.queued_user_messages.drain(..).flatten().collect();
            self.context.add_message(ChatMessage::User { content, timestamp: IsoString::now() });
        }

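        // Branch priority: queued user text or completed tool calls always start
        // another turn; a resumable stop may auto-continue within its budget;
        // otherwise the turn ends and Done is emitted.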
        if has_queued_text || has_tool_calls {
            self.auto_continue.reset();
            self.start_next_turn().await;
        } else if should_auto_continue {
            self.auto_continue.advance();
            tracing::info!(
                "LLM stopped with {:?}, auto-continuing (attempt {}/{})",
                stop_reason,
                self.auto_continue.count(),
                self.auto_continue.max()
            );

            let _ = self
                .message_tx
                .send(AgentMessage::AutoContinue {
                    attempt: self.auto_continue.count(),
                    max_attempts: self.auto_continue.max(),
                })
                .await;

            self.inject_continuation_prompt(&message_content, stop_reason.as_ref());
            self.start_next_turn().await;
        } else {
            tracing::debug!("LLM completed turn with stop reason: {:?}", stop_reason);
            self.auto_continue.reset();
            if let Err(e) = self.message_tx.send(AgentMessage::Done).await {
                tracing::warn!("Failed to send Done message: {:?}", e);
            }
        }
    }

    async fn start_next_turn(&mut self) {
        self.maybe_preflight_compact().await;
        self.start_llm_stream();
    }

    async fn on_user_cancel(&mut self, state: &mut IterationState) {
        self.abort_in_flight_work();
        *state = IterationState::new();
        let _ = self.message_tx.send(AgentMessage::Cancelled { message: "Processing cancelled".to_string() }).await;
        let _ = self.message_tx.send(AgentMessage::Done).await;
    }

    async fn on_user_clear_context(&mut self, state: &mut IterationState) {
        self.abort_in_flight_work();
        self.context.clear_conversation();
        self.token_tracker.reset_current_usage();
        self.auto_continue.reset();
        *state = IterationState::new();

        let _ = self.message_tx.send(AgentMessage::ContextCleared).await;
    }

    fn on_user_text(&mut self, content: Vec<llm::ContentBlock>) {
        self.context.add_message(ChatMessage::User { content, timestamp: IsoString::now() });
        self.auto_continue.reset();
        self.start_llm_stream();
    }

    async fn on_switch_model(&mut self, new_provider: Box<dyn StreamingModelProvider>) {
        let previous = self.llm.display_name();
        let new_context_limit = new_provider.context_window();
        self.llm = Arc::from(new_provider);
        self.token_tracker.reset_current_usage();
        self.token_tracker.set_context_limit(new_context_limit);
        let new = self.llm.display_name();
        let _ = self.message_tx.send(AgentMessage::ModelSwitched { previous, new }).await;

        let _ = self.message_tx.send(self.context_usage_message()).await;
    }

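    /// (Re)start the LLM stream. Removing any existing entry first ensures at
    /// most one LLM request is in flight at a time.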
    fn start_llm_stream(&mut self) {
        self.streams.remove(LLM_STREAM_KEY);
        let llm_stream = self.llm.stream_response(&self.context).map(StreamEvent::Llm);
        self.streams.insert(LLM_STREAM_KEY.to_string(), Box::pin(llm_stream));
    }

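    /// Busy while an LLM stream is active or tool calls are outstanding; `run`
    /// queues user text that arrives in that window.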
    fn is_busy(&self) -> bool {
        self.streams.contains_key(LLM_STREAM_KEY) || !self.active_requests.is_empty()
    }

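    /// Drop the LLM stream and every per-tool stream. Tool executions already
    /// dispatched to the MCP task may keep running, but their remaining events
    /// are no longer received.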
    fn abort_in_flight_work(&mut self) {
        self.streams.remove(LLM_STREAM_KEY);
        for stream_key in self.active_requests.keys().cloned().collect::<Vec<_>>() {
            self.streams.remove(&stream_key);
        }
        self.active_requests.clear();
        self.queued_user_messages.clear();
    }

    /// Inject a continuation prompt when the LLM stops due to a resumable reason.
    fn inject_continuation_prompt(&mut self, previous_response: &str, stop_reason: Option<&StopReason>) {
        if !previous_response.is_empty() {
            self.context.add_message(ChatMessage::Assistant {
                content: previous_response.to_string(),
                reasoning: AssistantReasoning::default(),
                timestamp: IsoString::now(),
                tool_calls: Vec::new(),
            });
        }

        let reason = stop_reason.map_or_else(|| "Unknown".to_string(), |reason| format!("{reason:?}"));

        self.context.add_message(ChatMessage::User {
            content: vec![llm::ContentBlock::text(format!(
                "<system-notification>The LLM API stopped with reason '{reason}'. Continue from where you left off and finish your task.</system-notification>"
            ))],
            timestamp: IsoString::now(),
        });
    }

    async fn on_llm_event(&mut self, result: Result<LlmResponse, LlmError>, state: &mut IterationState) {
        use LlmResponse::{
            Done, EncryptedReasoning, Error, Reasoning, Start, Text, ToolRequestArg, ToolRequestComplete,
            ToolRequestStart, Usage,
        };

        let response = match result {
            Ok(response) => response,
            Err(e) => {
                let _ = self.message_tx.send(AgentMessage::Error { message: e.to_string() }).await;
                return;
            }
        };

        match response {
            Start { message_id } => {
                state.on_llm_start(message_id);
            }

            Text { chunk } => {
                self.handle_llm_text(chunk, state).await;
            }

            Reasoning { chunk } => {
                state.reasoning_summary_text.push_str(&chunk);
                if let Some(id) = &state.current_message_id {
                    let _ = self
                        .message_tx
                        .send(AgentMessage::Thought {
                            message_id: id.clone(),
                            chunk,
                            is_complete: false,
                            model_name: self.llm.display_name(),
                        })
                        .await;
                }
            }

            EncryptedReasoning { id, content } => {
                if let Some(model) = self.llm.model() {
                    state.encrypted_reasoning = Some(EncryptedReasoningContent { id, model, content });
                }
            }

            ToolRequestStart { id, name } => {
                self.handle_tool_request_start(id, name).await;
            }

            ToolRequestArg { id, chunk } => {
                self.handle_tool_request_arg(id, chunk).await;
            }

            ToolRequestComplete { tool_call } => {
                self.handle_tool_completion(tool_call, state).await;
            }

            Done { stop_reason } => {
                state.llm_done = true;
                state.stop_reason = stop_reason;
            }

            Error { message } => {
                let _ = self.message_tx.send(AgentMessage::Error { message }).await;
            }

            Usage { tokens: sample } => {
                self.handle_llm_usage(sample).await;
            }
        }
    }

    async fn handle_llm_text(&mut self, chunk: String, state: &mut IterationState) {
        state.message_content.push_str(&chunk);

        if let Some(id) = &state.current_message_id {
            let _ = self
                .message_tx
                .send(AgentMessage::Text {
                    message_id: id.clone(),
                    chunk,
                    is_complete: false,
                    model_name: self.llm.display_name(),
                })
                .await;
        }
    }

    async fn handle_tool_request_start(&mut self, id: String, name: String) {
        let request = ToolCallRequest { id: id.clone(), name, arguments: String::new() };
        self.active_requests.insert(id, request.clone());

        let _ = self.message_tx.send(AgentMessage::ToolCall { request, model_name: self.llm.display_name() }).await;
    }

    async fn handle_tool_request_arg(&mut self, id: String, chunk: String) {
        let Some(request) = self.active_requests.get_mut(&id) else {
            return;
        };
        request.arguments.push_str(&chunk);

        let _ = self
            .message_tx
            .send(AgentMessage::ToolCallUpdate { tool_call_id: id, chunk, model_name: self.llm.display_name() })
            .await;
    }

    async fn handle_tool_completion(&mut self, tool_call: ToolCallRequest, state: &mut IterationState) {
        state.pending_tool_ids.insert(tool_call.id.clone());
        debug_assert!(
            self.active_requests.contains_key(&tool_call.id),
            "tool call {} should already be in active_requests from handle_tool_request_start",
            tool_call.id
        );

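        // Each tool call gets its own result channel, keyed in the StreamMap by
        // the tool call id so abort_in_flight_work can remove it individually.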
        let (tx, rx) = mpsc::channel(100);
        let stream = ReceiverStream::new(rx).map(StreamEvent::ToolExecution);
        let stream_key = tool_call.id.clone();
        self.streams.insert(stream_key, Box::pin(stream));

        if let Some(ref mcp_command_tx) = self.mcp_command_tx {
            let mcp_future =
                mcp_command_tx.send(McpCommand::ExecuteTool { request: tool_call, timeout: self.tool_timeout, tx });
            if let Err(e) = mcp_future.await {
                tracing::warn!("Failed to send tool request to MCP task: {:?}", e);
            }
        }
    }

    async fn handle_llm_usage(&mut self, sample: TokenUsage) {
        self.token_tracker.record_usage(sample);
        let ratio_pct = self.token_tracker.usage_ratio().map(|r| r * 100.0);
        let remaining = self.token_tracker.tokens_remaining();
        tracing::debug!(?sample, ?ratio_pct, ?remaining, "Token usage");

        let _ = self.message_tx.send(self.context_usage_message()).await;

        self.maybe_compact_context().await;
    }

    fn context_usage_message(&self) -> AgentMessage {
        let last = self.token_tracker.last_usage();
        AgentMessage::ContextUsageUpdate {
            usage_ratio: self.token_tracker.usage_ratio(),
            context_limit: self.token_tracker.context_limit(),
            input_tokens: last.input_tokens,
            output_tokens: last.output_tokens,
            cache_read_tokens: last.cache_read_tokens,
            cache_creation_tokens: last.cache_creation_tokens,
            reasoning_tokens: last.reasoning_tokens,
            total_input_tokens: self.token_tracker.total_input_tokens(),
            total_output_tokens: self.token_tracker.total_output_tokens(),
            total_cache_read_tokens: self.token_tracker.total_cache_read_tokens(),
            total_cache_creation_tokens: self.token_tracker.total_cache_creation_tokens(),
            total_reasoning_tokens: self.token_tracker.total_reasoning_tokens(),
        }
    }

    /// Pre-flight check: estimate context size and compact proactively if it would
    /// overflow before the LLM even sees it. This catches the case where large tool
    /// results push context past the limit before usage-based compaction can fire.
    async fn maybe_preflight_compact(&mut self) {
        let Some(context_limit) = self.token_tracker.context_limit() else {
            return;
        };
        let Some(config) = self.compaction_config.as_ref() else {
            return;
        };
        let estimated = self.context.estimated_token_count();
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let threshold = (f64::from(context_limit) * config.threshold).ceil() as u32;
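        // Worked example: a 100_000-token window with threshold 0.85 yields a
        // compaction trigger at 85_000 estimated tokens.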
        if estimated >= threshold {
            tracing::info!(
                "Pre-flight compaction triggered: estimated {estimated} tokens >= {:.1}% of {context_limit} limit",
                config.threshold * 100.0
            );
            if let CompactionOutcome::Failed(e) = self.compact_context().await {
                tracing::warn!("Pre-flight compaction failed: {e}");
            }
        }
    }

    /// Check if compaction is needed and perform it if so.
    async fn maybe_compact_context(&mut self) {
        if !self.compaction_config.as_ref().is_some_and(|config| self.token_tracker.should_compact(config.threshold)) {
            return;
        }

        if let CompactionOutcome::Failed(error_message) = self.compact_context().await {
            tracing::warn!("Context compaction failed: {}", error_message);
        }
    }

    async fn compact_context(&mut self) -> CompactionOutcome {
        let Some(ref _config) = self.compaction_config else {
            tracing::warn!("Context compaction requested but compaction is disabled");
            return CompactionOutcome::SkippedDisabled;
        };

        match self.token_tracker.usage_ratio() {
            Some(usage_ratio) => {
                tracing::info!(
                    "Starting context compaction - {} messages, {:.1}% of context limit",
                    self.context.message_count(),
                    usage_ratio * 100.0
                );
            }
            None => {
                tracing::info!(
                    "Starting context compaction - {} messages (context limit unknown)",
                    self.context.message_count(),
                );
            }
        }

        let _ = self
            .message_tx
            .send(AgentMessage::ContextCompactionStarted { message_count: self.context.message_count() })
            .await;

        let compactor = Compactor::new(self.llm.clone());

        match compactor.compact(&self.context).await {
            Ok(result) => {
                tracing::info!("Context compacted: {} messages removed", result.messages_removed);

                self.context = result.context;
                self.token_tracker.reset_current_usage();

                let _ = self
                    .message_tx
                    .send(AgentMessage::ContextCompactionResult {
                        summary: result.summary,
                        messages_removed: result.messages_removed,
                    })
                    .await;
                CompactionOutcome::Compacted
            }
            Err(e) => CompactionOutcome::Failed(e.to_string()),
        }
    }

    async fn on_tool_execution_event(&mut self, event: ToolExecutionEvent, state: &mut IterationState) {
        match event {
            ToolExecutionEvent::Started { tool_id, tool_name } => {
                tracing::debug!("Tool execution started: {} ({})", tool_name, tool_id);
            }

            ToolExecutionEvent::Progress { tool_id, progress } => {
                tracing::debug!(
                    "Tool progress for {}: {}/{}",
                    tool_id,
                    progress.progress,
                    progress.total.unwrap_or(0.0)
                );

                if let Some(request) = self.active_requests.get(&tool_id) {
                    let _ = self
                        .message_tx
                        .send(AgentMessage::ToolProgress {
                            request: request.clone(),
                            progress: progress.progress,
                            total: progress.total,
                            message: progress.message.clone(),
                        })
                        .await;
                }
            }

            ToolExecutionEvent::Complete { tool_id: _, result, result_meta } => match result {
                Ok(tool_result) => {
                    tracing::debug!("Tool result received: {} -> {}", tool_result.name, tool_result.result.len());

                    if state.pending_tool_ids.remove(&tool_result.id) {
                        self.active_requests.remove(&tool_result.id);
                        state.completed_tool_calls.push(Ok(tool_result.clone()));

                        let msg = AgentMessage::ToolResult {
                            result: tool_result,
                            result_meta,
                            model_name: self.llm.display_name(),
                        };

                        if let Err(e) = self.message_tx.send(msg).await {
                            tracing::warn!("Failed to send ToolResult message: {:?}", e);
                        }
                    } else {
                        tracing::debug!("Ignoring stale tool result for id: {}", tool_result.id);
                    }
                }

                Err(tool_error) => {
                    if state.pending_tool_ids.remove(&tool_error.id) {
                        self.active_requests.remove(&tool_error.id);
                        state.completed_tool_calls.push(Err(tool_error.clone()));

                        let _ = self
                            .message_tx
                            .send(AgentMessage::ToolError { error: tool_error, model_name: self.llm.display_name() })
                            .await;
                    }
                }
            },
        }
    }

    fn update_context(
        &mut self,
        message_content: &str,
        reasoning: AssistantReasoning,
        completed_tools: Vec<Result<ToolCallResult, ToolCallError>>,
    ) {
        self.context.push_assistant_turn(message_content, reasoning, completed_tools);
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum CompactionOutcome {
    Compacted,
    SkippedDisabled,
    Failed(String),
}

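/// Budgeted retry counter for resuming after a `StopReason::Length` stop: the
/// agent auto-continues at most `max` times before reporting Done.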
pub(crate) struct AutoContinue {
    max: u32,
    count: u32,
}

impl AutoContinue {
    pub(crate) fn new(max: u32) -> Self {
        Self { max, count: 0 }
    }

    fn reset(&mut self) {
        self.count = 0;
    }

    fn should_continue(&self, stop_reason: Option<&StopReason>) -> bool {
        matches!(stop_reason, Some(StopReason::Length)) && self.count < self.max
    }

    fn advance(&mut self) {
        self.count += 1;
    }

    fn count(&self) -> u32 {
        self.count
    }

    fn max(&self) -> u32 {
        self.max
    }
}

#[derive(Debug)]
struct IterationState {
    current_message_id: Option<String>,
    message_content: String,
    reasoning_summary_text: String,
    encrypted_reasoning: Option<EncryptedReasoningContent>,
    pending_tool_ids: HashSet<String>,
    completed_tool_calls: Vec<Result<ToolCallResult, ToolCallError>>,
    llm_done: bool,
    stop_reason: Option<StopReason>,
}

impl IterationState {
    fn new() -> Self {
        Self {
            current_message_id: None,
            message_content: String::new(),
            reasoning_summary_text: String::new(),
            encrypted_reasoning: None,
            pending_tool_ids: HashSet::new(),
            completed_tool_calls: Vec::new(),
            llm_done: false,
            stop_reason: None,
        }
    }

    fn on_llm_start(&mut self, message_id: String) {
        self.current_message_id = Some(message_id);
        self.message_content.clear();
        self.reasoning_summary_text.clear();
        self.encrypted_reasoning = None;
        self.stop_reason = None;
    }

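    /// An iteration is complete once the LLM stream has finished and every tool
    /// call it issued has reported a result or error.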
    fn is_complete(&self) -> bool {
        self.llm_done && self.pending_tool_ids.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use llm::testing::FakeLlmProvider;
    use tokio::sync::mpsc;

    #[tokio::test]
    async fn test_preflight_compaction_uses_configured_threshold() {
        let llm = Arc::new(
            FakeLlmProvider::with_single_response(vec![
                LlmResponse::start("summary"),
                LlmResponse::text("summary"),
                LlmResponse::done(),
            ])
            .with_context_window(Some(100)),
        );
        let context = Context::new(
            vec![ChatMessage::User {
                content: vec![llm::ContentBlock::text("x".repeat(344))],
                timestamp: IsoString::now(),
            }],
            vec![],
        );
        let (user_tx, user_rx) = mpsc::channel(1);
        let (message_tx, _message_rx) = mpsc::channel(8);
        drop(user_tx);

        let mut agent = Agent::new(
            AgentConfig {
                llm,
                context,
                mcp_command_tx: None,
                tool_timeout: Duration::from_secs(1),
                compaction_config: Some(CompactionConfig::with_threshold(0.85)),
                auto_continue: AutoContinue::new(0),
            },
            user_rx,
            message_tx,
        );

        agent.maybe_preflight_compact().await;

        assert!(
            matches!(
                agent.context.messages().as_slice(),
                [ChatMessage::Summary { content, .. }] if content == "summary"
            ),
            "expected context to be compacted, got {:?}",
            agent.context.messages()
        );
    }
748    }
749}