//! aether_core/core/agent.rs — the agent event loop: merges LLM, tool, and
//! user-message streams and drives context management (compaction, auto-continue).
use crate::context::{CompactionConfig, Compactor, TokenTracker};
use crate::events::{AgentMessage, UserMessage};
use crate::mcp::run_mcp_task::{McpCommand, ToolExecutionEvent};
use futures::Stream;
use llm::types::IsoString;
use llm::{
    AssistantReasoning, ChatMessage, Context, EncryptedReasoningContent, LlmError, LlmResponse,
    StopReason, StreamingModelProvider, ToolCallError, ToolCallRequest, ToolCallResult,
};
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_stream::StreamMap;
use tokio_stream::wrappers::ReceiverStream;

19/// Internal event type for merging LLM and tool result streams
20#[derive(Debug)]
21enum StreamEvent {
22    Llm(Result<LlmResponse, LlmError>),
23    ToolExecution(ToolExecutionEvent),
24    UserMessage(UserMessage),
25}
26
27type EventStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;
28
29pub(crate) struct AgentConfig {
30    pub llm: Arc<dyn StreamingModelProvider>,
31    pub context: Context,
32    pub mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
33    pub tool_timeout: Duration,
34    pub compaction_config: Option<CompactionConfig>,
35    pub auto_continue: AutoContinue,
36}
37
38pub struct Agent {
39    llm: Arc<dyn StreamingModelProvider>,
40    context: Context,
41    mcp_command_tx: Option<mpsc::Sender<McpCommand>>,
42    message_tx: mpsc::Sender<AgentMessage>,
43    streams: StreamMap<String, EventStream>,
44    tool_timeout: Duration,
45    token_tracker: TokenTracker,
46    compaction_config: Option<CompactionConfig>,
47    auto_continue: AutoContinue,
48    active_requests: HashMap<String, ToolCallRequest>,
49}
50
51impl Agent {
52    pub(crate) fn new(
53        config: AgentConfig,
54        user_message_rx: mpsc::Receiver<UserMessage>,
55        message_tx: mpsc::Sender<AgentMessage>,
56    ) -> Self {
57        let mut streams: StreamMap<String, EventStream> = StreamMap::new();
58        streams.insert(
59            "user".to_string(),
60            Box::pin(ReceiverStream::new(user_message_rx).map(StreamEvent::UserMessage)),
61        );
62
63        let context_limit = config.llm.context_window();
64
65        Self {
66            llm: config.llm,
67            context: config.context,
68            mcp_command_tx: config.mcp_command_tx,
69            message_tx,
70            streams,
71            tool_timeout: config.tool_timeout,
72            token_tracker: TokenTracker::new(context_limit),
73            compaction_config: config.compaction_config,
74            auto_continue: config.auto_continue,
75            active_requests: HashMap::new(),
76        }
77    }
78
79    pub fn current_model_display_name(&self) -> String {
80        self.llm.display_name()
81    }
82
83    /// Get a reference to the token tracker
84    pub fn token_tracker(&self) -> &TokenTracker {
85        &self.token_tracker
86    }
87
88    pub async fn run(mut self) {
89        let mut state = IterationState::new();
90
91        while let Some((_, event)) = self.streams.next().await {
92            use UserMessage::{
93                Cancel, ClearContext, SetReasoningEffort, SwitchModel, Text, UpdateTools,
94            };
95            match event {
96                StreamEvent::UserMessage(Cancel) => {
97                    self.on_user_cancel(&mut state).await;
98                }
99
100                StreamEvent::UserMessage(ClearContext) => {
101                    self.on_user_clear_context(&mut state).await;
102                }
103
104                StreamEvent::UserMessage(Text { content }) => {
105                    state = IterationState::new();
106                    self.on_user_text(content);
107                }
108
109                StreamEvent::UserMessage(SwitchModel(new_provider)) => {
110                    self.on_switch_model(new_provider).await;
111                }
112
113                StreamEvent::UserMessage(UpdateTools(tools)) => {
114                    self.context.set_tools(tools);
115                }
116
117                StreamEvent::UserMessage(SetReasoningEffort(effort)) => {
118                    self.context.set_reasoning_effort(effort);
119                }
120
121                StreamEvent::Llm(llm_event) => {
122                    if !state.cancelled {
123                        self.on_llm_event(llm_event, &mut state).await;
124                    }
125                }
126
127                StreamEvent::ToolExecution(tool_event) => {
128                    if !state.cancelled {
129                        self.on_tool_execution_event(tool_event, &mut state).await;
130                    }
131                }
132            }
133
134            if state.is_complete() {
135                let Some(id) = state.current_message_id.take() else {
136                    continue;
137                };
138                let iteration = std::mem::replace(&mut state, IterationState::new());
139                self.on_iteration_complete(id, iteration).await;
140            }
141        }
142
143        tracing::debug!("Agent task shutting down - input channel closed");
144    }
145
146    async fn on_iteration_complete(&mut self, id: String, iteration: IterationState) {
147        let IterationState {
148            message_content,
149            reasoning_summary_text,
150            encrypted_reasoning,
151            completed_tool_calls,
152            stop_reason,
153            ..
154        } = iteration;
155        let has_tool_calls = !completed_tool_calls.is_empty();
156        let has_content = !message_content.is_empty() || has_tool_calls;
157
158        // Skip context update for empty responses (e.g., API errors mid-stream)
159        if !has_content && !self.auto_continue.should_continue(stop_reason.as_ref()) {
160            let _ = self.message_tx.send(AgentMessage::Done).await;
161            return;
162        }
163
164        let reasoning =
165            AssistantReasoning::from_parts(reasoning_summary_text.clone(), encrypted_reasoning);
166        self.update_context(&message_content, reasoning, completed_tool_calls);
167
168        let _ = self
169            .message_tx
170            .send(AgentMessage::Text {
171                message_id: id.clone(),
172                chunk: message_content.clone(),
173                is_complete: true,
174                model_name: self.llm.display_name(),
175            })
176            .await;
177
178        if !reasoning_summary_text.is_empty() {
179            let _ = self
180                .message_tx
181                .send(AgentMessage::Thought {
182                    message_id: id.clone(),
183                    chunk: reasoning_summary_text,
184                    is_complete: true,
185                    model_name: self.llm.display_name(),
186                })
187                .await;
188        }
189
190        if has_tool_calls {
191            self.auto_continue.on_tool_calls();
192            self.maybe_preflight_compact().await;
193            self.start_llm_stream();
194        } else if self.auto_continue.should_continue(stop_reason.as_ref()) {
195            self.auto_continue.advance();
196            tracing::info!(
197                "LLM stopped with {:?}, auto-continuing (attempt {}/{})",
198                stop_reason,
199                self.auto_continue.count(),
200                self.auto_continue.max()
201            );
202
203            let _ = self
204                .message_tx
205                .send(AgentMessage::AutoContinue {
206                    attempt: self.auto_continue.count(),
207                    max_attempts: self.auto_continue.max(),
208                })
209                .await;
210
211            self.inject_continuation_prompt(&message_content, stop_reason.as_ref());
212            self.maybe_preflight_compact().await;
213            self.start_llm_stream();
214        } else {
215            tracing::debug!("LLM completed turn with stop reason: {:?}", stop_reason);
216            self.auto_continue.on_completion();
217            if let Err(e) = self.message_tx.send(AgentMessage::Done).await {
218                tracing::warn!("Failed to send Done message: {:?}", e);
219            }
220        }
221    }
222
223    async fn on_user_cancel(&mut self, state: &mut IterationState) {
224        state.cancelled = true;
225        self.streams.remove("llm");
226        let _ = self
227            .message_tx
228            .send(AgentMessage::Cancelled {
229                message: "Processing cancelled".to_string(),
230            })
231            .await;
232        let _ = self.message_tx.send(AgentMessage::Done).await;
233    }
234
235    async fn on_user_clear_context(&mut self, state: &mut IterationState) {
236        self.clear_active_streams();
237        self.active_requests.clear();
238        self.context.clear_conversation();
239        self.token_tracker.reset_current_usage();
240        self.auto_continue.on_completion();
241        *state = IterationState::new();
242
243        let _ = self.message_tx.send(AgentMessage::ContextCleared).await;
244    }
245
246    fn on_user_text(&mut self, content: Vec<llm::ContentBlock>) {
247        self.context.add_message(ChatMessage::User {
248            content,
249            timestamp: IsoString::now(),
250        });
251
252        self.start_llm_stream();
253    }
254
255    async fn on_switch_model(&mut self, new_provider: Box<dyn StreamingModelProvider>) {
256        let previous = self.llm.display_name();
257        let new_context_limit = new_provider.context_window();
258        self.llm = Arc::from(new_provider);
259        self.token_tracker.reset_current_usage();
260        self.token_tracker.set_context_limit(new_context_limit);
261        let new = self.llm.display_name();
262        let _ = self
263            .message_tx
264            .send(AgentMessage::ModelSwitched { previous, new })
265            .await;
266
267        let _ = self
268            .message_tx
269            .send(AgentMessage::ContextUsageUpdate {
270                usage_ratio: self.token_tracker.usage_ratio(),
271                tokens_used: self.token_tracker.last_input_tokens(),
272                context_limit: self.token_tracker.context_limit(),
273            })
274            .await;
275    }
276
277    fn start_llm_stream(&mut self) {
278        self.streams.remove("llm");
279        let llm_stream = self
280            .llm
281            .stream_response(&self.context)
282            .map(StreamEvent::Llm);
283        self.streams.insert("llm".to_string(), Box::pin(llm_stream));
284    }
285
286    fn clear_active_streams(&mut self) {
287        self.streams.remove("llm");
288        for stream_key in self.active_requests.keys().cloned().collect::<Vec<_>>() {
289            self.streams.remove(&stream_key);
290        }
291    }
292
293    /// Inject a continuation prompt when the LLM stops due to a resumable reason.
294    fn inject_continuation_prompt(
295        &mut self,
296        previous_response: &str,
297        stop_reason: Option<&StopReason>,
298    ) {
299        if !previous_response.is_empty() {
300            self.context.add_message(ChatMessage::Assistant {
301                content: previous_response.to_string(),
302                reasoning: AssistantReasoning::default(),
303                timestamp: IsoString::now(),
304                tool_calls: Vec::new(),
305            });
306        }
307
308        let reason =
309            stop_reason.map_or_else(|| "Unknown".to_string(), |reason| format!("{reason:?}"));
310
311        self.context.add_message(ChatMessage::User {
312            content: vec![llm::ContentBlock::text(format!(
313                "<system-notification>The LLM API stopped with reason '{reason}'. Continue from where you left off and finish your task.</system-notification>"
314            ))],
315            timestamp: IsoString::now(),
316        });
317    }
318
319    async fn on_llm_event(
320        &mut self,
321        result: Result<LlmResponse, LlmError>,
322        state: &mut IterationState,
323    ) {
324        use LlmResponse::{
325            Done, EncryptedReasoning, Error, Reasoning, Start, Text, ToolRequestArg,
326            ToolRequestComplete, ToolRequestStart, Usage,
327        };
328
329        let response = match result {
330            Ok(response) => response,
331            Err(e) => {
332                let _ = self
333                    .message_tx
334                    .send(AgentMessage::Error {
335                        message: e.to_string(),
336                    })
337                    .await;
338                return;
339            }
340        };
341
342        match response {
343            Start { message_id } => {
344                state.on_llm_start(message_id);
345            }
346
347            Text { chunk } => {
348                self.handle_llm_text(chunk, state).await;
349            }
350
351            Reasoning { chunk } => {
352                state.reasoning_summary_text.push_str(&chunk);
353                if let Some(id) = &state.current_message_id {
354                    let _ = self
355                        .message_tx
356                        .send(AgentMessage::Thought {
357                            message_id: id.clone(),
358                            chunk,
359                            is_complete: false,
360                            model_name: self.llm.display_name(),
361                        })
362                        .await;
363                }
364            }
365
366            EncryptedReasoning { id, content } => {
367                if let Some(model) = self.llm.model() {
368                    state.encrypted_reasoning =
369                        Some(EncryptedReasoningContent { id, model, content });
370                }
371            }
372
373            ToolRequestStart { id, name } => {
374                self.handle_tool_request_start(id, name).await;
375            }
376
377            ToolRequestArg { id, chunk } => {
378                self.handle_tool_request_arg(id, chunk).await;
379            }
380
381            ToolRequestComplete { tool_call } => {
382                self.handle_tool_completion(tool_call, state).await;
383            }
384
385            Done { stop_reason } => {
386                state.llm_done = true;
387                state.stop_reason = stop_reason;
388            }
389
390            Error { message } => {
391                let _ = self.message_tx.send(AgentMessage::Error { message }).await;
392            }
393
394            Usage {
395                input_tokens,
396                output_tokens,
397                cached_input_tokens,
398            } => {
399                self.handle_llm_usage(input_tokens, output_tokens, cached_input_tokens)
400                    .await;
401            }
402        }
403    }
404
405    async fn handle_llm_text(&mut self, chunk: String, state: &mut IterationState) {
406        state.message_content.push_str(&chunk);
407
408        if let Some(id) = &state.current_message_id {
409            let _ = self
410                .message_tx
411                .send(AgentMessage::Text {
412                    message_id: id.clone(),
413                    chunk,
414                    is_complete: false,
415                    model_name: self.llm.display_name(),
416                })
417                .await;
418        }
419    }
420
421    async fn handle_tool_request_start(&mut self, id: String, name: String) {
422        let request = ToolCallRequest {
423            id: id.clone(),
424            name,
425            arguments: String::new(),
426        };
427        self.active_requests.insert(id, request.clone());
428
429        let _ = self
430            .message_tx
431            .send(AgentMessage::ToolCall {
432                request,
433                model_name: self.llm.display_name(),
434            })
435            .await;
436    }
437
438    async fn handle_tool_request_arg(&mut self, id: String, chunk: String) {
439        let Some(request) = self.active_requests.get_mut(&id) else {
440            return;
441        };
442        request.arguments.push_str(&chunk);
443
444        let _ = self
445            .message_tx
446            .send(AgentMessage::ToolCallUpdate {
447                tool_call_id: id,
448                chunk,
449                model_name: self.llm.display_name(),
450            })
451            .await;
452    }
453
454    async fn handle_tool_completion(
455        &mut self,
456        tool_call: ToolCallRequest,
457        state: &mut IterationState,
458    ) {
459        state.pending_tool_ids.insert(tool_call.id.clone());
460        debug_assert!(
461            self.active_requests.contains_key(&tool_call.id),
462            "tool call {} should already be in active_requests from handle_tool_request_start",
463            tool_call.id
464        );
465
466        let (tx, rx) = mpsc::channel(100);
467        let stream = ReceiverStream::new(rx).map(StreamEvent::ToolExecution);
468        let stream_key = tool_call.id.clone();
469        self.streams.insert(stream_key, Box::pin(stream));
470
471        if let Some(ref mcp_command_tx) = self.mcp_command_tx {
472            let mcp_future = mcp_command_tx.send(McpCommand::ExecuteTool {
473                request: tool_call,
474                timeout: self.tool_timeout,
475                tx,
476            });
477            if let Err(e) = mcp_future.await {
478                tracing::warn!("Failed to send tool request to MCP task: {:?}", e);
479            }
480        }
481    }
482
483    async fn handle_llm_usage(
484        &mut self,
485        input_tokens: u32,
486        output_tokens: u32,
487        cached_input_tokens: Option<u32>,
488    ) {
489        self.token_tracker
490            .record_usage(input_tokens, output_tokens, cached_input_tokens);
491        match (
492            self.token_tracker.usage_ratio(),
493            self.token_tracker.tokens_remaining(),
494        ) {
495            (Some(usage_ratio), Some(tokens_remaining)) => {
496                tracing::debug!(
497                    "Token usage - input: {}, output: {}, ratio: {:.2}%, remaining: {}",
498                    input_tokens,
499                    output_tokens,
500                    usage_ratio * 100.0,
501                    tokens_remaining
502                );
503            }
504            _ => {
505                tracing::debug!(
506                    "Token usage - input: {}, output: {}, ratio: unknown (context limit unavailable)",
507                    input_tokens,
508                    output_tokens
509                );
510            }
511        }
512
513        let _ = self
514            .message_tx
515            .send(AgentMessage::ContextUsageUpdate {
516                usage_ratio: self.token_tracker.usage_ratio(),
517                tokens_used: self.token_tracker.last_input_tokens(),
518                context_limit: self.token_tracker.context_limit(),
519            })
520            .await;
521
522        self.maybe_compact_context().await;
523    }
524
525    /// Pre-flight check: estimate context size and compact proactively if it would
526    /// overflow before the LLM even sees it. This catches the case where large tool
527    /// results push context past the limit before usage-based compaction can fire.
528    async fn maybe_preflight_compact(&mut self) {
529        let Some(context_limit) = self.token_tracker.context_limit() else {
530            return;
531        };
532        let Some(config) = self.compaction_config.as_ref() else {
533            return;
534        };
535        let estimated = self.context.estimated_token_count();
536        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
537        let threshold = (f64::from(context_limit) * config.threshold).ceil() as u32;
538        if estimated >= threshold {
539            tracing::info!(
540                "Pre-flight compaction triggered: estimated {estimated} tokens >= {:.1}% of {context_limit} limit",
541                config.threshold * 100.0
542            );
543            if let CompactionOutcome::Failed(e) = self.compact_context().await {
544                tracing::warn!("Pre-flight compaction failed: {e}");
545            }
546        }
547    }
548
549    /// Check if compaction is needed and perform it if so.
550    async fn maybe_compact_context(&mut self) {
551        if !self
552            .compaction_config
553            .as_ref()
554            .is_some_and(|config| self.token_tracker.should_compact(config.threshold))
555        {
556            return;
557        }
558
559        if let CompactionOutcome::Failed(error_message) = self.compact_context().await {
560            tracing::warn!("Context compaction failed: {}", error_message);
561        }
562    }
563
564    async fn compact_context(&mut self) -> CompactionOutcome {
565        let Some(ref _config) = self.compaction_config else {
566            tracing::warn!("Context compaction requested but compaction is disabled");
567            return CompactionOutcome::SkippedDisabled;
568        };
569
570        match self.token_tracker.usage_ratio() {
571            Some(usage_ratio) => {
572                tracing::info!(
573                    "Starting context compaction - {} messages, {:.1}% of context limit",
574                    self.context.message_count(),
575                    usage_ratio * 100.0
576                );
577            }
578            None => {
579                tracing::info!(
580                    "Starting context compaction - {} messages (context limit unknown)",
581                    self.context.message_count(),
582                );
583            }
584        }
585
586        let _ = self
587            .message_tx
588            .send(AgentMessage::ContextCompactionStarted {
589                message_count: self.context.message_count(),
590            })
591            .await;
592
593        let compactor = Compactor::new(self.llm.clone());
594
595        match compactor.compact(&self.context).await {
596            Ok(result) => {
597                tracing::info!(
598                    "Context compacted: {} messages removed",
599                    result.messages_removed
600                );
601
602                self.context = result.context;
603                self.token_tracker.reset_current_usage();
604
605                let _ = self
606                    .message_tx
607                    .send(AgentMessage::ContextCompactionResult {
608                        summary: result.summary,
609                        messages_removed: result.messages_removed,
610                    })
611                    .await;
612                CompactionOutcome::Compacted
613            }
614            Err(e) => CompactionOutcome::Failed(e.to_string()),
615        }
616    }
617
618    async fn on_tool_execution_event(
619        &mut self,
620        event: ToolExecutionEvent,
621        state: &mut IterationState,
622    ) {
623        match event {
624            ToolExecutionEvent::Started { tool_id, tool_name } => {
625                tracing::debug!("Tool execution started: {} ({})", tool_name, tool_id);
626            }
627
628            ToolExecutionEvent::Progress { tool_id, progress } => {
629                tracing::debug!(
630                    "Tool progress for {}: {}/{}",
631                    tool_id,
632                    progress.progress,
633                    progress.total.unwrap_or(0.0)
634                );
635
636                if let Some(request) = self.active_requests.get(&tool_id) {
637                    let _ = self
638                        .message_tx
639                        .send(AgentMessage::ToolProgress {
640                            request: request.clone(),
641                            progress: progress.progress,
642                            total: progress.total,
643                            message: progress.message.clone(),
644                        })
645                        .await;
646                }
647            }
648
649            ToolExecutionEvent::Complete {
650                tool_id: _,
651                result,
652                result_meta,
653            } => match result {
654                Ok(tool_result) => {
655                    tracing::debug!(
656                        "Tool result received: {} -> {}",
657                        tool_result.name,
658                        tool_result.result.len()
659                    );
660
661                    if state.pending_tool_ids.remove(&tool_result.id) {
662                        self.active_requests.remove(&tool_result.id);
663                        state.completed_tool_calls.push(Ok(tool_result.clone()));
664
665                        let msg = AgentMessage::ToolResult {
666                            result: tool_result,
667                            result_meta,
668                            model_name: self.llm.display_name(),
669                        };
670
671                        if let Err(e) = self.message_tx.send(msg).await {
672                            tracing::warn!("Failed to send ToolCall completion message: {:?}", e);
673                        }
674                    } else {
675                        tracing::debug!("Ignoring stale tool result for id: {}", tool_result.id);
676                    }
677                }
678
679                Err(tool_error) => {
680                    if state.pending_tool_ids.remove(&tool_error.id) {
681                        self.active_requests.remove(&tool_error.id);
682                        state.completed_tool_calls.push(Err(tool_error.clone()));
683
684                        let _ = self
685                            .message_tx
686                            .send(AgentMessage::ToolError {
687                                error: tool_error,
688                                model_name: self.llm.display_name(),
689                            })
690                            .await;
691                    }
692                }
693            },
694        }
695    }
696
697    fn update_context(
698        &mut self,
699        message_content: &str,
700        reasoning: AssistantReasoning,
701        completed_tools: Vec<Result<ToolCallResult, ToolCallError>>,
702    ) {
703        self.context
704            .push_assistant_turn(message_content, reasoning, completed_tools);
705    }
706}
707
/// Result of a compaction attempt, used by callers to decide whether to log a warning.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CompactionOutcome {
    /// Context was successfully replaced with a compacted version.
    Compacted,
    /// Compaction was requested but is disabled by configuration.
    SkippedDisabled,
    /// The compactor returned an error (stringified).
    Failed(String),
}
714
715pub(crate) struct AutoContinue {
716    max: u32,
717    count: u32,
718}
719
720impl AutoContinue {
721    pub(crate) fn new(max: u32) -> Self {
722        Self { max, count: 0 }
723    }
724
725    fn on_tool_calls(&mut self) {
726        self.count = 0;
727    }
728
729    fn on_completion(&mut self) {
730        self.count = 0;
731    }
732
733    fn should_continue(&self, stop_reason: Option<&StopReason>) -> bool {
734        matches!(stop_reason, Some(StopReason::Length)) && self.count < self.max
735    }
736
737    fn advance(&mut self) {
738        self.count += 1;
739    }
740
741    fn count(&self) -> u32 {
742        self.count
743    }
744
745    fn max(&self) -> u32 {
746        self.max
747    }
748}
749
750#[derive(Debug)]
751struct IterationState {
752    current_message_id: Option<String>,
753    message_content: String,
754    reasoning_summary_text: String,
755    encrypted_reasoning: Option<EncryptedReasoningContent>,
756    pending_tool_ids: HashSet<String>,
757    completed_tool_calls: Vec<Result<ToolCallResult, ToolCallError>>,
758    llm_done: bool,
759    stop_reason: Option<StopReason>,
760    cancelled: bool,
761}
762
763impl IterationState {
764    fn new() -> Self {
765        Self {
766            current_message_id: None,
767            message_content: String::new(),
768            reasoning_summary_text: String::new(),
769            encrypted_reasoning: None,
770            pending_tool_ids: HashSet::new(),
771            completed_tool_calls: Vec::new(),
772            llm_done: false,
773            stop_reason: None,
774            cancelled: false,
775        }
776    }
777
778    fn on_llm_start(&mut self, message_id: String) {
779        self.current_message_id = Some(message_id);
780        self.message_content.clear();
781        self.reasoning_summary_text.clear();
782        self.encrypted_reasoning = None;
783        self.stop_reason = None;
784    }
785
786    fn is_complete(&self) -> bool {
787        self.llm_done && self.pending_tool_ids.is_empty()
788    }
789}
790
#[cfg(test)]
mod tests {
    use super::*;
    use llm::testing::FakeLlmProvider;
    use tokio::sync::mpsc;

    /// Pre-flight compaction should fire from the estimated token count when it
    /// crosses the configured threshold, replacing the context with a summary.
    #[tokio::test]
    async fn test_preflight_compaction_uses_configured_threshold() {
        let llm = Arc::new(
            FakeLlmProvider::with_single_response(vec![
                LlmResponse::start("summary"),
                LlmResponse::text("summary"),
                LlmResponse::done(),
            ])
            .with_context_window(Some(100)),
        );
        let context = Context::new(
            vec![ChatMessage::User {
                content: vec![llm::ContentBlock::text("x".repeat(344))],
                timestamp: IsoString::now(),
            }],
            vec![],
        );
        let (user_tx, user_rx) = mpsc::channel(1);
        let (message_tx, _message_rx) = mpsc::channel(8);
        drop(user_tx);

        let mut agent = Agent::new(
            AgentConfig {
                llm,
                context,
                mcp_command_tx: None,
                tool_timeout: Duration::from_secs(1),
                compaction_config: Some(CompactionConfig::with_threshold(0.85)),
                auto_continue: AutoContinue::new(0),
            },
            user_rx,
            message_tx,
        );

        agent.maybe_preflight_compact().await;

        assert!(
            matches!(
                agent.context.messages().as_slice(),
                [ChatMessage::Summary { content, .. }] if content == "summary"
            ),
            "expected context to be compacted, got {:?}",
            agent.context.messages()
        );
    }
}