Skip to main content

steer_core/app/domain/
reduce.rs

1use crate::agents::default_agent_spec_id;
2use crate::api::provider::TokenUsage;
3use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
4
5use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
6
7use crate::app::domain::effect::{Effect, McpServerConfig};
8use crate::app::domain::event::{
9    CancellationInfo, ContextWindowUsage, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
10};
11use crate::app::domain::state::{
12    AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
13};
14use crate::primary_agents::{
15    default_primary_agent_id, primary_agent_spec, resolve_effective_config,
16};
17use crate::session::state::{BackendConfig, ToolDecision};
18
19use crate::app::domain::event::CompactTrigger;
20use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
21use serde_json::Value;
22use steer_tools::ToolError;
23use steer_tools::result::ToolResult;
24use steer_tools::tools::BASH_TOOL_NAME;
25use thiserror::Error;
26
/// Message-count threshold for compaction.
///
/// NOTE(review): the use site is outside this part of the file — presumably the minimum
/// number of thread messages required before a compaction request is accepted; confirm.
const MIN_MESSAGES_FOR_COMPACT: usize = 3;

/// Prompt text asking the model to carry on from a compaction summary.
///
/// NOTE(review): the consumer is outside this part of the file — confirm where it is injected.
const COMPACTION_CONTINUE_PROMPT: &str =
    "Continue from the compaction summary and resume the conversation.";
30
/// Machine-readable category attached to [`ReduceError::InvalidAction`], so callers can
/// branch on the reason without parsing the human-readable message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvalidActionKind {
    /// The action was rejected because another operation is still active.
    OperationInFlight,
    /// NOTE(review): presumably raised when the session has no configuration loaded —
    /// the producing code is outside this view.
    MissingSessionConfig,
    /// NOTE(review): presumably raised when a requested primary agent id does not resolve —
    /// the producing code is outside this view.
    UnknownPrimaryAgent,
    /// NOTE(review): presumably raised when a dequeue is requested on an empty work queue —
    /// the producing code is outside this view.
    QueueEmpty,
}
38
39#[derive(Debug, Error)]
40pub enum ReduceError {
41    #[error("{message}")]
42    InvalidAction {
43        message: String,
44        kind: InvalidActionKind,
45    },
46    #[error("Invariant violated: {message}")]
47    Invariant { message: String },
48}
49
50fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
51    ReduceError::InvalidAction {
52        message: message.into(),
53        kind,
54    }
55}
56
57fn derive_context_window_usage(
58    total_tokens: u32,
59    context_window_tokens: Option<u32>,
60    configured_max_output_tokens: Option<u32>,
61) -> Option<ContextWindowUsage> {
62    context_window_tokens.map(|max_context_tokens| {
63        let reserve = configured_max_output_tokens.unwrap_or(0);
64        let effective_context_tokens = max_context_tokens.saturating_sub(reserve);
65        let remaining_tokens = effective_context_tokens.saturating_sub(total_tokens);
66        let (utilization_ratio, estimated) = if effective_context_tokens > 0 {
67            let ratio =
68                (f64::from(total_tokens) / f64::from(effective_context_tokens)).clamp(0.0, 1.0);
69            (Some(ratio), false)
70        } else {
71            (None, true)
72        };
73
74        ContextWindowUsage {
75            max_context_tokens: Some(max_context_tokens),
76            remaining_tokens: Some(remaining_tokens),
77            utilization_ratio,
78            estimated,
79        }
80    })
81}
82
83pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
84    match action {
85        Action::UserInput {
86            session_id,
87            content,
88            op_id,
89            message_id,
90            model,
91            timestamp,
92        } => Ok(handle_user_input(
93            state, session_id, content, op_id, message_id, model, timestamp,
94        )),
95
96        Action::UserEditedMessage {
97            session_id,
98            message_id,
99            new_content,
100            op_id,
101            new_message_id,
102            model,
103            timestamp,
104        } => handle_user_edited_message(
105            state,
106            session_id,
107            UserEditedMessageParams {
108                original_message_id: message_id,
109                new_content,
110                op_id,
111                new_message_id,
112                model,
113                timestamp,
114            },
115        ),
116
117        Action::ToolApprovalRequested {
118            session_id,
119            request_id,
120            tool_call,
121        } => Ok(handle_tool_approval_requested(
122            state, session_id, request_id, tool_call,
123        )),
124
125        Action::ToolApprovalDecided {
126            session_id,
127            request_id,
128            decision,
129            remember,
130        } => Ok(handle_tool_approval_decided(
131            state, session_id, request_id, decision, remember,
132        )),
133
134        Action::ToolExecutionStarted {
135            session_id,
136            tool_call_id,
137            tool_name,
138            tool_parameters,
139        } => Ok(handle_tool_execution_started(
140            state,
141            session_id,
142            tool_call_id,
143            tool_name,
144            tool_parameters,
145        )),
146
147        Action::ToolResult {
148            session_id,
149            tool_call_id,
150            tool_name,
151            result,
152        } => Ok(handle_tool_result(
153            state,
154            session_id,
155            tool_call_id,
156            tool_name,
157            result,
158        )),
159
160        Action::ModelResponseComplete {
161            session_id,
162            op_id,
163            message_id,
164            content,
165            usage,
166            context_window_tokens,
167            configured_max_output_tokens,
168            timestamp,
169        } => Ok(handle_model_response_complete(
170            state,
171            session_id,
172            ModelResponseCompleteParams {
173                op_id,
174                message_id,
175                content,
176                usage,
177                context_window_tokens,
178                configured_max_output_tokens,
179                timestamp,
180            },
181        )),
182
183        Action::ModelResponseError {
184            session_id,
185            op_id,
186            error,
187        } => Ok(handle_model_response_error(
188            state, session_id, op_id, &error,
189        )),
190
191        Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
192
193        Action::DirectBashCommand {
194            session_id,
195            op_id,
196            message_id,
197            command,
198            timestamp,
199        } => Ok(handle_direct_bash(
200            state, session_id, op_id, message_id, command, timestamp,
201        )),
202
203        Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
204
205        Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
206
207        Action::RequestCompaction {
208            session_id,
209            op_id,
210            model,
211        } => handle_request_compaction(state, session_id, op_id, model),
212
213        Action::Hydrate {
214            session_id,
215            events,
216            starting_sequence,
217        } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
218
219        Action::WorkspaceFilesListed { files, .. } => {
220            state.workspace_files = files;
221            Ok(vec![])
222        }
223
224        Action::ToolSchemasAvailable { tools, .. } => {
225            state.tools = tools;
226            Ok(vec![])
227        }
228
229        Action::ToolSchemasUpdated { schemas, .. } => {
230            state.tools = schemas;
231            Ok(vec![])
232        }
233
234        Action::SwitchPrimaryAgent {
235            session_id,
236            agent_id,
237        } => handle_switch_primary_agent(state, session_id, agent_id),
238
239        Action::McpServerStateChanged {
240            session_id,
241            server_name,
242            state: new_state,
243        } => {
244            // When connected, merge MCP tools into state.tools
245            if let McpServerState::Connected { tools } = &new_state {
246                let tools = state.session_config.as_ref().map_or_else(
247                    || tools.clone(),
248                    |config| config.filter_tools_by_visibility(tools.clone()),
249                );
250
251                // Add MCP tools that aren't already present (by name)
252                for tool in tools {
253                    if !state.tools.iter().any(|t| t.name == tool.name) {
254                        state.tools.push(tool.clone());
255                    }
256                }
257            }
258
259            // When disconnected or failed, remove tools from that server
260            if matches!(
261                &new_state,
262                McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
263            ) {
264                let prefix = format!("mcp__{server_name}__");
265                state.tools.retain(|t| !t.name.starts_with(&prefix));
266            }
267
268            state
269                .mcp_servers
270                .insert(server_name.clone(), new_state.clone());
271            Ok(vec![Effect::EmitEvent {
272                session_id,
273                event: SessionEvent::McpServerStateChanged {
274                    server_name,
275                    state: new_state,
276                },
277            }])
278        }
279
280        Action::CompactionComplete {
281            session_id,
282            op_id,
283            compaction_id,
284            summary_message_id,
285            summary,
286            compacted_head_message_id,
287            previous_active_message_id,
288            model,
289            timestamp,
290        } => Ok(handle_compaction_complete(
291            state,
292            session_id,
293            CompactionCompleteParams {
294                op_id,
295                compaction_id,
296                summary_message_id,
297                summary,
298                compacted_head_message_id,
299                previous_active_message_id,
300                model_name: model,
301                timestamp,
302            },
303        )),
304
305        Action::CompactionFailed {
306            session_id,
307            op_id,
308            error,
309        } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
310
311        Action::Shutdown => Ok(vec![]),
312    }
313}
314
315fn handle_user_input(
316    state: &mut AppState,
317    session_id: crate::app::domain::types::SessionId,
318    content: Vec<UserContent>,
319    op_id: crate::app::domain::types::OpId,
320    message_id: crate::app::domain::types::MessageId,
321    model: crate::config::model::ModelId,
322    timestamp: u64,
323) -> Vec<Effect> {
324    let mut effects = Vec::new();
325
326    if state.has_active_operation() {
327        state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
328            content,
329            op_id,
330            message_id,
331            model,
332            queued_at: timestamp,
333        });
334        effects.push(Effect::EmitEvent {
335            session_id,
336            event: SessionEvent::QueueUpdated {
337                queue: snapshot_queue(state),
338            },
339        });
340        return effects;
341    }
342
343    let parent_id = state.message_graph.active_message_id.clone();
344
345    let message = Message {
346        data: MessageData::User { content },
347        timestamp,
348        id: message_id.0.clone(),
349        parent_message_id: parent_id,
350    };
351
352    state.message_graph.add_message(message.clone());
353    state.message_graph.active_message_id = Some(message_id.0.clone());
354
355    state.start_operation(op_id, OperationKind::AgentLoop);
356    state.operation_models.insert(op_id, model.clone());
357
358    effects.push(Effect::EmitEvent {
359        session_id,
360        event: SessionEvent::UserMessageAdded {
361            message: message.clone(),
362        },
363    });
364
365    effects.push(Effect::EmitEvent {
366        session_id,
367        event: SessionEvent::OperationStarted {
368            op_id,
369            kind: OperationKind::AgentLoop,
370        },
371    });
372
373    effects.push(Effect::CallModel {
374        session_id,
375        op_id,
376        model,
377        messages: state
378            .message_graph
379            .get_thread_messages()
380            .into_iter()
381            .cloned()
382            .collect(),
383        system_context: state.cached_system_context.clone(),
384        tools: state.tools.clone(),
385    });
386
387    effects
388}
389
390struct UserEditedMessageParams {
391    original_message_id: crate::app::domain::types::MessageId,
392    new_content: Vec<UserContent>,
393    op_id: crate::app::domain::types::OpId,
394    new_message_id: crate::app::domain::types::MessageId,
395    model: crate::config::model::ModelId,
396    timestamp: u64,
397}
398
399fn handle_user_edited_message(
400    state: &mut AppState,
401    session_id: crate::app::domain::types::SessionId,
402    params: UserEditedMessageParams,
403) -> Result<Vec<Effect>, ReduceError> {
404    let UserEditedMessageParams {
405        original_message_id,
406        new_content,
407        op_id,
408        new_message_id,
409        model,
410        timestamp,
411    } = params;
412    let mut effects = Vec::new();
413
414    if state.has_active_operation() {
415        return Err(invalid_action(
416            InvalidActionKind::OperationInFlight,
417            "Cannot edit message while an operation is active.",
418        ));
419    }
420
421    let parent_id = state
422        .message_graph
423        .messages
424        .iter()
425        .find(|m| m.id() == original_message_id.0)
426        .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
427
428    let message = Message {
429        data: MessageData::User {
430            content: new_content,
431        },
432        timestamp,
433        id: new_message_id.0.clone(),
434        parent_message_id: parent_id,
435    };
436
437    state.message_graph.add_message(message.clone());
438    state.message_graph.active_message_id = Some(new_message_id.0.clone());
439
440    state.start_operation(op_id, OperationKind::AgentLoop);
441    state.operation_models.insert(op_id, model.clone());
442
443    effects.push(Effect::EmitEvent {
444        session_id,
445        event: SessionEvent::UserMessageAdded {
446            message: message.clone(),
447        },
448    });
449
450    effects.push(Effect::EmitEvent {
451        session_id,
452        event: SessionEvent::OperationStarted {
453            op_id,
454            kind: OperationKind::AgentLoop,
455        },
456    });
457
458    effects.push(Effect::CallModel {
459        session_id,
460        op_id,
461        model,
462        messages: state
463            .message_graph
464            .get_thread_messages()
465            .into_iter()
466            .cloned()
467            .collect(),
468        system_context: state.cached_system_context.clone(),
469        tools: state.tools.clone(),
470    });
471
472    Ok(effects)
473}
474
475fn handle_tool_approval_requested(
476    state: &mut AppState,
477    session_id: crate::app::domain::types::SessionId,
478    request_id: crate::app::domain::types::RequestId,
479    tool_call: steer_tools::ToolCall,
480) -> Vec<Effect> {
481    let mut effects = Vec::new();
482
483    if let Err(error) = validate_tool_call(state, &tool_call) {
484        let error_message = error.to_string();
485        return fail_tool_call_without_execution(
486            state,
487            session_id,
488            tool_call,
489            error,
490            error_message,
491            "invalid",
492            true,
493        );
494    }
495
496    let decision = get_tool_decision(state, &tool_call);
497
498    match decision {
499        ToolDecision::Allow => {
500            let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
501                return vec![Effect::EmitEvent {
502                    session_id,
503                    event: SessionEvent::Error {
504                        message: "Tool approval requested without active operation".to_string(),
505                    },
506                }];
507            };
508            state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
509                &tool_call.id,
510            ));
511
512            effects.push(Effect::ExecuteTool {
513                session_id,
514                op_id,
515                tool_call,
516            });
517        }
518        ToolDecision::Deny => {
519            let error = ToolError::DeniedByPolicy(tool_call.name.clone());
520            let tool_name = tool_call.name.clone();
521            effects.extend(fail_tool_call_without_execution(
522                state,
523                session_id,
524                tool_call,
525                error,
526                format!("Tool '{tool_name}' denied by policy"),
527                "denied",
528                true,
529            ));
530        }
531        ToolDecision::Ask => {
532            if state.pending_approval.is_some() {
533                state.approval_queue.push_back(QueuedApproval { tool_call });
534                return effects;
535            }
536
537            state.pending_approval = Some(PendingApproval {
538                request_id,
539                tool_call: tool_call.clone(),
540            });
541
542            effects.push(Effect::EmitEvent {
543                session_id,
544                event: SessionEvent::ApprovalRequested {
545                    request_id,
546                    tool_call: tool_call.clone(),
547                },
548            });
549
550            effects.push(Effect::RequestUserApproval {
551                session_id,
552                request_id,
553                tool_call,
554            });
555        }
556    }
557
558    effects
559}
560
561fn validate_tool_call(
562    state: &AppState,
563    tool_call: &steer_tools::ToolCall,
564) -> Result<(), ToolError> {
565    if tool_call.name.trim().is_empty() {
566        return Err(ToolError::invalid_params(
567            "unknown",
568            "Malformed tool call: missing tool name",
569        ));
570    }
571
572    if tool_call.id.trim().is_empty() {
573        return Err(ToolError::invalid_params(
574            tool_call.name.clone(),
575            "Malformed tool call: missing tool call id",
576        ));
577    }
578
579    if state.tools.is_empty() {
580        return Ok(());
581    }
582
583    let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
584        return Err(ToolError::UnknownTool(tool_call.name.clone()));
585    };
586
587    validate_against_json_schema(
588        &tool_call.name,
589        schema.input_schema.as_value(),
590        &tool_call.parameters,
591    )
592}
593
594fn validate_against_json_schema(
595    tool_name: &str,
596    schema: &Value,
597    params: &Value,
598) -> Result<(), ToolError> {
599    let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
600        ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
601    })?;
602
603    if let Err(errors) = validator.validate(params) {
604        let message = errors
605            .into_iter()
606            .map(|error| error.to_string())
607            .next()
608            .unwrap_or_else(|| "Parameters do not match schema".to_string());
609        return Err(ToolError::invalid_params(tool_name.to_string(), message));
610    }
611
612    Ok(())
613}
614
615fn emit_tool_failure_message(
616    state: &mut AppState,
617    session_id: crate::app::domain::types::SessionId,
618    tool_call_id: &str,
619    tool_name: &str,
620    tool_error: ToolError,
621    event_error: String,
622    message_id_prefix: &str,
623) -> Vec<Effect> {
624    let mut effects = Vec::new();
625
626    let tool_result = ToolResult::Error(tool_error);
627    let parent_id = state.message_graph.active_message_id.clone();
628    let tool_message = Message {
629        data: MessageData::Tool {
630            tool_use_id: tool_call_id.to_string(),
631            result: tool_result,
632        },
633        timestamp: 0,
634        id: format!("{message_id_prefix}_{tool_call_id}"),
635        parent_message_id: parent_id,
636    };
637    state.message_graph.add_message(tool_message.clone());
638
639    if let Some(model) = state
640        .current_operation
641        .as_ref()
642        .and_then(|op| state.operation_models.get(&op.op_id).cloned())
643    {
644        effects.push(Effect::EmitEvent {
645            session_id,
646            event: SessionEvent::ToolCallFailed {
647                id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
648                name: tool_name.to_string(),
649                error: event_error,
650                model,
651            },
652        });
653    }
654
655    effects.push(Effect::EmitEvent {
656        session_id,
657        event: SessionEvent::ToolMessageAdded {
658            message: tool_message,
659        },
660    });
661
662    effects
663}
664
665fn fail_tool_call_without_execution(
666    state: &mut AppState,
667    session_id: crate::app::domain::types::SessionId,
668    tool_call: steer_tools::ToolCall,
669    tool_error: ToolError,
670    event_error: String,
671    message_id_prefix: &str,
672    call_model_if_ready: bool,
673) -> Vec<Effect> {
674    let mut effects = emit_tool_failure_message(
675        state,
676        session_id,
677        &tool_call.id,
678        &tool_call.name,
679        tool_error,
680        event_error,
681        message_id_prefix,
682    );
683
684    if !call_model_if_ready {
685        return effects;
686    }
687
688    let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
689        effects.push(Effect::EmitEvent {
690            session_id,
691            event: SessionEvent::Error {
692                message: "Tool failure recorded without active operation".to_string(),
693            },
694        });
695        return effects;
696    };
697    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
698        model
699    } else {
700        effects.push(Effect::EmitEvent {
701            session_id,
702            event: SessionEvent::Error {
703                message: format!("Missing model for operation {op_id}"),
704            },
705        });
706        return effects;
707    };
708
709    let all_tools_complete = state
710        .current_operation
711        .as_ref()
712        .is_none_or(|op| op.pending_tool_calls.is_empty());
713    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
714
715    if all_tools_complete && no_pending_approvals {
716        effects.push(Effect::CallModel {
717            session_id,
718            op_id,
719            model,
720            messages: state
721                .message_graph
722                .get_thread_messages()
723                .into_iter()
724                .cloned()
725                .collect(),
726            system_context: state.cached_system_context.clone(),
727            tools: state.tools.clone(),
728        });
729    }
730
731    effects
732}
733
734fn handle_tool_approval_decided(
735    state: &mut AppState,
736    session_id: crate::app::domain::types::SessionId,
737    request_id: crate::app::domain::types::RequestId,
738    decision: ApprovalDecision,
739    remember: Option<ApprovalMemory>,
740) -> Vec<Effect> {
741    let mut effects = Vec::new();
742
743    let pending = match state.pending_approval.take() {
744        Some(p) if p.request_id == request_id => p,
745        other => {
746            state.pending_approval = other;
747            return effects;
748        }
749    };
750
751    let resolved_memory = if decision == ApprovalDecision::Approved {
752        match remember {
753            Some(ApprovalMemory::PendingTool) => {
754                Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
755            }
756            Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
757            Some(ApprovalMemory::BashPattern(pattern)) => {
758                Some(ApprovalMemory::BashPattern(pattern))
759            }
760            None => None,
761        }
762    } else {
763        None
764    };
765
766    effects.push(Effect::EmitEvent {
767        session_id,
768        event: SessionEvent::ApprovalDecided {
769            request_id,
770            decision,
771            remember: resolved_memory.clone(),
772        },
773    });
774
775    if decision == ApprovalDecision::Approved {
776        if let Some(ref memory) = resolved_memory {
777            match memory {
778                ApprovalMemory::Tool(name) => {
779                    state.approved_tools.insert(name.clone());
780                }
781                ApprovalMemory::BashPattern(pattern) => {
782                    state.approved_bash_patterns.insert(pattern.clone());
783                }
784                ApprovalMemory::PendingTool => {}
785            }
786        }
787
788        let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
789            effects.push(Effect::EmitEvent {
790                session_id,
791                event: SessionEvent::Error {
792                    message: "Tool approval decided without active operation".to_string(),
793                },
794            });
795            return effects;
796        };
797        state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
798            &pending.tool_call.id,
799        ));
800
801        effects.push(Effect::ExecuteTool {
802            session_id,
803            op_id,
804            tool_call: pending.tool_call,
805        });
806    } else {
807        let tool_name = pending.tool_call.name.clone();
808        let error = ToolError::DeniedByUser(tool_name.clone());
809        effects.extend(fail_tool_call_without_execution(
810            state,
811            session_id,
812            pending.tool_call,
813            error,
814            format!("Tool '{tool_name}' denied by user"),
815            "denied",
816            false,
817        ));
818    }
819
820    effects.extend(process_next_queued_approval(state, session_id));
821
822    effects
823}
824
825fn process_next_queued_approval(
826    state: &mut AppState,
827    session_id: crate::app::domain::types::SessionId,
828) -> Vec<Effect> {
829    let mut effects = Vec::new();
830
831    while let Some(queued) = state.approval_queue.pop_front() {
832        let decision = get_tool_decision(state, &queued.tool_call);
833
834        match decision {
835            ToolDecision::Allow => {
836                let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
837                    effects.push(Effect::EmitEvent {
838                        session_id,
839                        event: SessionEvent::Error {
840                            message: "Queued tool approval processed without active operation"
841                                .to_string(),
842                        },
843                    });
844                    state.approval_queue.push_front(queued);
845                    break;
846                };
847                state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
848                    &queued.tool_call.id,
849                ));
850
851                effects.push(Effect::ExecuteTool {
852                    session_id,
853                    op_id,
854                    tool_call: queued.tool_call,
855                });
856            }
857            ToolDecision::Deny => {
858                let tool_name = queued.tool_call.name.clone();
859                let error = ToolError::DeniedByPolicy(tool_name.clone());
860                effects.extend(fail_tool_call_without_execution(
861                    state,
862                    session_id,
863                    queued.tool_call,
864                    error,
865                    format!("Tool '{tool_name}' denied by policy"),
866                    "denied",
867                    false,
868                ));
869            }
870            ToolDecision::Ask => {
871                let request_id = crate::app::domain::types::RequestId::new();
872                state.pending_approval = Some(PendingApproval {
873                    request_id,
874                    tool_call: queued.tool_call.clone(),
875                });
876
877                effects.push(Effect::EmitEvent {
878                    session_id,
879                    event: SessionEvent::ApprovalRequested {
880                        request_id,
881                        tool_call: queued.tool_call.clone(),
882                    },
883                });
884
885                effects.push(Effect::RequestUserApproval {
886                    session_id,
887                    request_id,
888                    tool_call: queued.tool_call,
889                });
890
891                break;
892            }
893        }
894    }
895
896    let all_tools_complete = state
897        .current_operation
898        .as_ref()
899        .is_none_or(|op| op.pending_tool_calls.is_empty());
900    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
901
902    if all_tools_complete
903        && no_pending_approvals
904        && let Some(op) = &state.current_operation
905    {
906        let op_id = op.op_id;
907        if let Some(model) = state.operation_models.get(&op_id).cloned() {
908            effects.push(Effect::CallModel {
909                session_id,
910                op_id,
911                model,
912                messages: state
913                    .message_graph
914                    .get_thread_messages()
915                    .into_iter()
916                    .cloned()
917                    .collect(),
918                system_context: state.cached_system_context.clone(),
919                tools: state.tools.clone(),
920            });
921        }
922    }
923
924    effects
925}
926
927fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
928    if state.approved_tools.contains(&tool_call.name) {
929        return ToolDecision::Allow;
930    }
931
932    if tool_call.name == DISPATCH_AGENT_TOOL_NAME
933        && let Ok(params) =
934            serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
935        && let Some(config) = state.session_config.as_ref()
936    {
937        let policy = &config.tool_config.approval_policy;
938        match params.target {
939            DispatchAgentTarget::Resume { .. } => {
940                return ToolDecision::Allow;
941            }
942            DispatchAgentTarget::New { agent, .. } => {
943                let agent_id = agent
944                    .as_deref()
945                    .filter(|value| !value.trim().is_empty())
946                    .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
947                if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
948                    return ToolDecision::Allow;
949                }
950            }
951        }
952    }
953
954    if tool_call.name == BASH_TOOL_NAME
955        && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
956            tool_call.parameters.clone(),
957        )
958        && state.is_bash_pattern_approved(&params.command)
959    {
960        return ToolDecision::Allow;
961    }
962
963    state
964        .session_config
965        .as_ref()
966        .map_or(ToolDecision::Ask, |config| {
967            config
968                .tool_config
969                .approval_policy
970                .tool_decision(&tool_call.name)
971        })
972}
973
974fn handle_tool_execution_started(
975    state: &mut AppState,
976    session_id: crate::app::domain::types::SessionId,
977    tool_call_id: crate::app::domain::types::ToolCallId,
978    tool_name: String,
979    tool_parameters: serde_json::Value,
980) -> Vec<Effect> {
981    state.add_pending_tool_call(tool_call_id.clone());
982
983    let op_id = match state.current_operation.as_ref() {
984        Some(op) => op.op_id,
985        None => {
986            return vec![Effect::EmitEvent {
987                session_id,
988                event: SessionEvent::Error {
989                    message: "Tool call started without active operation".to_string(),
990                },
991            }];
992        }
993    };
994
995    let is_direct_bash = matches!(
996        state.current_operation.as_ref().map(|op| &op.kind),
997        Some(OperationKind::DirectBash { .. })
998    );
999
1000    if is_direct_bash {
1001        return vec![];
1002    }
1003
1004    let model = match state.operation_models.get(&op_id).cloned() {
1005        Some(model) => model,
1006        None => {
1007            return vec![Effect::EmitEvent {
1008                session_id,
1009                event: SessionEvent::Error {
1010                    message: format!("Missing model for tool call on operation {op_id}"),
1011                },
1012            }];
1013        }
1014    };
1015
1016    vec![Effect::EmitEvent {
1017        session_id,
1018        event: SessionEvent::ToolCallStarted {
1019            id: tool_call_id,
1020            name: tool_name,
1021            parameters: tool_parameters,
1022            model,
1023        },
1024    }]
1025}
1026
1027fn handle_tool_result(
1028    state: &mut AppState,
1029    session_id: crate::app::domain::types::SessionId,
1030    tool_call_id: crate::app::domain::types::ToolCallId,
1031    tool_name: String,
1032    result: Result<ToolResult, ToolError>,
1033) -> Vec<Effect> {
1034    let mut effects = Vec::new();
1035
1036    let op = match &state.current_operation {
1037        Some(op) => {
1038            if state.cancelled_ops.contains(&op.op_id) {
1039                tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
1040                return effects;
1041            }
1042            op.clone()
1043        }
1044        None => return effects,
1045    };
1046    let op_id = op.op_id;
1047
1048    state.remove_pending_tool_call(&tool_call_id);
1049
1050    let tool_result = match result {
1051        Ok(r) => r,
1052        Err(e) => ToolResult::Error(e),
1053    };
1054
1055    let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1056
1057    if is_direct_bash {
1058        let command = match &op.kind {
1059            OperationKind::DirectBash { command } => command.clone(),
1060            _ => tool_name,
1061        };
1062
1063        let (stdout, stderr, exit_code) = match &tool_result {
1064            ToolResult::Bash(result) => (
1065                result.stdout.clone(),
1066                result.stderr.clone(),
1067                result.exit_code,
1068            ),
1069            ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1070            other => (format!("{other:?}"), String::new(), 0),
1071        };
1072
1073        let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1074            state
1075                .message_graph
1076                .update_command_execution(
1077                    id.as_str(),
1078                    command.clone(),
1079                    stdout.clone(),
1080                    stderr.clone(),
1081                    exit_code,
1082                )
1083                .or_else(|| {
1084                    let parent_id = state.message_graph.active_message_id.clone();
1085                    let timestamp = Message::current_timestamp();
1086                    Some(Message {
1087                        data: MessageData::User {
1088                            content: vec![UserContent::CommandExecution {
1089                                command: command.clone(),
1090                                stdout: stdout.clone(),
1091                                stderr: stderr.clone(),
1092                                exit_code,
1093                            }],
1094                        },
1095                        timestamp,
1096                        id: id.to_string(),
1097                        parent_message_id: parent_id,
1098                    })
1099                })
1100        });
1101
1102        state.complete_operation(op_id);
1103
1104        if let Some(message) = updated {
1105            effects.push(Effect::EmitEvent {
1106                session_id,
1107                event: SessionEvent::MessageUpdated { message },
1108            });
1109        }
1110
1111        effects.push(Effect::EmitEvent {
1112            session_id,
1113            event: SessionEvent::OperationCompleted { op_id },
1114        });
1115
1116        effects.extend(maybe_start_queued_work(state, session_id));
1117
1118        return effects;
1119    }
1120
1121    let model = match state.operation_models.get(&op_id).cloned() {
1122        Some(model) => model,
1123        None => {
1124            return vec![Effect::EmitEvent {
1125                session_id,
1126                event: SessionEvent::Error {
1127                    message: format!("Missing model for tool result on operation {op_id}"),
1128                },
1129            }];
1130        }
1131    };
1132
1133    let event = match &tool_result {
1134        ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1135            id: tool_call_id.clone(),
1136            name: tool_name.clone(),
1137            error: e.to_string(),
1138            model: model.clone(),
1139        },
1140        _ => SessionEvent::ToolCallCompleted {
1141            id: tool_call_id.clone(),
1142            name: tool_name,
1143            result: tool_result.clone(),
1144            model: model.clone(),
1145        },
1146    };
1147
1148    effects.push(Effect::EmitEvent { session_id, event });
1149
1150    let parent_id = state.message_graph.active_message_id.clone();
1151    let tool_message = Message {
1152        data: MessageData::Tool {
1153            tool_use_id: tool_call_id.0.clone(),
1154            result: tool_result,
1155        },
1156        timestamp: 0,
1157        id: format!("tool_result_{}", tool_call_id.0),
1158        parent_message_id: parent_id,
1159    };
1160    state.message_graph.add_message(tool_message.clone());
1161
1162    effects.push(Effect::EmitEvent {
1163        session_id,
1164        event: SessionEvent::ToolMessageAdded {
1165            message: tool_message,
1166        },
1167    });
1168
1169    let all_tools_complete = state
1170        .current_operation
1171        .as_ref()
1172        .is_none_or(|op| op.pending_tool_calls.is_empty());
1173    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1174
1175    if all_tools_complete && no_pending_approvals {
1176        effects.push(Effect::CallModel {
1177            session_id,
1178            op_id,
1179            model,
1180            messages: state
1181                .message_graph
1182                .get_thread_messages()
1183                .into_iter()
1184                .cloned()
1185                .collect(),
1186            system_context: state.cached_system_context.clone(),
1187            tools: state.tools.clone(),
1188        });
1189    }
1190
1191    effects
1192}
1193
1194struct ModelResponseCompleteParams {
1195    op_id: crate::app::domain::types::OpId,
1196    message_id: crate::app::domain::types::MessageId,
1197    content: Vec<AssistantContent>,
1198    usage: Option<TokenUsage>,
1199    context_window_tokens: Option<u32>,
1200    configured_max_output_tokens: Option<u32>,
1201    timestamp: u64,
1202}
1203
1204fn handle_model_response_complete(
1205    state: &mut AppState,
1206    session_id: crate::app::domain::types::SessionId,
1207    params: ModelResponseCompleteParams,
1208) -> Vec<Effect> {
1209    let ModelResponseCompleteParams {
1210        op_id,
1211        message_id,
1212        content,
1213        usage,
1214        context_window_tokens,
1215        configured_max_output_tokens,
1216        timestamp,
1217    } = params;
1218    let mut effects = Vec::new();
1219
1220    if state.cancelled_ops.contains(&op_id) {
1221        tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1222        return effects;
1223    }
1224
1225    let tool_calls: Vec<_> = content
1226        .iter()
1227        .filter_map(|c| {
1228            if let AssistantContent::ToolCall { tool_call, .. } = c {
1229                Some(tool_call.clone())
1230            } else {
1231                None
1232            }
1233        })
1234        .collect();
1235
1236    let parent_id = state.message_graph.active_message_id.clone();
1237
1238    let message = Message {
1239        data: MessageData::Assistant {
1240            content: content.clone(),
1241        },
1242        timestamp,
1243        id: message_id.0.clone(),
1244        parent_message_id: parent_id,
1245    };
1246
1247    state.message_graph.add_message(message.clone());
1248    state.message_graph.active_message_id = Some(message_id.0.clone());
1249
1250    let model = match state.operation_models.get(&op_id).cloned() {
1251        Some(model) => model,
1252        None => {
1253            return vec![Effect::EmitEvent {
1254                session_id,
1255                event: SessionEvent::Error {
1256                    message: format!("Missing model for operation {op_id}"),
1257                },
1258            }];
1259        }
1260    };
1261
1262    effects.push(Effect::EmitEvent {
1263        session_id,
1264        event: SessionEvent::AssistantMessageAdded {
1265            message,
1266            model: model.clone(),
1267        },
1268    });
1269
1270    // Capture usage before shadowing – TokenUsage is Copy.
1271    let outer_usage = usage;
1272
1273    if let Some(usage) = outer_usage {
1274        let context_window = derive_context_window_usage(
1275            usage.total_tokens,
1276            context_window_tokens,
1277            configured_max_output_tokens,
1278        );
1279        state.record_llm_usage(op_id, model.clone(), usage, context_window.clone());
1280        effects.push(Effect::EmitEvent {
1281            session_id,
1282            event: SessionEvent::LlmUsageUpdated {
1283                op_id,
1284                model: model.clone(),
1285                usage,
1286                context_window,
1287            },
1288        });
1289    }
1290
1291    if tool_calls.is_empty() {
1292        state.complete_operation(op_id);
1293        effects.push(Effect::EmitEvent {
1294            session_id,
1295            event: SessionEvent::OperationCompleted { op_id },
1296        });
1297        // Try auto-compact first; if it doesn't fire, drain queued work.
1298        let auto = maybe_auto_compact(
1299            state,
1300            session_id,
1301            outer_usage,
1302            context_window_tokens,
1303            configured_max_output_tokens,
1304            &model,
1305        );
1306        if auto.is_empty() {
1307            effects.extend(maybe_start_queued_work(state, session_id));
1308        } else {
1309            effects.extend(auto);
1310        }
1311    } else {
1312        for tool_call in tool_calls {
1313            let request_id = crate::app::domain::types::RequestId::new();
1314            effects.extend(handle_tool_approval_requested(
1315                state, session_id, request_id, tool_call,
1316            ));
1317        }
1318    }
1319
1320    effects
1321}
1322
1323fn handle_model_response_error(
1324    state: &mut AppState,
1325    session_id: crate::app::domain::types::SessionId,
1326    op_id: crate::app::domain::types::OpId,
1327    error: &str,
1328) -> Vec<Effect> {
1329    let mut effects = Vec::new();
1330
1331    if state.cancelled_ops.contains(&op_id) {
1332        return effects;
1333    }
1334
1335    state.complete_operation(op_id);
1336
1337    effects.push(Effect::EmitEvent {
1338        session_id,
1339        event: SessionEvent::Error {
1340            message: error.to_string(),
1341        },
1342    });
1343
1344    effects.push(Effect::EmitEvent {
1345        session_id,
1346        event: SessionEvent::OperationCompleted { op_id },
1347    });
1348
1349    effects.extend(maybe_start_queued_work(state, session_id));
1350
1351    effects
1352}
1353
1354fn handle_direct_bash(
1355    state: &mut AppState,
1356    session_id: crate::app::domain::types::SessionId,
1357    op_id: crate::app::domain::types::OpId,
1358    message_id: crate::app::domain::types::MessageId,
1359    command: String,
1360    timestamp: u64,
1361) -> Vec<Effect> {
1362    let mut effects = Vec::new();
1363
1364    if state.has_active_operation() {
1365        state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1366            command,
1367            op_id,
1368            message_id,
1369            queued_at: timestamp,
1370        });
1371        effects.push(Effect::EmitEvent {
1372            session_id,
1373            event: SessionEvent::QueueUpdated {
1374                queue: snapshot_queue(state),
1375            },
1376        });
1377        return effects;
1378    }
1379
1380    let parent_id = state.message_graph.active_message_id.clone();
1381    let message = Message {
1382        data: MessageData::User {
1383            content: vec![UserContent::CommandExecution {
1384                command: command.clone(),
1385                stdout: String::new(),
1386                stderr: String::new(),
1387                exit_code: 0,
1388            }],
1389        },
1390        timestamp,
1391        id: message_id.0.clone(),
1392        parent_message_id: parent_id,
1393    };
1394
1395    state.message_graph.add_message(message.clone());
1396    state.message_graph.active_message_id = Some(message_id.0.clone());
1397
1398    state.start_operation(
1399        op_id,
1400        OperationKind::DirectBash {
1401            command: command.clone(),
1402        },
1403    );
1404    state.operation_messages.insert(op_id, message_id);
1405
1406    effects.push(Effect::EmitEvent {
1407        session_id,
1408        event: SessionEvent::UserMessageAdded { message },
1409    });
1410
1411    effects.push(Effect::EmitEvent {
1412        session_id,
1413        event: SessionEvent::OperationStarted {
1414            op_id,
1415            kind: OperationKind::DirectBash {
1416                command: command.clone(),
1417            },
1418        },
1419    });
1420
1421    let tool_call = steer_tools::ToolCall {
1422        id: format!("direct_bash_{op_id}"),
1423        name: BASH_TOOL_NAME.to_string(),
1424        parameters: serde_json::json!({ "command": command }),
1425    };
1426
1427    effects.push(Effect::ExecuteTool {
1428        session_id,
1429        op_id,
1430        tool_call,
1431    });
1432
1433    effects
1434}
1435
1436fn handle_dequeue_queued_item(
1437    state: &mut AppState,
1438    session_id: crate::app::domain::types::SessionId,
1439) -> Result<Vec<Effect>, ReduceError> {
1440    if state.pop_next_queued_work().is_some() {
1441        Ok(vec![Effect::EmitEvent {
1442            session_id,
1443            event: SessionEvent::QueueUpdated {
1444                queue: snapshot_queue(state),
1445            },
1446        }])
1447    } else {
1448        Err(invalid_action(
1449            InvalidActionKind::QueueEmpty,
1450            "No queued item to remove.",
1451        ))
1452    }
1453}
1454
1455fn maybe_auto_compact(
1456    state: &mut AppState,
1457    session_id: crate::app::domain::types::SessionId,
1458    usage: Option<TokenUsage>,
1459    context_window_tokens: Option<u32>,
1460    configured_max_output_tokens: Option<u32>,
1461    model: &crate::config::model::ModelId,
1462) -> Vec<Effect> {
1463    // Guard 1: config exists and auto-compaction is enabled.
1464    let config = match &state.session_config {
1465        Some(c) if c.auto_compaction.enabled => c,
1466        _ => return vec![],
1467    };
1468    let threshold = config.auto_compaction.threshold_ratio();
1469
1470    // Guard 2: we have usage data.
1471    let usage = match usage {
1472        Some(u) => u,
1473        None => return vec![],
1474    };
1475
1476    // Guard 3: context window utilization is above threshold (non-estimated).
1477    let cw = match derive_context_window_usage(
1478        usage.total_tokens,
1479        context_window_tokens,
1480        configured_max_output_tokens,
1481    ) {
1482        Some(cw) if !cw.estimated => cw,
1483        _ => return vec![],
1484    };
1485    let ratio = match cw.utilization_ratio {
1486        Some(r) if r >= threshold => r,
1487        _ => return vec![],
1488    };
1489    let _ = ratio; // used only in guard
1490
1491    // Guard 4: enough messages to compact.
1492    if state.message_graph.get_thread_messages().len() < MIN_MESSAGES_FOR_COMPACT {
1493        return vec![];
1494    }
1495
1496    // Guard 5: no queued work pending.
1497    if !state.queued_work.is_empty() {
1498        return vec![];
1499    }
1500
1501    // Guard 6: no active operation (operation was completed before this call).
1502    if state.has_active_operation() {
1503        return vec![];
1504    }
1505
1506    let op_id = crate::app::domain::types::OpId::new();
1507    let kind = OperationKind::Compact {
1508        trigger: CompactTrigger::Auto,
1509    };
1510    state.start_operation(op_id, kind.clone());
1511    state.operation_models.insert(op_id, model.clone());
1512
1513    vec![
1514        Effect::EmitEvent {
1515            session_id,
1516            event: SessionEvent::OperationStarted { op_id, kind },
1517        },
1518        Effect::RequestCompaction {
1519            session_id,
1520            op_id,
1521            model: model.clone(),
1522        },
1523    ]
1524}
1525
1526fn maybe_start_queued_work(
1527    state: &mut AppState,
1528    session_id: crate::app::domain::types::SessionId,
1529) -> Vec<Effect> {
1530    if state.has_active_operation() {
1531        return vec![];
1532    }
1533
1534    let Some(next) = state.pop_next_queued_work() else {
1535        return vec![];
1536    };
1537
1538    let mut effects = vec![Effect::EmitEvent {
1539        session_id,
1540        event: SessionEvent::QueueUpdated {
1541            queue: snapshot_queue(state),
1542        },
1543    }];
1544
1545    match next {
1546        QueuedWorkItem::UserMessage(item) => {
1547            effects.extend(handle_user_input(
1548                state,
1549                session_id,
1550                item.content,
1551                item.op_id,
1552                item.message_id,
1553                item.model,
1554                item.queued_at,
1555            ));
1556        }
1557        QueuedWorkItem::DirectBash(item) => {
1558            effects.extend(handle_direct_bash(
1559                state,
1560                session_id,
1561                item.op_id,
1562                item.message_id,
1563                item.command,
1564                item.queued_at,
1565            ));
1566        }
1567    }
1568
1569    effects
1570}
1571
1572fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1573    state
1574        .queued_work
1575        .iter()
1576        .map(snapshot_queued_work_item)
1577        .collect()
1578}
1579
1580fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1581    match item {
1582        QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1583            kind: Some(QueuedWorkKind::UserMessage),
1584            content: message
1585                .content
1586                .iter()
1587                .filter_map(|item| match item {
1588                    UserContent::Text { text } => Some(text.as_str()),
1589                    _ => None,
1590                })
1591                .collect::<String>(),
1592            queued_at: message.queued_at,
1593            model: Some(message.model.clone()),
1594            op_id: message.op_id,
1595            message_id: message.message_id.clone(),
1596            attachment_count: message
1597                .content
1598                .iter()
1599                .filter(|item| matches!(item, UserContent::Image { .. }))
1600                .count() as u32,
1601        },
1602        QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1603            kind: Some(QueuedWorkKind::DirectBash),
1604            content: command.command.clone(),
1605            queued_at: command.queued_at,
1606            model: None,
1607            op_id: command.op_id,
1608            message_id: command.message_id.clone(),
1609            attachment_count: 0,
1610        },
1611    }
1612}
1613
1614fn handle_request_compaction(
1615    state: &mut AppState,
1616    session_id: crate::app::domain::types::SessionId,
1617    op_id: crate::app::domain::types::OpId,
1618    model: crate::config::model::ModelId,
1619) -> Result<Vec<Effect>, ReduceError> {
1620    let message_count = state.message_graph.get_thread_messages().len();
1621
1622    if state.has_active_operation() {
1623        return Err(invalid_action(
1624            InvalidActionKind::OperationInFlight,
1625            "Cannot compact while an operation is active.",
1626        ));
1627    }
1628
1629    if message_count < MIN_MESSAGES_FOR_COMPACT {
1630        return Ok(vec![Effect::EmitEvent {
1631            session_id,
1632            event: SessionEvent::CompactResult {
1633                result: crate::app::domain::event::CompactResult::InsufficientMessages,
1634                trigger: CompactTrigger::Manual,
1635            },
1636        }]);
1637    }
1638
1639    let kind = OperationKind::Compact {
1640        trigger: CompactTrigger::Manual,
1641    };
1642    state.start_operation(op_id, kind.clone());
1643    state.operation_models.insert(op_id, model.clone());
1644
1645    Ok(vec![
1646        Effect::EmitEvent {
1647            session_id,
1648            event: SessionEvent::OperationStarted { op_id, kind },
1649        },
1650        Effect::RequestCompaction {
1651            session_id,
1652            op_id,
1653            model,
1654        },
1655    ])
1656}
1657
1658fn handle_cancel(
1659    state: &mut AppState,
1660    session_id: crate::app::domain::types::SessionId,
1661    target_op: Option<crate::app::domain::types::OpId>,
1662) -> Vec<Effect> {
1663    let mut effects = Vec::new();
1664
1665    let op = match &state.current_operation {
1666        Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
1667        _ => return effects,
1668    };
1669
1670    state.record_cancelled_op(op.op_id);
1671
1672    if let OperationKind::Compact { trigger } = op.kind {
1673        effects.push(Effect::EmitEvent {
1674            session_id,
1675            event: SessionEvent::CompactResult {
1676                result: crate::app::domain::event::CompactResult::Cancelled,
1677                trigger,
1678            },
1679        });
1680    }
1681
1682    let pending_approval = state.pending_approval.take();
1683    let queued_approvals = std::mem::take(&mut state.approval_queue);
1684
1685    if matches!(op.kind, OperationKind::AgentLoop) {
1686        if let Some(pending) = pending_approval {
1687            let tool_name = pending.tool_call.name.clone();
1688            effects.extend(emit_tool_failure_message(
1689                state,
1690                session_id,
1691                &pending.tool_call.id,
1692                &tool_name,
1693                ToolError::Cancelled(tool_name.clone()),
1694                format!("Tool '{tool_name}' cancelled"),
1695                "cancelled",
1696            ));
1697        }
1698
1699        for queued in queued_approvals {
1700            let tool_name = queued.tool_call.name.clone();
1701            effects.extend(emit_tool_failure_message(
1702                state,
1703                session_id,
1704                &queued.tool_call.id,
1705                &tool_name,
1706                ToolError::Cancelled(tool_name.clone()),
1707                format!("Tool '{tool_name}' cancelled"),
1708                "cancelled",
1709            ));
1710        }
1711
1712        for tool_call_id in &op.pending_tool_calls {
1713            let tool_name = state
1714                .message_graph
1715                .find_tool_name_by_id(tool_call_id.as_str())
1716                .unwrap_or_else(|| tool_call_id.as_str().to_string());
1717            let event_error = if tool_name == tool_call_id.as_str() {
1718                format!("Tool call '{tool_call_id}' cancelled")
1719            } else {
1720                format!("Tool '{tool_name}' cancelled")
1721            };
1722            effects.extend(emit_tool_failure_message(
1723                state,
1724                session_id,
1725                tool_call_id.as_str(),
1726                &tool_name,
1727                ToolError::Cancelled(tool_name.clone()),
1728                event_error,
1729                "cancelled",
1730            ));
1731        }
1732    }
1733    state.active_streams.remove(&op.op_id);
1734
1735    let dequeued_item = state.pop_next_queued_work();
1736    let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);
1737
1738    effects.push(Effect::EmitEvent {
1739        session_id,
1740        event: SessionEvent::OperationCancelled {
1741            op_id: op.op_id,
1742            info: CancellationInfo {
1743                pending_tool_calls: op.pending_tool_calls.len(),
1744                popped_queued_item,
1745            },
1746        },
1747    });
1748
1749    effects.push(Effect::CancelOperation {
1750        session_id,
1751        op_id: op.op_id,
1752    });
1753
1754    state.complete_operation(op.op_id);
1755
1756    if dequeued_item.is_some() {
1757        effects.push(Effect::EmitEvent {
1758            session_id,
1759            event: SessionEvent::QueueUpdated {
1760                queue: snapshot_queue(state),
1761            },
1762        });
1763    }
1764
1765    effects
1766}
1767
1768fn handle_hydrate(
1769    state: &mut AppState,
1770    session_id: crate::app::domain::types::SessionId,
1771    events: Vec<SessionEvent>,
1772    starting_sequence: u64,
1773) -> Vec<Effect> {
1774    for event in events {
1775        apply_event_to_state(state, &event);
1776    }
1777
1778    state.event_sequence = starting_sequence;
1779
1780    emit_mcp_connect_effects(state, session_id)
1781}
1782
1783fn handle_switch_primary_agent(
1784    state: &mut AppState,
1785    session_id: crate::app::domain::types::SessionId,
1786    agent_id: String,
1787) -> Result<Vec<Effect>, ReduceError> {
1788    if state.current_operation.is_some() {
1789        return Err(invalid_action(
1790            InvalidActionKind::OperationInFlight,
1791            "Cannot switch primary agent while an operation is active.",
1792        ));
1793    }
1794
1795    let Some(base_config) = state
1796        .base_session_config
1797        .as_ref()
1798        .or(state.session_config.as_ref())
1799    else {
1800        return Err(invalid_action(
1801            InvalidActionKind::MissingSessionConfig,
1802            "Cannot switch primary agent without session config.",
1803        ));
1804    };
1805
1806    let Some(_spec) = primary_agent_spec(&agent_id) else {
1807        return Err(invalid_action(
1808            InvalidActionKind::UnknownPrimaryAgent,
1809            format!("Unknown primary agent '{agent_id}'."),
1810        ));
1811    };
1812
1813    let mut updated_config = base_config.clone();
1814    updated_config.primary_agent_id = Some(agent_id.clone());
1815    let new_config = resolve_effective_config(&updated_config);
1816    let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1817
1818    apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1819
1820    let mut effects = Vec::new();
1821    effects.push(Effect::EmitEvent {
1822        session_id,
1823        event: SessionEvent::SessionConfigUpdated {
1824            config: Box::new(new_config),
1825            primary_agent_id: agent_id,
1826        },
1827    });
1828    effects.extend(backend_effects);
1829    effects.push(Effect::ReloadToolSchemas { session_id });
1830
1831    Ok(effects)
1832}
1833
1834fn apply_session_config_state(
1835    state: &mut AppState,
1836    config: &crate::session::state::SessionConfig,
1837    primary_agent_id: Option<String>,
1838    update_base: bool,
1839) {
1840    state.apply_session_config(config, primary_agent_id, update_base);
1841}
1842
1843fn mcp_backend_diff_effects(
1844    session_id: crate::app::domain::types::SessionId,
1845    old_config: &crate::session::state::SessionConfig,
1846    new_config: &crate::session::state::SessionConfig,
1847) -> Vec<Effect> {
1848    let old_map = collect_mcp_backends(old_config);
1849    let new_map = collect_mcp_backends(new_config);
1850
1851    let mut effects = Vec::new();
1852
1853    for (server_name, (old_transport, old_filter)) in &old_map {
1854        match new_map.get(server_name) {
1855            None => {
1856                effects.push(Effect::DisconnectMcpServer {
1857                    session_id,
1858                    server_name: server_name.clone(),
1859                });
1860            }
1861            Some((new_transport, new_filter)) => {
1862                if new_transport != old_transport || new_filter != old_filter {
1863                    effects.push(Effect::DisconnectMcpServer {
1864                        session_id,
1865                        server_name: server_name.clone(),
1866                    });
1867                    effects.push(Effect::ConnectMcpServer {
1868                        session_id,
1869                        config: McpServerConfig {
1870                            server_name: server_name.clone(),
1871                            transport: new_transport.clone(),
1872                            tool_filter: new_filter.clone(),
1873                        },
1874                    });
1875                }
1876            }
1877        }
1878    }
1879
1880    for (server_name, (new_transport, new_filter)) in &new_map {
1881        if !old_map.contains_key(server_name) {
1882            effects.push(Effect::ConnectMcpServer {
1883                session_id,
1884                config: McpServerConfig {
1885                    server_name: server_name.clone(),
1886                    transport: new_transport.clone(),
1887                    tool_filter: new_filter.clone(),
1888                },
1889            });
1890        }
1891    }
1892
1893    effects
1894}
1895
1896fn collect_mcp_backends(
1897    config: &crate::session::state::SessionConfig,
1898) -> std::collections::HashMap<
1899    String,
1900    (
1901        crate::tools::McpTransport,
1902        crate::session::state::ToolFilter,
1903    ),
1904> {
1905    let mut map = std::collections::HashMap::new();
1906
1907    for backend_config in &config.tool_config.backends {
1908        let BackendConfig::Mcp {
1909            server_name,
1910            transport,
1911            tool_filter,
1912        } = backend_config;
1913
1914        map.insert(
1915            server_name.clone(),
1916            (transport.clone(), tool_filter.clone()),
1917        );
1918    }
1919
1920    map
1921}
1922
1923pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1924    match event {
1925        SessionEvent::SessionCreated { config, .. } => {
1926            let primary_agent_id = config
1927                .primary_agent_id
1928                .clone()
1929                .unwrap_or_else(|| default_primary_agent_id().to_string());
1930            apply_session_config_state(state, config, Some(primary_agent_id), true);
1931        }
1932        SessionEvent::SessionConfigUpdated {
1933            config,
1934            primary_agent_id,
1935        } => {
1936            apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1937        }
1938        SessionEvent::AssistantMessageAdded { message, .. }
1939        | SessionEvent::UserMessageAdded { message }
1940        | SessionEvent::ToolMessageAdded { message } => {
1941            state.message_graph.add_message(message.clone());
1942            state.message_graph.active_message_id = Some(message.id().to_string());
1943        }
1944        SessionEvent::MessageUpdated { message } => {
1945            state.message_graph.replace_message(message.clone());
1946        }
1947        SessionEvent::ApprovalDecided {
1948            decision, remember, ..
1949        } => {
1950            if *decision == ApprovalDecision::Approved
1951                && let Some(memory) = remember
1952            {
1953                match memory {
1954                    ApprovalMemory::Tool(name) => {
1955                        state.approved_tools.insert(name.clone());
1956                    }
1957                    ApprovalMemory::BashPattern(pattern) => {
1958                        state.approved_bash_patterns.insert(pattern.clone());
1959                    }
1960                    ApprovalMemory::PendingTool => {}
1961                }
1962            }
1963            state.pending_approval = None;
1964        }
1965        SessionEvent::OperationCompleted { op_id } => {
1966            state.complete_operation(*op_id);
1967        }
1968        SessionEvent::OperationCancelled { op_id, .. } => {
1969            state.record_cancelled_op(*op_id);
1970            state.complete_operation(*op_id);
1971        }
1972        SessionEvent::LlmUsageUpdated {
1973            op_id,
1974            model,
1975            usage,
1976            context_window,
1977        } => {
1978            state.record_llm_usage(*op_id, model.clone(), *usage, context_window.clone());
1979        }
1980        SessionEvent::McpServerStateChanged {
1981            server_name,
1982            state: mcp_state,
1983        } => {
1984            state
1985                .mcp_servers
1986                .insert(server_name.clone(), mcp_state.clone());
1987        }
1988        SessionEvent::QueueUpdated { queue } => {
1989            let parse_content = |content: &str| {
1990                if content.trim().is_empty() {
1991                    None
1992                } else {
1993                    Some(vec![UserContent::Text {
1994                        text: content.to_string(),
1995                    }])
1996                }
1997            };
1998
1999            state.queued_work = queue
2000                .iter()
2001                .filter_map(|item| match item.kind {
2002                    Some(QueuedWorkKind::UserMessage) => {
2003                        let content = parse_content(item.content.as_str())?;
2004                        Some(QueuedWorkItem::UserMessage(
2005                            crate::app::domain::state::QueuedUserMessage {
2006                                content,
2007                                op_id: item.op_id,
2008                                message_id: item.message_id.clone(),
2009                                model: item.model.clone().unwrap_or_else(
2010                                    crate::config::model::builtin::claude_sonnet_4_5,
2011                                ),
2012                                queued_at: item.queued_at,
2013                            },
2014                        ))
2015                    }
2016                    Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
2017                        crate::app::domain::state::QueuedBashCommand {
2018                            command: item.content.clone(),
2019                            op_id: item.op_id,
2020                            message_id: item.message_id.clone(),
2021                            queued_at: item.queued_at,
2022                        },
2023                    )),
2024                    None => {
2025                        let content = parse_content(item.content.as_str())?;
2026                        Some(QueuedWorkItem::UserMessage(
2027                            crate::app::domain::state::QueuedUserMessage {
2028                                content,
2029                                op_id: item.op_id,
2030                                message_id: item.message_id.clone(),
2031                                model: item.model.clone().unwrap_or_else(
2032                                    crate::config::model::builtin::claude_sonnet_4_5,
2033                                ),
2034                                queued_at: item.queued_at,
2035                            },
2036                        ))
2037                    }
2038                })
2039                .collect();
2040        }
2041        SessionEvent::ConversationCompacted { record } => {
2042            state
2043                .compaction_summary_ids
2044                .insert(record.summary_message_id.to_string());
2045            state
2046                .message_graph
2047                .mark_compaction_summary(record.summary_message_id.to_string());
2048        }
2049        _ => {}
2050    }
2051
2052    state.event_sequence += 1;
2053}
2054
2055fn maybe_continue_after_compaction(
2056    state: &mut AppState,
2057    session_id: crate::app::domain::types::SessionId,
2058    trigger: CompactTrigger,
2059    model: &crate::config::model::ModelId,
2060) -> Vec<Effect> {
2061    if trigger != CompactTrigger::Auto || state.has_active_operation() {
2062        return vec![];
2063    }
2064
2065    let op_id = crate::app::domain::types::OpId::new();
2066    state.start_operation(op_id, OperationKind::AgentLoop);
2067    state.operation_models.insert(op_id, model.clone());
2068
2069    let mut messages: Vec<Message> = state
2070        .message_graph
2071        .get_thread_messages()
2072        .into_iter()
2073        .cloned()
2074        .collect();
2075    messages.push(Message {
2076        data: MessageData::User {
2077            content: vec![UserContent::Text {
2078                text: COMPACTION_CONTINUE_PROMPT.to_string(),
2079            }],
2080        },
2081        timestamp: Message::current_timestamp(),
2082        id: format!("compact_continue_{op_id}"),
2083        parent_message_id: state.message_graph.active_message_id.clone(),
2084    });
2085
2086    vec![
2087        Effect::EmitEvent {
2088            session_id,
2089            event: SessionEvent::OperationStarted {
2090                op_id,
2091                kind: OperationKind::AgentLoop,
2092            },
2093        },
2094        Effect::CallModel {
2095            session_id,
2096            op_id,
2097            model: model.clone(),
2098            messages,
2099            system_context: state.cached_system_context.clone(),
2100            tools: state.tools.clone(),
2101        },
2102    ]
2103}
2104
/// Inputs to `handle_compaction_complete`, bundled into one struct so the handler
/// takes a single parameter.
struct CompactionCompleteParams {
    /// Operation that ran the compaction; used to look up its model and to complete it.
    op_id: crate::app::domain::types::OpId,
    /// Identifier stored in the resulting `CompactionRecord`.
    compaction_id: crate::app::domain::types::CompactionId,
    /// Id given to the assistant message that carries the summary.
    summary_message_id: crate::app::domain::types::MessageId,
    /// Summary text; becomes the content of the summary message and of the success result.
    summary: String,
    /// Forwarded unchanged to the `CompactionRecord`.
    compacted_head_message_id: crate::app::domain::types::MessageId,
    /// Forwarded unchanged to the `CompactionRecord`.
    previous_active_message_id: Option<crate::app::domain::types::MessageId>,
    /// Forwarded unchanged to the `CompactionRecord`.
    model_name: String,
    /// Timestamp applied to both the summary message and the `CompactionRecord`.
    timestamp: u64,
}
2115
2116fn handle_compaction_complete(
2117    state: &mut AppState,
2118    session_id: crate::app::domain::types::SessionId,
2119    params: CompactionCompleteParams,
2120) -> Vec<Effect> {
2121    use crate::app::conversation::{AssistantContent, Message, MessageData};
2122    use crate::app::domain::types::CompactionRecord;
2123
2124    let CompactionCompleteParams {
2125        op_id,
2126        compaction_id,
2127        summary_message_id,
2128        summary,
2129        compacted_head_message_id,
2130        previous_active_message_id,
2131        model_name,
2132        timestamp,
2133    } = params;
2134
2135    let summary_message = Message {
2136        data: MessageData::Assistant {
2137            content: vec![AssistantContent::Text {
2138                text: summary.clone(),
2139            }],
2140        },
2141        id: summary_message_id.to_string(),
2142        parent_message_id: None,
2143        timestamp,
2144    };
2145
2146    state.message_graph.add_message(summary_message.clone());
2147
2148    // Mark the summary so get_active_thread() stops here (LLM won't see older messages).
2149    state
2150        .compaction_summary_ids
2151        .insert(summary_message_id.to_string());
2152    state
2153        .message_graph
2154        .mark_compaction_summary(summary_message_id.to_string());
2155
2156    let record = CompactionRecord::with_timestamp(
2157        compaction_id,
2158        summary_message_id,
2159        compacted_head_message_id,
2160        previous_active_message_id,
2161        model_name,
2162        timestamp,
2163    );
2164
2165    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
2166        model
2167    } else {
2168        state.complete_operation(op_id);
2169        return vec![Effect::EmitEvent {
2170            session_id,
2171            event: SessionEvent::Error {
2172                message: format!("Missing model for compaction operation {op_id}"),
2173            },
2174        }];
2175    };
2176
2177    let trigger = state
2178        .current_operation
2179        .as_ref()
2180        .and_then(|op| match op.kind {
2181            OperationKind::Compact { trigger } => Some(trigger),
2182            _ => None,
2183        })
2184        .unwrap_or(CompactTrigger::Manual);
2185
2186    state.complete_operation(op_id);
2187
2188    let mut effects = vec![
2189        Effect::EmitEvent {
2190            session_id,
2191            event: SessionEvent::AssistantMessageAdded {
2192                message: summary_message,
2193                model: model.clone(),
2194            },
2195        },
2196        Effect::EmitEvent {
2197            session_id,
2198            event: SessionEvent::CompactResult {
2199                result: crate::app::domain::event::CompactResult::Success(summary),
2200                trigger,
2201            },
2202        },
2203        Effect::EmitEvent {
2204            session_id,
2205            event: SessionEvent::ConversationCompacted { record },
2206        },
2207        Effect::EmitEvent {
2208            session_id,
2209            event: SessionEvent::OperationCompleted { op_id },
2210        },
2211    ];
2212
2213    effects.extend(maybe_start_queued_work(state, session_id));
2214    effects.extend(maybe_continue_after_compaction(
2215        state, session_id, trigger, &model,
2216    ));
2217
2218    effects
2219}
2220
2221fn handle_compaction_failed(
2222    state: &mut AppState,
2223    session_id: crate::app::domain::types::SessionId,
2224    op_id: crate::app::domain::types::OpId,
2225    error: String,
2226) -> Vec<Effect> {
2227    let trigger = state
2228        .current_operation
2229        .as_ref()
2230        .and_then(|op| match op.kind {
2231            OperationKind::Compact { trigger } => Some(trigger),
2232            _ => None,
2233        })
2234        .unwrap_or(CompactTrigger::Manual);
2235
2236    state.complete_operation(op_id);
2237
2238    let mut effects = vec![
2239        Effect::EmitEvent {
2240            session_id,
2241            event: SessionEvent::CompactResult {
2242                result: crate::app::domain::event::CompactResult::Failed(error),
2243                trigger,
2244            },
2245        },
2246        Effect::EmitEvent {
2247            session_id,
2248            event: SessionEvent::OperationCompleted { op_id },
2249        },
2250    ];
2251
2252    effects.extend(maybe_start_queued_work(state, session_id));
2253
2254    effects
2255}
2256
2257fn emit_mcp_connect_effects(
2258    state: &AppState,
2259    session_id: crate::app::domain::types::SessionId,
2260) -> Vec<Effect> {
2261    let mut effects = Vec::new();
2262
2263    let Some(ref config) = state.session_config else {
2264        return effects;
2265    };
2266
2267    for backend_config in &config.tool_config.backends {
2268        let BackendConfig::Mcp {
2269            server_name,
2270            transport,
2271            tool_filter,
2272        } = backend_config;
2273
2274        let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
2275            matches!(
2276                s,
2277                McpServerState::Connecting | McpServerState::Connected { .. }
2278            )
2279        });
2280
2281        if !already_connected {
2282            effects.push(Effect::ConnectMcpServer {
2283                session_id,
2284                config: McpServerConfig {
2285                    server_name: server_name.clone(),
2286                    transport: transport.clone(),
2287                    tool_filter: tool_filter.clone(),
2288                },
2289            });
2290        }
2291    }
2292
2293    effects
2294}
2295
2296#[cfg(test)]
2297mod tests {
2298    use super::*;
2299    use crate::api::provider::TokenUsage;
2300    use crate::app::domain::event::ContextWindowUsage;
2301    use crate::app::domain::state::{OperationState, PendingApproval};
2302    use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId, ToolCallId};
2303    use crate::config::model::builtin;
2304    use crate::primary_agents::resolve_effective_config;
2305    use crate::session::state::{
2306        ApprovalRules, SessionConfig, SessionPolicyOverrides, ToolApprovalPolicy, ToolVisibility,
2307        UnapprovedBehavior,
2308    };
2309    use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2310    use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2311    use schemars::schema_for;
2312    use serde_json::json;
2313    use std::collections::HashSet;
2314    use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2315
2316    fn test_state() -> AppState {
2317        AppState::new(SessionId::new())
2318    }
2319
2320    fn test_schema(name: &str) -> ToolSchema {
2321        ToolSchema {
2322            name: name.to_string(),
2323            display_name: name.to_string(),
2324            description: String::new(),
2325            input_schema: InputSchema::empty_object(),
2326        }
2327    }
2328
2329    fn base_session_config() -> SessionConfig {
2330        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2331        config.primary_agent_id = Some("normal".to_string());
2332        config.policy_overrides = SessionPolicyOverrides::empty();
2333        resolve_effective_config(&config)
2334    }
2335
2336    fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2337        super::reduce(state, action).expect("reduce failed")
2338    }
2339
2340    #[test]
2341    fn test_user_input_starts_operation() {
2342        let mut state = test_state();
2343        let session_id = state.session_id;
2344        let op_id = OpId::new();
2345        let message_id = MessageId::new();
2346        let model = builtin::claude_sonnet_4_5();
2347
2348        let effects = reduce(
2349            &mut state,
2350            Action::UserInput {
2351                session_id,
2352                content: vec![UserContent::Text {
2353                    text: "Hello".to_string(),
2354                }],
2355                op_id,
2356                message_id,
2357                model,
2358                timestamp: 1_234_567_890,
2359            },
2360        );
2361
2362        assert_eq!(state.message_graph.messages.len(), 1);
2363        assert!(state.current_operation.is_some());
2364        assert!(
2365            effects
2366                .iter()
2367                .any(|e| matches!(e, Effect::CallModel { .. }))
2368        );
2369    }
2370
2371    #[test]
2372    fn test_switch_primary_agent_updates_visibility() {
2373        let mut state = test_state();
2374        let session_id = state.session_id;
2375        let config = base_session_config();
2376        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2377
2378        let effects = reduce(
2379            &mut state,
2380            Action::SwitchPrimaryAgent {
2381                session_id,
2382                agent_id: "plan".to_string(),
2383            },
2384        );
2385
2386        let updated = state.session_config.as_ref().expect("config");
2387        match &updated.tool_config.visibility {
2388            ToolVisibility::Whitelist(allowed) => {
2389                assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2390                for name in READ_ONLY_TOOL_NAMES {
2391                    assert!(allowed.contains(*name));
2392                }
2393                assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2394            }
2395            other => panic!("Unexpected tool visibility: {other:?}"),
2396        }
2397        assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2398        assert!(effects.iter().any(|e| matches!(
2399            e,
2400            Effect::EmitEvent {
2401                event: SessionEvent::SessionConfigUpdated { .. },
2402                ..
2403            }
2404        )));
2405        assert!(
2406            effects
2407                .iter()
2408                .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2409        );
2410    }
2411
2412    #[test]
2413    fn test_switch_primary_agent_yolo_auto_approves() {
2414        let mut state = test_state();
2415        let session_id = state.session_id;
2416        let config = base_session_config();
2417        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2418
2419        let _ = reduce(
2420            &mut state,
2421            Action::SwitchPrimaryAgent {
2422                session_id,
2423                agent_id: "yolo".to_string(),
2424            },
2425        );
2426
2427        let updated = state.session_config.as_ref().expect("config");
2428        assert_eq!(
2429            updated.tool_config.approval_policy.default_behavior,
2430            UnapprovedBehavior::Allow
2431        );
2432    }
2433
2434    #[test]
2435    fn dispatch_agent_resume_is_auto_approved() {
2436        let mut state = test_state();
2437        let session_id = state.session_id;
2438        let config = base_session_config();
2439        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2440
2441        let tool_call = ToolCall {
2442            id: "tc_dispatch_resume".to_string(),
2443            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2444            parameters: json!({
2445                "prompt": "resume work",
2446                "target": {
2447                    "session": "resume",
2448                    "session_id": SessionId::new().to_string()
2449                }
2450            }),
2451        };
2452
2453        let decision = get_tool_decision(&state, &tool_call);
2454        assert_eq!(decision, ToolDecision::Allow);
2455        assert_eq!(state.session_id, session_id);
2456    }
2457
2458    #[test]
2459    fn test_switch_primary_agent_restores_base_prompt() {
2460        let mut state = test_state();
2461        let session_id = state.session_id;
2462        let mut config = base_session_config();
2463        config.system_prompt = Some("base prompt".to_string());
2464        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2465
2466        let _ = reduce(
2467            &mut state,
2468            Action::SwitchPrimaryAgent {
2469                session_id,
2470                agent_id: "plan".to_string(),
2471            },
2472        );
2473
2474        let _ = reduce(
2475            &mut state,
2476            Action::SwitchPrimaryAgent {
2477                session_id,
2478                agent_id: "normal".to_string(),
2479            },
2480        );
2481
2482        let updated = state.session_config.as_ref().expect("config");
2483        assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2484    }
2485
2486    #[test]
2487    fn test_switch_primary_agent_blocked_during_operation() {
2488        let mut state = test_state();
2489        let session_id = state.session_id;
2490        let config = base_session_config();
2491        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2492
2493        state.current_operation = Some(OperationState {
2494            op_id: OpId::new(),
2495            kind: OperationKind::AgentLoop,
2496            pending_tool_calls: HashSet::new(),
2497        });
2498
2499        let result = super::reduce(
2500            &mut state,
2501            Action::SwitchPrimaryAgent {
2502                session_id,
2503                agent_id: "plan".to_string(),
2504            },
2505        );
2506
2507        assert!(matches!(
2508            result,
2509            Err(ReduceError::InvalidAction {
2510                kind: InvalidActionKind::OperationInFlight,
2511                ..
2512            })
2513        ));
2514        assert!(state.primary_agent_id.as_deref() == Some("normal"));
2515    }
2516
2517    #[test]
2518    fn test_late_result_ignored_after_cancel() {
2519        let mut state = test_state();
2520        let session_id = state.session_id;
2521        let op_id = OpId::new();
2522        let tool_call_id = ToolCallId::from_string("tc_1");
2523
2524        state.current_operation = Some(OperationState {
2525            op_id,
2526            kind: OperationKind::AgentLoop,
2527            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2528        });
2529
2530        let _ = reduce(
2531            &mut state,
2532            Action::Cancel {
2533                session_id,
2534                op_id: None,
2535            },
2536        );
2537
2538        state.current_operation = Some(OperationState {
2539            op_id,
2540            kind: OperationKind::AgentLoop,
2541            pending_tool_calls: HashSet::new(),
2542        });
2543        state
2544            .operation_models
2545            .insert(op_id, builtin::claude_sonnet_4_5());
2546        state
2547            .operation_models
2548            .insert(op_id, builtin::claude_sonnet_4_5());
2549        state
2550            .operation_models
2551            .insert(op_id, builtin::claude_sonnet_4_5());
2552
2553        let effects = reduce(
2554            &mut state,
2555            Action::ToolResult {
2556                session_id,
2557                tool_call_id,
2558                tool_name: "test".to_string(),
2559                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2560                    tool_name: "test".to_string(),
2561                    payload: "done".to_string(),
2562                })),
2563            },
2564        );
2565
2566        assert!(effects.is_empty());
2567    }
2568
2569    #[test]
2570    fn test_pre_approved_tool_executes_immediately() {
2571        let mut state = test_state();
2572        let session_id = state.session_id;
2573        let op_id = OpId::new();
2574
2575        state.approved_tools.insert("test_tool".to_string());
2576        state.current_operation = Some(OperationState {
2577            op_id,
2578            kind: OperationKind::AgentLoop,
2579            pending_tool_calls: HashSet::new(),
2580        });
2581        state
2582            .operation_models
2583            .insert(op_id, builtin::claude_sonnet_4_5());
2584        state
2585            .operation_models
2586            .insert(op_id, builtin::claude_sonnet_4_5());
2587        state
2588            .operation_models
2589            .insert(op_id, builtin::claude_sonnet_4_5());
2590
2591        let tool_call = steer_tools::ToolCall {
2592            id: "tc_1".to_string(),
2593            name: "test_tool".to_string(),
2594            parameters: serde_json::json!({}),
2595        };
2596
2597        let effects = reduce(
2598            &mut state,
2599            Action::ToolApprovalRequested {
2600                session_id,
2601                request_id: RequestId::new(),
2602                tool_call,
2603            },
2604        );
2605
2606        assert!(
2607            effects
2608                .iter()
2609                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2610        );
2611        assert!(state.pending_approval.is_none());
2612    }
2613
2614    #[test]
2615    fn test_denied_tool_request_emits_failure_message() {
2616        let mut state = test_state();
2617        let session_id = state.session_id;
2618        let op_id = OpId::new();
2619
2620        state.current_operation = Some(OperationState {
2621            op_id,
2622            kind: OperationKind::AgentLoop,
2623            pending_tool_calls: HashSet::new(),
2624        });
2625
2626        state
2627            .operation_models
2628            .insert(op_id, builtin::claude_sonnet_4_5());
2629
2630        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2631        config.tool_config.approval_policy = ToolApprovalPolicy {
2632            default_behavior: UnapprovedBehavior::Deny,
2633            preapproved: ApprovalRules::default(),
2634        };
2635        state.session_config = Some(config);
2636
2637        let tool_call = steer_tools::ToolCall {
2638            id: "tc_1".to_string(),
2639            name: "test_tool".to_string(),
2640            parameters: serde_json::json!({}),
2641        };
2642
2643        let effects = reduce(
2644            &mut state,
2645            Action::ToolApprovalRequested {
2646                session_id,
2647                request_id: RequestId::new(),
2648                tool_call,
2649            },
2650        );
2651
2652        assert!(effects.iter().any(|e| matches!(
2653            e,
2654            Effect::EmitEvent {
2655                event: SessionEvent::ToolCallFailed { .. },
2656                ..
2657            }
2658        )));
2659        assert!(effects.iter().any(|e| matches!(
2660            e,
2661            Effect::EmitEvent {
2662                event: SessionEvent::ToolMessageAdded { .. },
2663                ..
2664            }
2665        )));
2666        assert!(
2667            !effects
2668                .iter()
2669                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2670        );
2671        assert!(
2672            !effects
2673                .iter()
2674                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2675        );
2676        assert!(state.pending_approval.is_none());
2677        assert!(state.approval_queue.is_empty());
2678        assert_eq!(state.message_graph.messages.len(), 1);
2679
2680        match &state.message_graph.messages[0].data {
2681            MessageData::Tool { result, .. } => match result {
2682                ToolResult::Error(error) => {
2683                    assert!(
2684                        matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2685                    );
2686                }
2687                _ => panic!("expected denied tool error"),
2688            },
2689            _ => panic!("expected tool message"),
2690        }
2691    }
2692
2693    #[test]
2694    fn test_user_denied_tool_request_emits_failure_message() {
2695        let mut state = test_state();
2696        let session_id = state.session_id;
2697        let op_id = OpId::new();
2698
2699        state.current_operation = Some(OperationState {
2700            op_id,
2701            kind: OperationKind::AgentLoop,
2702            pending_tool_calls: HashSet::new(),
2703        });
2704        state
2705            .operation_models
2706            .insert(op_id, builtin::claude_sonnet_4_5());
2707
2708        let tool_call = steer_tools::ToolCall {
2709            id: "tc_1".to_string(),
2710            name: "test_tool".to_string(),
2711            parameters: serde_json::json!({}),
2712        };
2713        let request_id = RequestId::new();
2714        state.pending_approval = Some(PendingApproval {
2715            request_id,
2716            tool_call: tool_call.clone(),
2717        });
2718
2719        let effects = reduce(
2720            &mut state,
2721            Action::ToolApprovalDecided {
2722                session_id,
2723                request_id,
2724                decision: ApprovalDecision::Denied,
2725                remember: None,
2726            },
2727        );
2728
2729        assert!(effects.iter().any(|e| matches!(
2730            e,
2731            Effect::EmitEvent {
2732                event: SessionEvent::ToolCallFailed { .. },
2733                ..
2734            }
2735        )));
2736        assert!(effects.iter().any(|e| matches!(
2737            e,
2738            Effect::EmitEvent {
2739                event: SessionEvent::ToolMessageAdded { .. },
2740                ..
2741            }
2742        )));
2743        assert!(
2744            !effects
2745                .iter()
2746                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2747        );
2748        assert!(state.pending_approval.is_none());
2749        assert!(state.approval_queue.is_empty());
2750        assert_eq!(state.message_graph.messages.len(), 1);
2751
2752        match &state.message_graph.messages[0].data {
2753            MessageData::Tool { result, .. } => match result {
2754                ToolResult::Error(error) => {
2755                    assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2756                }
2757                _ => panic!("expected denied tool error"),
2758            },
2759            _ => panic!("expected tool message"),
2760        }
2761    }
2762
2763    #[test]
2764    fn test_cancel_pops_queued_item_without_auto_start() {
2765        let mut state = test_state();
2766        let session_id = state.session_id;
2767        let op_id = OpId::new();
2768
2769        state.current_operation = Some(OperationState {
2770            op_id,
2771            kind: OperationKind::AgentLoop,
2772            pending_tool_calls: HashSet::new(),
2773        });
2774        state
2775            .operation_models
2776            .insert(op_id, builtin::claude_sonnet_4_5());
2777
2778        let queued_op = OpId::new();
2779        let queued_message_id = MessageId::from_string("queued_msg");
2780        let _ = reduce(
2781            &mut state,
2782            Action::UserInput {
2783                session_id,
2784                content: vec![UserContent::Text {
2785                    text: "Queued message".to_string(),
2786                }],
2787                op_id: queued_op,
2788                message_id: queued_message_id.clone(),
2789                model: builtin::claude_sonnet_4_5(),
2790                timestamp: 1,
2791            },
2792        );
2793
2794        let effects = reduce(
2795            &mut state,
2796            Action::Cancel {
2797                session_id,
2798                op_id: None,
2799            },
2800        );
2801
2802        assert!(state.current_operation.is_none());
2803        assert!(state.queued_work.is_empty());
2804
2805        let cancellation_info = effects.iter().find_map(|effect| match effect {
2806            Effect::EmitEvent {
2807                event: SessionEvent::OperationCancelled { info, .. },
2808                ..
2809            } => Some(info),
2810            _ => None,
2811        });
2812        let info = cancellation_info.expect("expected OperationCancelled event");
2813        let popped = info
2814            .popped_queued_item
2815            .as_ref()
2816            .expect("expected popped queued item");
2817        assert_eq!(popped.content, "Queued message");
2818        assert_eq!(popped.op_id, queued_op);
2819        assert_eq!(popped.message_id, queued_message_id);
2820
2821        assert!(
2822            !effects.iter().any(|effect| matches!(
2823                effect,
2824                Effect::EmitEvent {
2825                    event: SessionEvent::OperationStarted { .. },
2826                    ..
2827                }
2828            )),
2829            "queued work should not auto-start on cancel"
2830        );
2831    }
2832
2833    #[test]
2834    fn test_cancel_injects_tool_results_for_pending_calls() {
2835        let mut state = test_state();
2836        let session_id = state.session_id;
2837        let op_id = OpId::new();
2838
2839        let tool_call = ToolCall {
2840            id: "tc_1".to_string(),
2841            name: "test_tool".to_string(),
2842            parameters: serde_json::json!({}),
2843        };
2844
2845        state.message_graph.add_message(Message {
2846            data: MessageData::Assistant {
2847                content: vec![AssistantContent::ToolCall {
2848                    tool_call: tool_call.clone(),
2849                    thought_signature: None,
2850                }],
2851            },
2852            timestamp: 0,
2853            id: "msg_1".to_string(),
2854            parent_message_id: None,
2855        });
2856
2857        state.current_operation = Some(OperationState {
2858            op_id,
2859            kind: OperationKind::AgentLoop,
2860            pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2861        });
2862        state
2863            .operation_models
2864            .insert(op_id, builtin::claude_sonnet_4_5());
2865
2866        let effects = reduce(
2867            &mut state,
2868            Action::Cancel {
2869                session_id,
2870                op_id: None,
2871            },
2872        );
2873
2874        assert!(effects.iter().any(|e| matches!(
2875            e,
2876            Effect::EmitEvent {
2877                event: SessionEvent::ToolMessageAdded { .. },
2878                ..
2879            }
2880        )));
2881
2882        let tool_message = state
2883            .message_graph
2884            .messages
2885            .iter()
2886            .find(|message| matches!(message.data, MessageData::Tool { .. }))
2887            .expect("tool result should be injected on cancel");
2888
2889        match &tool_message.data {
2890            MessageData::Tool { result, .. } => match result {
2891                ToolResult::Error(error) => {
2892                    assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2893                }
2894                _ => panic!("expected cancelled tool error"),
2895            },
2896            _ => panic!("expected tool message"),
2897        }
2898    }
2899
2900    #[test]
2901    fn test_malformed_tool_call_auto_denies() {
2902        let mut state = test_state();
2903        let session_id = state.session_id;
2904        let op_id = OpId::new();
2905
2906        state.current_operation = Some(OperationState {
2907            op_id,
2908            kind: OperationKind::AgentLoop,
2909            pending_tool_calls: HashSet::new(),
2910        });
2911
2912        state
2913            .operation_models
2914            .insert(op_id, builtin::claude_sonnet_4_5());
2915
2916        let mut properties = serde_json::Map::new();
2917        properties.insert("command".to_string(), json!({ "type": "string" }));
2918
2919        state.tools.push(ToolSchema {
2920            name: "test_tool".to_string(),
2921            display_name: "test_tool".to_string(),
2922            description: String::new(),
2923            input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2924        });
2925
2926        let tool_call = ToolCall {
2927            id: "tc_1".to_string(),
2928            name: "test_tool".to_string(),
2929            parameters: json!({}),
2930        };
2931
2932        let effects = reduce(
2933            &mut state,
2934            Action::ToolApprovalRequested {
2935                session_id,
2936                request_id: RequestId::new(),
2937                tool_call,
2938            },
2939        );
2940
2941        assert!(effects.iter().any(|e| matches!(
2942            e,
2943            Effect::EmitEvent {
2944                event: SessionEvent::ToolCallFailed { .. },
2945                ..
2946            }
2947        )));
2948        assert!(effects.iter().any(|e| matches!(
2949            e,
2950            Effect::EmitEvent {
2951                event: SessionEvent::ToolMessageAdded { .. },
2952                ..
2953            }
2954        )));
2955        assert!(
2956            !effects
2957                .iter()
2958                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2959        );
2960        assert!(
2961            !effects
2962                .iter()
2963                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2964        );
2965        assert!(state.pending_approval.is_none());
2966        assert!(state.approval_queue.is_empty());
2967        assert_eq!(state.message_graph.messages.len(), 1);
2968
2969        match &state.message_graph.messages[0].data {
2970            MessageData::Tool { result, .. } => match result {
2971                ToolResult::Error(error) => {
2972                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2973                }
2974                _ => panic!("expected invalid params tool error"),
2975            },
2976            _ => panic!("expected tool message"),
2977        }
2978    }
2979
2980    #[test]
2981    fn test_approval_queuing() {
2982        let mut state = test_state();
2983        let session_id = state.session_id;
2984        let op_id = OpId::new();
2985
2986        state.current_operation = Some(OperationState {
2987            op_id,
2988            kind: OperationKind::AgentLoop,
2989            pending_tool_calls: HashSet::new(),
2990        });
2991
2992        let tool_call_1 = steer_tools::ToolCall {
2993            id: "tc_1".to_string(),
2994            name: "tool_1".to_string(),
2995            parameters: serde_json::json!({}),
2996        };
2997        let tool_call_2 = steer_tools::ToolCall {
2998            id: "tc_2".to_string(),
2999            name: "tool_2".to_string(),
3000            parameters: serde_json::json!({}),
3001        };
3002
3003        let _ = reduce(
3004            &mut state,
3005            Action::ToolApprovalRequested {
3006                session_id,
3007                request_id: RequestId::new(),
3008                tool_call: tool_call_1,
3009            },
3010        );
3011
3012        assert!(state.pending_approval.is_some());
3013
3014        let _ = reduce(
3015            &mut state,
3016            Action::ToolApprovalRequested {
3017                session_id,
3018                request_id: RequestId::new(),
3019                tool_call: tool_call_2,
3020            },
3021        );
3022
3023        assert_eq!(state.approval_queue.len(), 1);
3024    }
3025
3026    #[test]
3027    fn test_unknown_tool_is_treated_as_nonexistent_and_never_prompts() {
3028        let mut state = test_state();
3029        let session_id = state.session_id;
3030        let op_id = OpId::new();
3031
3032        state.current_operation = Some(OperationState {
3033            op_id,
3034            kind: OperationKind::AgentLoop,
3035            pending_tool_calls: HashSet::new(),
3036        });
3037
3038        state
3039            .operation_models
3040            .insert(op_id, builtin::claude_sonnet_4_5());
3041
3042        state.tools.push(test_schema("view"));
3043
3044        let tool_call = ToolCall {
3045            id: "tc_unknown".to_string(),
3046            name: "bash".to_string(),
3047            parameters: json!({ "command": "echo hi" }),
3048        };
3049
3050        let effects = reduce(
3051            &mut state,
3052            Action::ToolApprovalRequested {
3053                session_id,
3054                request_id: RequestId::new(),
3055                tool_call,
3056            },
3057        );
3058
3059        assert!(effects.iter().any(|e| matches!(
3060            e,
3061            Effect::EmitEvent {
3062                event: SessionEvent::ToolCallFailed { .. },
3063                ..
3064            }
3065        )));
3066        assert!(effects.iter().any(|e| matches!(
3067            e,
3068            Effect::EmitEvent {
3069                event: SessionEvent::ToolMessageAdded { .. },
3070                ..
3071            }
3072        )));
3073        assert!(
3074            !effects
3075                .iter()
3076                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3077        );
3078        assert!(state.pending_approval.is_none());
3079        assert!(state.approval_queue.is_empty());
3080        assert_eq!(state.message_graph.messages.len(), 1);
3081
3082        match &state.message_graph.messages[0].data {
3083            MessageData::Tool { result, .. } => match result {
3084                ToolResult::Error(error) => {
3085                    assert!(matches!(error, ToolError::UnknownTool(name) if name == "bash"));
3086                }
3087                _ => panic!("expected unknown tool error"),
3088            },
3089            _ => panic!("expected tool message"),
3090        }
3091    }
3092
3093    #[test]
3094    fn test_dispatch_agent_missing_target_auto_denies() {
3095        let mut state = test_state();
3096        let session_id = state.session_id;
3097        let op_id = OpId::new();
3098
3099        state.current_operation = Some(OperationState {
3100            op_id,
3101            kind: OperationKind::AgentLoop,
3102            pending_tool_calls: HashSet::new(),
3103        });
3104
3105        state
3106            .operation_models
3107            .insert(op_id, builtin::claude_sonnet_4_5());
3108
3109        let input_schema: InputSchema =
3110            schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
3111        state.tools.push(ToolSchema {
3112            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3113            display_name: "Dispatch Agent".to_string(),
3114            description: String::new(),
3115            input_schema,
3116        });
3117
3118        let tool_call = ToolCall {
3119            id: "tc_dispatch".to_string(),
3120            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3121            parameters: json!({ "prompt": "hello world" }),
3122        };
3123
3124        let effects = reduce(
3125            &mut state,
3126            Action::ToolApprovalRequested {
3127                session_id,
3128                request_id: RequestId::new(),
3129                tool_call,
3130            },
3131        );
3132
3133        assert!(effects.iter().any(|e| matches!(
3134            e,
3135            Effect::EmitEvent {
3136                event: SessionEvent::ToolCallFailed { .. },
3137                ..
3138            }
3139        )));
3140        assert!(effects.iter().any(|e| matches!(
3141            e,
3142            Effect::EmitEvent {
3143                event: SessionEvent::ToolMessageAdded { .. },
3144                ..
3145            }
3146        )));
3147        assert!(
3148            !effects
3149                .iter()
3150                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3151        );
3152        assert!(state.pending_approval.is_none());
3153        assert!(state.approval_queue.is_empty());
3154
3155        match &state.message_graph.messages[0].data {
3156            MessageData::Tool { result, .. } => match result {
3157                ToolResult::Error(error) => {
3158                    assert!(matches!(error, ToolError::InvalidParams { .. }));
3159                }
3160                _ => panic!("expected invalid params tool error"),
3161            },
3162            _ => panic!("expected tool message"),
3163        }
3164    }
3165
3166    #[test]
3167    fn test_model_response_with_tool_calls_requests_approval() {
3168        let mut state = test_state();
3169        let session_id = state.session_id;
3170        let op_id = OpId::new();
3171        let message_id = MessageId::new();
3172
3173        state.current_operation = Some(OperationState {
3174            op_id,
3175            kind: OperationKind::AgentLoop,
3176            pending_tool_calls: HashSet::new(),
3177        });
3178        state
3179            .operation_models
3180            .insert(op_id, builtin::claude_sonnet_4_5());
3181
3182        let tool_call = steer_tools::ToolCall {
3183            id: "tc_1".to_string(),
3184            name: "bash".to_string(),
3185            parameters: serde_json::json!({"command": "ls"}),
3186        };
3187
3188        let content = vec![
3189            AssistantContent::Text {
3190                text: "Let me list the files.".to_string(),
3191            },
3192            AssistantContent::ToolCall {
3193                tool_call: tool_call.clone(),
3194                thought_signature: None,
3195            },
3196        ];
3197
3198        let effects = reduce(
3199            &mut state,
3200            Action::ModelResponseComplete {
3201                session_id,
3202                op_id,
3203                message_id,
3204                content,
3205                usage: None,
3206                context_window_tokens: None,
3207                configured_max_output_tokens: None,
3208                timestamp: 12345,
3209            },
3210        );
3211
3212        assert!(state.pending_approval.is_some());
3213        assert!(
3214            effects
3215                .iter()
3216                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3217        );
3218        assert!(state.current_operation.is_some());
3219    }
3220
3221    #[test]
3222    fn test_model_response_no_tools_completes_operation() {
3223        let mut state = test_state();
3224        let session_id = state.session_id;
3225        let op_id = OpId::new();
3226        let message_id = MessageId::new();
3227
3228        state.current_operation = Some(OperationState {
3229            op_id,
3230            kind: OperationKind::AgentLoop,
3231            pending_tool_calls: HashSet::new(),
3232        });
3233        state
3234            .operation_models
3235            .insert(op_id, builtin::claude_sonnet_4_5());
3236
3237        let content = vec![AssistantContent::Text {
3238            text: "Hello! How can I help?".to_string(),
3239        }];
3240
3241        let effects = reduce(
3242            &mut state,
3243            Action::ModelResponseComplete {
3244                session_id,
3245                op_id,
3246                message_id,
3247                content,
3248                usage: None,
3249                context_window_tokens: None,
3250                configured_max_output_tokens: None,
3251                timestamp: 12345,
3252            },
3253        );
3254
3255        assert!(state.current_operation.is_none());
3256        assert!(effects.iter().any(|e| matches!(
3257            e,
3258            Effect::EmitEvent {
3259                event: SessionEvent::OperationCompleted { .. },
3260                ..
3261            }
3262        )));
3263        assert!(!effects.iter().any(|e| matches!(
3264            e,
3265            Effect::EmitEvent {
3266                event: SessionEvent::LlmUsageUpdated { .. },
3267                ..
3268            }
3269        )));
3270    }
3271
3272    #[test]
3273    fn test_model_response_with_usage_emits_usage_event_and_updates_state() {
3274        let mut state = test_state();
3275        let session_id = state.session_id;
3276        let op_id = OpId::new();
3277        let message_id = MessageId::new();
3278        let model = builtin::claude_sonnet_4_5();
3279        let usage = TokenUsage::new(10, 20, 30);
3280
3281        state.current_operation = Some(OperationState {
3282            op_id,
3283            kind: OperationKind::AgentLoop,
3284            pending_tool_calls: HashSet::new(),
3285        });
3286        state.operation_models.insert(op_id, model.clone());
3287
3288        let effects = reduce(
3289            &mut state,
3290            Action::ModelResponseComplete {
3291                session_id,
3292                op_id,
3293                message_id,
3294                content: vec![AssistantContent::Text {
3295                    text: "Done".to_string(),
3296                }],
3297                usage: Some(usage),
3298                context_window_tokens: None,
3299                configured_max_output_tokens: None,
3300                timestamp: 12345,
3301            },
3302        );
3303
3304        assert!(effects.iter().any(|e| matches!(
3305            e,
3306            Effect::EmitEvent {
3307                event: SessionEvent::AssistantMessageAdded { .. },
3308                ..
3309            }
3310        )));
3311        assert!(effects.iter().any(|e| matches!(
3312            e,
3313            Effect::EmitEvent {
3314                event: SessionEvent::LlmUsageUpdated {
3315                    op_id: usage_op_id,
3316                    model: usage_model,
3317                    usage: usage_payload,
3318                    context_window,
3319                },
3320                ..
3321            } if *usage_op_id == op_id && usage_model == &model && *usage_payload == usage && context_window.is_none()
3322        )));
3323
3324        let snapshot = state
3325            .llm_usage_by_op
3326            .get(&op_id)
3327            .expect("usage snapshot should be recorded");
3328        assert_eq!(snapshot.model, model);
3329        assert_eq!(snapshot.usage, usage);
3330        assert!(snapshot.context_window.is_none());
3331        assert_eq!(state.llm_usage_totals, usage);
3332    }
3333
3334    #[test]
3335    fn test_model_response_with_usage_and_context_window_emits_utilization() {
3336        let mut state = test_state();
3337        let session_id = state.session_id;
3338        let op_id = OpId::new();
3339        let message_id = MessageId::new();
3340        let model = builtin::claude_sonnet_4_5();
3341        let usage = TokenUsage::new(10, 20, 30);
3342        let context_window_tokens = Some(100u32);
3343
3344        state.current_operation = Some(OperationState {
3345            op_id,
3346            kind: OperationKind::AgentLoop,
3347            pending_tool_calls: HashSet::new(),
3348        });
3349        state.operation_models.insert(op_id, model.clone());
3350
3351        let effects = reduce(
3352            &mut state,
3353            Action::ModelResponseComplete {
3354                session_id,
3355                op_id,
3356                message_id,
3357                content: vec![AssistantContent::Text {
3358                    text: "Done".to_string(),
3359                }],
3360                usage: Some(usage),
3361                context_window_tokens,
3362                configured_max_output_tokens: None,
3363                timestamp: 12345,
3364            },
3365        );
3366
3367        assert!(effects.iter().any(|e| matches!(
3368            e,
3369            Effect::EmitEvent {
3370                event: SessionEvent::LlmUsageUpdated {
3371                    op_id: usage_op_id,
3372                    model: usage_model,
3373                    usage: usage_payload,
3374                    context_window,
3375                },
3376                ..
3377            } if *usage_op_id == op_id
3378                && usage_model == &model
3379                && *usage_payload == usage
3380                && matches!(context_window, Some(ContextWindowUsage {
3381                    max_context_tokens: Some(100),
3382                    remaining_tokens: Some(70),
3383                    utilization_ratio: Some(ratio),
3384                    estimated: false,
3385                }) if (*ratio - 0.3).abs() < 1e-9)
3386        )));
3387
3388        let snapshot = state
3389            .llm_usage_by_op
3390            .get(&op_id)
3391            .expect("usage snapshot should be recorded");
3392        assert_eq!(snapshot.model, model);
3393        assert_eq!(snapshot.usage, usage);
3394        assert_eq!(
3395            snapshot.context_window,
3396            Some(ContextWindowUsage {
3397                max_context_tokens: Some(100),
3398                remaining_tokens: Some(70),
3399                utilization_ratio: Some(0.3),
3400                estimated: false,
3401            })
3402        );
3403        assert_eq!(state.llm_usage_totals, usage);
3404    }
3405
3406    #[test]
3407    fn test_model_response_utilization_uses_reserved_context_window() {
3408        let mut state = test_state();
3409        let session_id = state.session_id;
3410        let op_id = OpId::new();
3411        let message_id = MessageId::new();
3412        let model = builtin::claude_sonnet_4_5();
3413        let usage = TokenUsage::new(10, 20, 30);
3414
3415        state.current_operation = Some(OperationState {
3416            op_id,
3417            kind: OperationKind::AgentLoop,
3418            pending_tool_calls: HashSet::new(),
3419        });
3420        state.operation_models.insert(op_id, model.clone());
3421
3422        let effects = reduce(
3423            &mut state,
3424            Action::ModelResponseComplete {
3425                session_id,
3426                op_id,
3427                message_id,
3428                content: vec![AssistantContent::Text {
3429                    text: "Done".to_string(),
3430                }],
3431                usage: Some(usage),
3432                context_window_tokens: Some(100),
3433                configured_max_output_tokens: Some(40),
3434                timestamp: 12345,
3435            },
3436        );
3437
3438        assert!(effects.iter().any(|e| matches!(
3439            e,
3440            Effect::EmitEvent {
3441                event: SessionEvent::LlmUsageUpdated {
3442                    context_window: Some(ContextWindowUsage {
3443                        max_context_tokens: Some(100),
3444                        remaining_tokens: Some(30),
3445                        utilization_ratio: Some(ratio),
3446                        estimated: false,
3447                    }),
3448                    ..
3449                },
3450                ..
3451            } if (*ratio - 0.5).abs() < 1e-9
3452        )));
3453
3454        let snapshot = state
3455            .llm_usage_by_op
3456            .get(&op_id)
3457            .expect("usage snapshot should be recorded");
3458        assert_eq!(
3459            snapshot.context_window,
3460            Some(ContextWindowUsage {
3461                max_context_tokens: Some(100),
3462                remaining_tokens: Some(30),
3463                utilization_ratio: Some(0.5),
3464                estimated: false,
3465            })
3466        );
3467    }
3468
3469    #[test]
3470    fn test_model_response_with_zero_context_window_marks_estimated() {
3471        let mut state = test_state();
3472        let session_id = state.session_id;
3473        let op_id = OpId::new();
3474        let message_id = MessageId::new();
3475        let model = builtin::claude_sonnet_4_5();
3476        let usage = TokenUsage::new(10, 20, 30);
3477
3478        state.current_operation = Some(OperationState {
3479            op_id,
3480            kind: OperationKind::AgentLoop,
3481            pending_tool_calls: HashSet::new(),
3482        });
3483        state.operation_models.insert(op_id, model);
3484
3485        let effects = reduce(
3486            &mut state,
3487            Action::ModelResponseComplete {
3488                session_id,
3489                op_id,
3490                message_id,
3491                content: vec![AssistantContent::Text {
3492                    text: "Done".to_string(),
3493                }],
3494                usage: Some(usage),
3495                context_window_tokens: Some(0),
3496                configured_max_output_tokens: None,
3497                timestamp: 12345,
3498            },
3499        );
3500
3501        assert!(effects.iter().any(|e| matches!(
3502            e,
3503            Effect::EmitEvent {
3504                event: SessionEvent::LlmUsageUpdated {
3505                    context_window,
3506                    ..
3507                },
3508                ..
3509            } if matches!(context_window, Some(ContextWindowUsage {
3510                max_context_tokens: Some(0),
3511                remaining_tokens: Some(0),
3512                utilization_ratio: None,
3513                estimated: true,
3514            }))
3515        )));
3516    }
3517
3518    #[test]
3519    fn test_apply_usage_event_updates_replay_state_and_totals() {
3520        let mut state = test_state();
3521        let op_a = OpId::new();
3522        let op_b = OpId::new();
3523        let model = builtin::claude_sonnet_4_5();
3524
3525        apply_event_to_state(
3526            &mut state,
3527            &SessionEvent::LlmUsageUpdated {
3528                op_id: op_a,
3529                model: model.clone(),
3530                usage: TokenUsage::new(3, 5, 8),
3531                context_window: None,
3532            },
3533        );
3534
3535        assert_eq!(state.llm_usage_totals, TokenUsage::new(3, 5, 8));
3536
3537        apply_event_to_state(
3538            &mut state,
3539            &SessionEvent::LlmUsageUpdated {
3540                op_id: op_a,
3541                model: model.clone(),
3542                usage: TokenUsage::new(7, 11, 18),
3543                context_window: None,
3544            },
3545        );
3546
3547        assert_eq!(state.llm_usage_totals, TokenUsage::new(7, 11, 18));
3548
3549        apply_event_to_state(
3550            &mut state,
3551            &SessionEvent::LlmUsageUpdated {
3552                op_id: op_b,
3553                model,
3554                usage: TokenUsage::new(2, 4, 6),
3555                context_window: None,
3556            },
3557        );
3558
3559        assert_eq!(state.llm_usage_totals, TokenUsage::new(9, 15, 24));
3560    }
3561
3562    #[test]
3563    fn test_out_of_order_completion_preserves_newer_operation() {
3564        let mut state = test_state();
3565        let session_id = state.session_id;
3566        let model = builtin::claude_sonnet_4_5();
3567
3568        let op_a = OpId::new();
3569        let op_b = OpId::new();
3570
3571        let _ = reduce(
3572            &mut state,
3573            Action::UserInput {
3574                session_id,
3575                content: vec![UserContent::Text {
3576                    text: "first".to_string(),
3577                }],
3578                op_id: op_a,
3579                message_id: MessageId::new(),
3580                model: model.clone(),
3581                timestamp: 1,
3582            },
3583        );
3584
3585        let _ = reduce(
3586            &mut state,
3587            Action::UserInput {
3588                session_id,
3589                content: vec![UserContent::Text {
3590                    text: "second".to_string(),
3591                }],
3592                op_id: op_b,
3593                message_id: MessageId::new(),
3594                model: model.clone(),
3595                timestamp: 2,
3596            },
3597        );
3598
3599        let _ = reduce(
3600            &mut state,
3601            Action::ModelResponseComplete {
3602                session_id,
3603                op_id: op_a,
3604                message_id: MessageId::new(),
3605                content: vec![AssistantContent::Text {
3606                    text: "done A".to_string(),
3607                }],
3608                usage: None,
3609                context_window_tokens: None,
3610                configured_max_output_tokens: None,
3611                timestamp: 3,
3612            },
3613        );
3614
3615        assert!(
3616            state
3617                .current_operation
3618                .as_ref()
3619                .is_some_and(|op| op.op_id == op_b)
3620        );
3621        assert!(state.operation_models.contains_key(&op_b));
3622        assert!(!state.operation_models.contains_key(&op_a));
3623
3624        let effects = reduce(
3625            &mut state,
3626            Action::ModelResponseComplete {
3627                session_id,
3628                op_id: op_b,
3629                message_id: MessageId::new(),
3630                content: vec![AssistantContent::Text {
3631                    text: "done B".to_string(),
3632                }],
3633                usage: None,
3634                context_window_tokens: None,
3635                configured_max_output_tokens: None,
3636                timestamp: 4,
3637            },
3638        );
3639
3640        assert!(effects.iter().any(|e| matches!(
3641            e,
3642            Effect::EmitEvent {
3643                event: SessionEvent::OperationCompleted { op_id },
3644                ..
3645            } if *op_id == op_b
3646        )));
3647        assert!(!effects.iter().any(|e| matches!(
3648            e,
3649            Effect::EmitEvent {
3650                event: SessionEvent::Error { message },
3651                ..
3652            } if message.contains("Missing model for operation")
3653        )));
3654    }
3655
3656    #[test]
3657    fn test_tool_approval_does_not_call_model_before_result() {
3658        let mut state = test_state();
3659        let session_id = state.session_id;
3660        let op_id = OpId::new();
3661
3662        state.current_operation = Some(OperationState {
3663            op_id,
3664            kind: OperationKind::AgentLoop,
3665            pending_tool_calls: HashSet::new(),
3666        });
3667        state
3668            .operation_models
3669            .insert(op_id, builtin::claude_sonnet_4_5());
3670
3671        let tool_call = steer_tools::ToolCall {
3672            id: "tc_1".to_string(),
3673            name: "bash".to_string(),
3674            parameters: serde_json::json!({"command": "ls"}),
3675        };
3676        let request_id = RequestId::new();
3677        state.pending_approval = Some(PendingApproval {
3678            request_id,
3679            tool_call: tool_call.clone(),
3680        });
3681
3682        let effects = reduce(
3683            &mut state,
3684            Action::ToolApprovalDecided {
3685                session_id,
3686                request_id,
3687                decision: ApprovalDecision::Approved,
3688                remember: None,
3689            },
3690        );
3691
3692        assert!(
3693            effects
3694                .iter()
3695                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3696        );
3697        assert!(
3698            !effects
3699                .iter()
3700                .any(|e| matches!(e, Effect::CallModel { .. }))
3701        );
3702        assert!(state.current_operation.as_ref().is_some_and(|op| {
3703            op.pending_tool_calls
3704                .contains(&ToolCallId::from_string("tc_1"))
3705        }));
3706    }
3707
3708    #[test]
3709    fn test_mcp_tool_visibility_and_disconnect_removal() {
3710        let mut state = test_state();
3711        let session_id = state.session_id;
3712
3713        let mut allowed = HashSet::new();
3714        allowed.insert("mcp__alpha__allowed".to_string());
3715
3716        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3717        config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3718        state.session_config = Some(config);
3719
3720        state.tools.push(test_schema("bash"));
3721
3722        let _ = reduce(
3723            &mut state,
3724            Action::McpServerStateChanged {
3725                session_id,
3726                server_name: "alpha".to_string(),
3727                state: McpServerState::Connected {
3728                    tools: vec![
3729                        test_schema("mcp__alpha__allowed"),
3730                        test_schema("mcp__alpha__blocked"),
3731                    ],
3732                },
3733            },
3734        );
3735
3736        assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3737        assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3738
3739        let _ = reduce(
3740            &mut state,
3741            Action::McpServerStateChanged {
3742                session_id,
3743                server_name: "alpha".to_string(),
3744                state: McpServerState::Disconnected { error: None },
3745            },
3746        );
3747
3748        assert!(
3749            !state
3750                .tools
3751                .iter()
3752                .any(|t| t.name.starts_with("mcp__alpha__"))
3753        );
3754        assert!(state.tools.iter().any(|t| t.name == "bash"));
3755    }
3756
3757    #[test]
3758    fn test_tool_result_continues_agent_loop() {
3759        let mut state = test_state();
3760        let session_id = state.session_id;
3761        let op_id = OpId::new();
3762        let tool_call_id = ToolCallId::from_string("tc_1");
3763
3764        state.current_operation = Some(OperationState {
3765            op_id,
3766            kind: OperationKind::AgentLoop,
3767            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3768        });
3769        state
3770            .operation_models
3771            .insert(op_id, builtin::claude_sonnet_4_5());
3772
3773        let effects = reduce(
3774            &mut state,
3775            Action::ToolResult {
3776                session_id,
3777                tool_call_id,
3778                tool_name: "bash".to_string(),
3779                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3780                    tool_name: "bash".to_string(),
3781                    payload: "file1.txt\nfile2.txt".to_string(),
3782                })),
3783            },
3784        );
3785
3786        assert!(
3787            effects
3788                .iter()
3789                .any(|e| matches!(e, Effect::CallModel { .. }))
3790        );
3791    }
3792
3793    #[test]
3794    fn test_tool_result_waits_for_pending_tools() {
3795        let mut state = test_state();
3796        let session_id = state.session_id;
3797        let op_id = OpId::new();
3798        let tool_call_id_1 = ToolCallId::from_string("tc_1");
3799        let tool_call_id_2 = ToolCallId::from_string("tc_2");
3800
3801        state.current_operation = Some(OperationState {
3802            op_id,
3803            kind: OperationKind::AgentLoop,
3804            pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3805                .into_iter()
3806                .collect(),
3807        });
3808        state
3809            .operation_models
3810            .insert(op_id, builtin::claude_sonnet_4_5());
3811
3812        let effects = reduce(
3813            &mut state,
3814            Action::ToolResult {
3815                session_id,
3816                tool_call_id: tool_call_id_1,
3817                tool_name: "bash".to_string(),
3818                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3819                    tool_name: "bash".to_string(),
3820                    payload: "done".to_string(),
3821                })),
3822            },
3823        );
3824
3825        assert!(
3826            !effects
3827                .iter()
3828                .any(|e| matches!(e, Effect::CallModel { .. }))
3829        );
3830
3831        let effects = reduce(
3832            &mut state,
3833            Action::ToolResult {
3834                session_id,
3835                tool_call_id: tool_call_id_2,
3836                tool_name: "bash".to_string(),
3837                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3838                    tool_name: "bash".to_string(),
3839                    payload: "done".to_string(),
3840                })),
3841            },
3842        );
3843
3844        assert!(
3845            effects
3846                .iter()
3847                .any(|e| matches!(e, Effect::CallModel { .. }))
3848        );
3849    }
3850
3851    // ── Auto-compaction reducer tests ──────────────────────────────────
3852
3853    fn setup_auto_compact_state(
3854        enabled: bool,
3855        threshold_percent: u32,
3856        message_count: usize,
3857    ) -> AppState {
3858        use crate::session::state::AutoCompactionConfig;
3859
3860        let mut state = test_state();
3861
3862        // Apply session config with auto-compaction settings.
3863        let mut config = base_session_config();
3864        config.auto_compaction = AutoCompactionConfig {
3865            enabled,
3866            threshold_percent,
3867        };
3868        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
3869
3870        // Add the requested number of messages, linked via parent_message_id so
3871        // that `get_thread_messages` returns them all.
3872        let mut prev_id: Option<String> = None;
3873        for i in 0..message_count {
3874            let id = format!("msg_{}", i);
3875            state.message_graph.add_message(Message {
3876                data: MessageData::User {
3877                    content: vec![UserContent::Text {
3878                        text: format!("message {}", i),
3879                    }],
3880                },
3881                timestamp: i as u64,
3882                id: id.clone(),
3883                parent_message_id: prev_id.clone(),
3884            });
3885            prev_id = Some(id);
3886        }
3887
3888        state
3889    }
3890
3891    #[test]
3892    fn test_maybe_auto_compact_triggers_when_all_guards_pass() {
3893        let mut state = setup_auto_compact_state(true, 90, 4);
3894        let session_id = state.session_id;
3895        let model = builtin::claude_sonnet_4_5();
3896
3897        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3898        let effects = maybe_auto_compact(
3899            &mut state,
3900            session_id,
3901            usage,
3902            Some(100_000),
3903            Some(10_000),
3904            &model,
3905        );
3906
3907        assert!(!effects.is_empty(), "expected non-empty effects");
3908        assert!(
3909            effects
3910                .iter()
3911                .any(|e| matches!(e, Effect::RequestCompaction { .. })),
3912            "expected Effect::RequestCompaction"
3913        );
3914        assert!(
3915            effects.iter().any(|e| matches!(
3916                e,
3917                Effect::EmitEvent {
3918                    event: SessionEvent::OperationStarted { .. },
3919                    ..
3920                }
3921            )),
3922            "expected OperationStarted event"
3923        );
3924    }
3925
3926    #[test]
3927    fn test_maybe_auto_compact_disabled_config() {
3928        let mut state = setup_auto_compact_state(false, 90, 4);
3929        let session_id = state.session_id;
3930        let model = builtin::claude_sonnet_4_5();
3931
3932        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3933        let effects = maybe_auto_compact(
3934            &mut state,
3935            session_id,
3936            usage,
3937            Some(100_000),
3938            Some(10_000),
3939            &model,
3940        );
3941
3942        assert!(
3943            effects.is_empty(),
3944            "expected empty effects when auto-compaction is disabled"
3945        );
3946    }
3947
3948    #[test]
3949    fn test_maybe_auto_compact_below_threshold() {
3950        let mut state = setup_auto_compact_state(true, 90, 4);
3951        let session_id = state.session_id;
3952        let model = builtin::claude_sonnet_4_5();
3953
3954        // 50% utilization, well below 90% threshold.
3955        let usage = Some(TokenUsage::new(40_000, 10_000, 50_000));
3956        let effects = maybe_auto_compact(
3957            &mut state,
3958            session_id,
3959            usage,
3960            Some(100_000),
3961            Some(10_000),
3962            &model,
3963        );
3964
3965        assert!(
3966            effects.is_empty(),
3967            "expected empty effects when utilization is below threshold"
3968        );
3969    }
3970
3971    #[test]
3972    fn test_maybe_auto_compact_insufficient_messages() {
3973        // Only 2 messages — below MIN_MESSAGES_FOR_COMPACT (3).
3974        let mut state = setup_auto_compact_state(true, 90, 2);
3975        let session_id = state.session_id;
3976        let model = builtin::claude_sonnet_4_5();
3977
3978        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3979        let effects = maybe_auto_compact(
3980            &mut state,
3981            session_id,
3982            usage,
3983            Some(100_000),
3984            Some(10_000),
3985            &model,
3986        );
3987
3988        assert!(
3989            effects.is_empty(),
3990            "expected empty effects with insufficient messages"
3991        );
3992    }
3993
3994    #[test]
3995    fn test_maybe_auto_compact_queued_work_blocks() {
3996        use crate::app::domain::state::QueuedUserMessage;
3997
3998        let mut state = setup_auto_compact_state(true, 90, 4);
3999        let session_id = state.session_id;
4000        let model = builtin::claude_sonnet_4_5();
4001
4002        // Add queued work so guard 5 fires.
4003        state.queue_user_message(QueuedUserMessage {
4004            op_id: OpId::new(),
4005            message_id: MessageId::new(),
4006            content: vec![UserContent::Text {
4007                text: "queued msg".to_string(),
4008            }],
4009            model: builtin::claude_sonnet_4_5(),
4010            queued_at: 0,
4011        });
4012
4013        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
4014        let effects = maybe_auto_compact(
4015            &mut state,
4016            session_id,
4017            usage,
4018            Some(100_000),
4019            Some(10_000),
4020            &model,
4021        );
4022
4023        assert!(
4024            effects.is_empty(),
4025            "expected empty effects when there is queued work"
4026        );
4027    }
4028
4029    #[test]
4030    fn test_handle_compaction_failed_emits_compact_result() {
4031        use crate::app::domain::event::{CompactResult, CompactTrigger};
4032
4033        let mut state = test_state();
4034        let session_id = state.session_id;
4035        let op_id = OpId::new();
4036
4037        // Set up an active Compact { trigger: Auto } operation.
4038        state.start_operation(
4039            op_id,
4040            OperationKind::Compact {
4041                trigger: CompactTrigger::Auto,
4042            },
4043        );
4044        state
4045            .operation_models
4046            .insert(op_id, builtin::claude_sonnet_4_5());
4047
4048        let effects = reduce(
4049            &mut state,
4050            Action::CompactionFailed {
4051                session_id,
4052                op_id,
4053                error: "test error".into(),
4054            },
4055        );
4056
4057        // Should contain CompactResult::Failed with the right trigger.
4058        let has_compact_result = effects.iter().any(|e| {
4059            matches!(
4060                e,
4061                Effect::EmitEvent {
4062                    event: SessionEvent::CompactResult {
4063                        result: CompactResult::Failed(msg),
4064                        trigger: CompactTrigger::Auto,
4065                    },
4066                    ..
4067                } if msg == "test error"
4068            )
4069        });
4070        assert!(
4071            has_compact_result,
4072            "expected CompactResult::Failed event with Auto trigger"
4073        );
4074
4075        // Should NOT contain a SessionEvent::Error.
4076        let has_error_event = effects.iter().any(|e| {
4077            matches!(
4078                e,
4079                Effect::EmitEvent {
4080                    event: SessionEvent::Error { .. },
4081                    ..
4082                }
4083            )
4084        });
4085        assert!(
4086            !has_error_event,
4087            "should not emit SessionEvent::Error for compaction failure"
4088        );
4089    }
4090
4091    /// Helper: extract the `messages` field from the first `Effect::CallModel` in a list of effects.
4092    fn extract_callmodel_messages(effects: &[Effect]) -> Option<&Vec<Message>> {
4093        effects.iter().find_map(|e| match e {
4094            Effect::CallModel { messages, .. } => Some(messages),
4095            _ => None,
4096        })
4097    }
4098
4099    /// Helper: check whether any message in the list contains the given text substring.
4100    fn messages_contain_text(messages: &[Message], needle: &str) -> bool {
4101        messages.iter().any(|m| match &m.data {
4102            MessageData::User { content } => content.iter().any(|c| match c {
4103                UserContent::Text { text } => text.contains(needle),
4104                _ => false,
4105            }),
4106            MessageData::Assistant { content } => content.iter().any(|c| match c {
4107                AssistantContent::Text { text } => text.contains(needle),
4108                _ => false,
4109            }),
4110            _ => false,
4111        })
4112    }
4113
4114    #[test]
4115    fn test_auto_compaction_completion_triggers_continuation() {
4116        use crate::app::domain::event::{CompactResult, CompactTrigger};
4117        use crate::app::domain::types::CompactionId;
4118
4119        let mut state = setup_auto_compact_state(true, 90, 4);
4120        let session_id = state.session_id;
4121        let model = builtin::claude_sonnet_4_5();
4122
4123        let auto_effects = maybe_auto_compact(
4124            &mut state,
4125            session_id,
4126            Some(TokenUsage::new(80_000, 15_000, 95_000)),
4127            Some(100_000),
4128            Some(10_000),
4129            &model,
4130        );
4131
4132        let op_id = auto_effects
4133            .iter()
4134            .find_map(|effect| match effect {
4135                Effect::RequestCompaction { op_id, .. } => Some(*op_id),
4136                _ => None,
4137            })
4138            .expect("expected auto compaction request");
4139
4140        let summary_message_id = MessageId::new();
4141        let previous_active_message_id = state
4142            .message_graph
4143            .active_message_id
4144            .clone()
4145            .map(MessageId::from_string);
4146        let head_before = previous_active_message_id
4147            .clone()
4148            .expect("expected active message before compaction");
4149
4150        let completion_effects = reduce(
4151            &mut state,
4152            Action::CompactionComplete {
4153                session_id,
4154                op_id,
4155                compaction_id: CompactionId::new(),
4156                summary_message_id: summary_message_id.clone(),
4157                summary: "Auto summary".to_string(),
4158                compacted_head_message_id: head_before,
4159                previous_active_message_id,
4160                model: model.id.clone(),
4161                timestamp: 42,
4162            },
4163        );
4164
4165        assert!(
4166            completion_effects.iter().any(|effect| matches!(
4167                effect,
4168                Effect::EmitEvent {
4169                    event: SessionEvent::CompactResult {
4170                        result: CompactResult::Success(summary),
4171                        trigger: CompactTrigger::Auto,
4172                    },
4173                    ..
4174                } if summary == "Auto summary"
4175            )),
4176            "expected auto compaction result event"
4177        );
4178
4179        assert!(
4180            completion_effects
4181                .iter()
4182                .any(|effect| matches!(effect, Effect::CallModel { .. })),
4183            "expected auto compaction completion to continue with a model call"
4184        );
4185
4186        let messages = extract_callmodel_messages(&completion_effects)
4187            .expect("expected continuation CallModel after auto compaction");
4188
4189        assert!(
4190            matches!(&messages[0].data, MessageData::Assistant { content } if content.iter().any(|c| matches!(c, AssistantContent::Text { text } if text == "Auto summary"))),
4191            "first continuation message should be compaction summary"
4192        );
4193        assert!(
4194            matches!(&messages[1].data, MessageData::User { content } if content.iter().any(|c| matches!(c, UserContent::Text { text } if text == COMPACTION_CONTINUE_PROMPT))),
4195            "second continuation message should be continuation prompt"
4196        );
4197    }
4198
4199    #[test]
4200    fn test_compaction_full_cycle_callmodel_filters_messages() {
4201        use crate::app::domain::types::CompactionId;
4202
4203        let mut state = test_state();
4204        let session_id = state.session_id;
4205        let model = builtin::claude_sonnet_4_5();
4206
4207        // Build 2 user/assistant turns (4 messages, exceeds MIN_MESSAGES_FOR_COMPACT=3).
4208        // Turn 1:
4209        let op_id_1 = OpId::new();
4210        let _ = reduce(
4211            &mut state,
4212            Action::UserInput {
4213                session_id,
4214                content: vec![UserContent::Text {
4215                    text: "hello".to_string(),
4216                }],
4217                op_id: op_id_1,
4218                message_id: MessageId::new(),
4219                model: model.clone(),
4220                timestamp: 1,
4221            },
4222        );
4223        let _ = reduce(
4224            &mut state,
4225            Action::ModelResponseComplete {
4226                session_id,
4227                op_id: op_id_1,
4228                message_id: MessageId::new(),
4229                content: vec![AssistantContent::Text {
4230                    text: "hi there".to_string(),
4231                }],
4232                usage: None,
4233                context_window_tokens: None,
4234                configured_max_output_tokens: None,
4235                timestamp: 2,
4236            },
4237        );
4238
4239        // Turn 2:
4240        let op_id_1b = OpId::new();
4241        let _ = reduce(
4242            &mut state,
4243            Action::UserInput {
4244                session_id,
4245                content: vec![UserContent::Text {
4246                    text: "how are you".to_string(),
4247                }],
4248                op_id: op_id_1b,
4249                message_id: MessageId::new(),
4250                model: model.clone(),
4251                timestamp: 3,
4252            },
4253        );
4254        let assistant_msg_id = MessageId::new();
4255        let _ = reduce(
4256            &mut state,
4257            Action::ModelResponseComplete {
4258                session_id,
4259                op_id: op_id_1b,
4260                message_id: assistant_msg_id.clone(),
4261                content: vec![AssistantContent::Text {
4262                    text: "doing well".to_string(),
4263                }],
4264                usage: None,
4265                context_window_tokens: None,
4266                configured_max_output_tokens: None,
4267                timestamp: 4,
4268            },
4269        );
4270        assert!(
4271            state.current_operation.is_none(),
4272            "operation should be complete after text-only response"
4273        );
4274        assert!(state.message_graph.messages.len() >= 3);
4275
4276        // RequestCompaction — starts Compact operation.
4277        let op_id_2 = OpId::new();
4278        let compact_effects = reduce(
4279            &mut state,
4280            Action::RequestCompaction {
4281                session_id,
4282                op_id: op_id_2,
4283                model: model.clone(),
4284            },
4285        );
4286        assert!(
4287            compact_effects
4288                .iter()
4289                .any(|e| matches!(e, Effect::RequestCompaction { .. })),
4290            "expected Effect::RequestCompaction"
4291        );
4292
4293        // Step 4: CompactionComplete — summary replaces old messages as boundary.
4294        let compaction_id = CompactionId::new();
4295        let summary_message_id = MessageId::new();
4296        let active_before = state.message_graph.active_message_id.clone();
4297        let completion_effects = reduce(
4298            &mut state,
4299            Action::CompactionComplete {
4300                session_id,
4301                op_id: op_id_2,
4302                compaction_id,
4303                summary_message_id: summary_message_id.clone(),
4304                summary: "Summary of conversation.".to_string(),
4305                compacted_head_message_id: assistant_msg_id,
4306                previous_active_message_id: active_before.map(MessageId::from_string),
4307                model: "claude-sonnet-4-5-20250929".to_string(),
4308                timestamp: 3,
4309            },
4310        );
4311        assert!(
4312            state.compaction_summary_ids.contains(&summary_message_id.0),
4313            "compaction_summary_ids should contain the summary id"
4314        );
4315        assert!(
4316            !completion_effects
4317                .iter()
4318                .any(|e| matches!(e, Effect::CallModel { .. })),
4319            "manual compaction should not trigger continuation"
4320        );
4321
4322        let compact_result_trigger = completion_effects.iter().find_map(|effect| match effect {
4323            Effect::EmitEvent {
4324                event: SessionEvent::CompactResult { trigger, .. },
4325                ..
4326            } => Some(*trigger),
4327            _ => None,
4328        });
4329        assert_eq!(compact_result_trigger, Some(CompactTrigger::Manual));
4330
4331        // Step 5: New UserInput("new question") — triggers CallModel.
4332        let op_id_3 = OpId::new();
4333        let effects = reduce(
4334            &mut state,
4335            Action::UserInput {
4336                session_id,
4337                content: vec![UserContent::Text {
4338                    text: "new question".to_string(),
4339                }],
4340                op_id: op_id_3,
4341                message_id: MessageId::new(),
4342                model: model.clone(),
4343                timestamp: 10,
4344            },
4345        );
4346
4347        // Assertions on CallModel messages.
4348        let messages = extract_callmodel_messages(&effects)
4349            .expect("expected Effect::CallModel from UserInput after compaction");
4350
4351        assert_eq!(
4352            messages.len(),
4353            2,
4354            "CallModel should contain exactly 2 messages (summary + new user msg), got: {:?}",
4355            messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4356        );
4357
4358        // First message is the summary (assistant).
4359        assert!(
4360            matches!(&messages[0].data, MessageData::Assistant { content } if content.iter().any(|c| matches!(c, AssistantContent::Text { text } if text == "Summary of conversation."))),
4361            "messages[0] should be the compaction summary"
4362        );
4363
4364        // Second message is the new user message.
4365        assert!(
4366            matches!(&messages[1].data, MessageData::User { content } if content.iter().any(|c| matches!(c, UserContent::Text { text } if text == "new question"))),
4367            "messages[1] should be the new user message"
4368        );
4369
4370        // Pre-compaction messages must NOT appear.
4371        assert!(
4372            !messages_contain_text(messages, "hello"),
4373            "CallModel messages must not contain pre-compaction user text 'hello'"
4374        );
4375        assert!(
4376            !messages_contain_text(messages, "hi there"),
4377            "CallModel messages must not contain pre-compaction assistant text 'hi there'"
4378        );
4379        assert!(
4380            !messages_contain_text(messages, "how are you"),
4381            "CallModel messages must not contain pre-compaction user text 'how are you'"
4382        );
4383        assert!(
4384            !messages_contain_text(messages, "doing well"),
4385            "CallModel messages must not contain pre-compaction assistant text 'doing well'"
4386        );
4387    }
4388
4389    #[test]
4390    fn test_compaction_queued_message_callmodel_filters_messages() {
4391        use crate::app::domain::types::CompactionId;
4392
4393        let mut state = test_state();
4394        let session_id = state.session_id;
4395        let model = builtin::claude_sonnet_4_5();
4396
4397        // Build initial conversation with 2 turns (4 msgs, exceeds MIN_MESSAGES_FOR_COMPACT=3).
4398        // Turn 1:
4399        let op_id_1 = OpId::new();
4400        let _ = reduce(
4401            &mut state,
4402            Action::UserInput {
4403                session_id,
4404                content: vec![UserContent::Text {
4405                    text: "msg A".to_string(),
4406                }],
4407                op_id: op_id_1,
4408                message_id: MessageId::new(),
4409                model: model.clone(),
4410                timestamp: 1,
4411            },
4412        );
4413        let _ = reduce(
4414            &mut state,
4415            Action::ModelResponseComplete {
4416                session_id,
4417                op_id: op_id_1,
4418                message_id: MessageId::new(),
4419                content: vec![AssistantContent::Text {
4420                    text: "response A".to_string(),
4421                }],
4422                usage: None,
4423                context_window_tokens: None,
4424                configured_max_output_tokens: None,
4425                timestamp: 2,
4426            },
4427        );
4428
4429        // Turn 2:
4430        let op_id_1b = OpId::new();
4431        let _ = reduce(
4432            &mut state,
4433            Action::UserInput {
4434                session_id,
4435                content: vec![UserContent::Text {
4436                    text: "msg A2".to_string(),
4437                }],
4438                op_id: op_id_1b,
4439                message_id: MessageId::new(),
4440                model: model.clone(),
4441                timestamp: 3,
4442            },
4443        );
4444        let assistant_msg_id = MessageId::new();
4445        let _ = reduce(
4446            &mut state,
4447            Action::ModelResponseComplete {
4448                session_id,
4449                op_id: op_id_1b,
4450                message_id: assistant_msg_id.clone(),
4451                content: vec![AssistantContent::Text {
4452                    text: "response A2".to_string(),
4453                }],
4454                usage: None,
4455                context_window_tokens: None,
4456                configured_max_output_tokens: None,
4457                timestamp: 4,
4458            },
4459        );
4460        assert!(state.current_operation.is_none());
4461
4462        // Start compaction — Compact operation is now active.
4463        let op_id_2 = OpId::new();
4464        let _ = reduce(
4465            &mut state,
4466            Action::RequestCompaction {
4467                session_id,
4468                op_id: op_id_2,
4469                model: model.clone(),
4470            },
4471        );
4472        assert!(
4473            state.has_active_operation(),
4474            "Compact operation should be active"
4475        );
4476
4477        // UserInput("msg B") while compaction is in flight — should be queued.
4478        let op_id_3 = OpId::new();
4479        let queued_effects = reduce(
4480            &mut state,
4481            Action::UserInput {
4482                session_id,
4483                content: vec![UserContent::Text {
4484                    text: "msg B".to_string(),
4485                }],
4486                op_id: op_id_3,
4487                message_id: MessageId::new(),
4488                model: model.clone(),
4489                timestamp: 5,
4490            },
4491        );
4492        assert!(
4493            queued_effects.iter().any(|e| matches!(
4494                e,
4495                Effect::EmitEvent {
4496                    event: SessionEvent::QueueUpdated { .. },
4497                    ..
4498                }
4499            )),
4500            "queued user message should emit QueueUpdated"
4501        );
4502
4503        // CompactionComplete — completes compaction AND dequeues "msg B".
4504        let compaction_id = CompactionId::new();
4505        let summary_message_id = MessageId::new();
4506        let active_before = state.message_graph.active_message_id.clone();
4507        let completion_effects = reduce(
4508            &mut state,
4509            Action::CompactionComplete {
4510                session_id,
4511                op_id: op_id_2,
4512                compaction_id,
4513                summary_message_id: summary_message_id.clone(),
4514                summary: "Summary of A.".to_string(),
4515                compacted_head_message_id: assistant_msg_id,
4516                previous_active_message_id: active_before.map(MessageId::from_string),
4517                model: "claude-sonnet-4-5-20250929".to_string(),
4518                timestamp: 6,
4519            },
4520        );
4521
4522        // The dequeued "msg B" should produce a CallModel effect.
4523        let messages = extract_callmodel_messages(&completion_effects)
4524            .expect("CompactionComplete should dequeue 'msg B' and produce CallModel");
4525
4526        assert_eq!(
4527            messages.len(),
4528            2,
4529            "CallModel should contain exactly 2 messages (summary + 'msg B'), got: {:?}",
4530            messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4531        );
4532
4533        // Summary message.
4534        assert!(
4535            messages_contain_text(messages, "Summary of A."),
4536            "CallModel messages should contain the compaction summary"
4537        );
4538
4539        // Queued user message.
4540        assert!(
4541            messages_contain_text(messages, "msg B"),
4542            "CallModel messages should contain the dequeued 'msg B'"
4543        );
4544
4545        // Pre-compaction messages must NOT appear.
4546        assert!(
4547            !messages_contain_text(messages, "response A"),
4548            "CallModel messages must not contain pre-compaction 'response A'"
4549        );
4550        assert!(
4551            !messages_contain_text(messages, "response A2"),
4552            "CallModel messages must not contain pre-compaction 'response A2'"
4553        );
4554    }
4555
4556    #[test]
4557    fn test_compaction_multi_round_conversation_then_compact() {
4558        use crate::app::domain::types::CompactionId;
4559
4560        let mut state = test_state();
4561        let session_id = state.session_id;
4562        let model = builtin::claude_sonnet_4_5();
4563
4564        // Build 3 user/assistant turns (6 messages) via reducer actions.
4565        let pre_compaction_texts = [
4566            ("alpha user", "alpha assistant"),
4567            ("beta user", "beta assistant"),
4568            ("gamma user", "gamma assistant"),
4569        ];
4570        let mut last_assistant_msg_id = MessageId::new();
4571        for (i, (user_text, assistant_text)) in pre_compaction_texts.iter().enumerate() {
4572            let op = OpId::new();
4573            let _ = reduce(
4574                &mut state,
4575                Action::UserInput {
4576                    session_id,
4577                    content: vec![UserContent::Text {
4578                        text: (*user_text).to_string(),
4579                    }],
4580                    op_id: op,
4581                    message_id: MessageId::new(),
4582                    model: model.clone(),
4583                    timestamp: (i * 2 + 1) as u64,
4584                },
4585            );
4586            last_assistant_msg_id = MessageId::new();
4587            let _ = reduce(
4588                &mut state,
4589                Action::ModelResponseComplete {
4590                    session_id,
4591                    op_id: op,
4592                    message_id: last_assistant_msg_id.clone(),
4593                    content: vec![AssistantContent::Text {
4594                        text: assistant_text.to_string(),
4595                    }],
4596                    usage: None,
4597                    context_window_tokens: None,
4598                    configured_max_output_tokens: None,
4599                    timestamp: (i * 2 + 2) as u64,
4600                },
4601            );
4602        }
4603        assert_eq!(state.message_graph.messages.len(), 6);
4604        assert!(state.current_operation.is_none());
4605
4606        // Compact the conversation.
4607        let op_compact = OpId::new();
4608        let _ = reduce(
4609            &mut state,
4610            Action::RequestCompaction {
4611                session_id,
4612                op_id: op_compact,
4613                model: model.clone(),
4614            },
4615        );
4616
4617        let compaction_id = CompactionId::new();
4618        let summary_message_id = MessageId::new();
4619        let active_before = state.message_graph.active_message_id.clone();
4620        let completion_effects = reduce(
4621            &mut state,
4622            Action::CompactionComplete {
4623                session_id,
4624                op_id: op_compact,
4625                compaction_id,
4626                summary_message_id: summary_message_id.clone(),
4627                summary: "Summary of greek turns.".to_string(),
4628                compacted_head_message_id: last_assistant_msg_id,
4629                previous_active_message_id: active_before.map(MessageId::from_string),
4630                model: "claude-sonnet-4-5-20250929".to_string(),
4631                timestamp: 100,
4632            },
4633        );
4634
4635        // Manual compaction should not start continuation automatically.
4636        assert!(
4637            !completion_effects
4638                .iter()
4639                .any(|effect| matches!(effect, Effect::CallModel { .. })),
4640            "manual compaction should not trigger continuation"
4641        );
4642
4643        let compact_result_trigger = completion_effects.iter().find_map(|effect| match effect {
4644            Effect::EmitEvent {
4645                event: SessionEvent::CompactResult { trigger, .. },
4646                ..
4647            } => Some(*trigger),
4648            _ => None,
4649        });
4650        assert_eq!(compact_result_trigger, Some(CompactTrigger::Manual));
4651
4652        // Add 2 post-compaction turns.
4653        let post_compaction_texts = [
4654            ("delta user", "delta assistant"),
4655            ("epsilon user", "epsilon assistant"),
4656        ];
4657        for (i, (user_text, assistant_text)) in post_compaction_texts.iter().enumerate() {
4658            let op = OpId::new();
4659            let _ = reduce(
4660                &mut state,
4661                Action::UserInput {
4662                    session_id,
4663                    content: vec![UserContent::Text {
4664                        text: (*user_text).to_string(),
4665                    }],
4666                    op_id: op,
4667                    message_id: MessageId::new(),
4668                    model: model.clone(),
4669                    timestamp: (101 + i * 2) as u64,
4670                },
4671            );
4672            let _ = reduce(
4673                &mut state,
4674                Action::ModelResponseComplete {
4675                    session_id,
4676                    op_id: op,
4677                    message_id: MessageId::new(),
4678                    content: vec![AssistantContent::Text {
4679                        text: assistant_text.to_string(),
4680                    }],
4681                    usage: None,
4682                    context_window_tokens: None,
4683                    configured_max_output_tokens: None,
4684                    timestamp: (102 + i * 2) as u64,
4685                },
4686            );
4687        }
4688
4689        // Final UserInput to trigger CallModel.
4690        let final_op = OpId::new();
4691        let effects = reduce(
4692            &mut state,
4693            Action::UserInput {
4694                session_id,
4695                content: vec![UserContent::Text {
4696                    text: "final question".to_string(),
4697                }],
4698                op_id: final_op,
4699                message_id: MessageId::new(),
4700                model: model.clone(),
4701                timestamp: 200,
4702            },
4703        );
4704
4705        let messages =
4706            extract_callmodel_messages(&effects).expect("expected CallModel from final UserInput");
4707
4708        // Expected: summary + 4 post-compaction messages + final user = 6.
4709        assert_eq!(
4710            messages.len(),
4711            6,
4712            "CallModel should have 6 messages (summary + 4 post-compaction + final), got: {:?}",
4713            messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4714        );
4715
4716        // First message is the compaction summary.
4717        assert!(
4718            messages_contain_text(&messages[..1], "Summary of greek turns."),
4719            "first message should be the compaction summary"
4720        );
4721
4722        // Final message is the new user question.
4723        assert!(
4724            messages_contain_text(&messages[5..], "final question"),
4725            "last message should be the final user question"
4726        );
4727
4728        // Post-compaction messages should be present.
4729        assert!(
4730            messages_contain_text(messages, "delta user"),
4731            "should contain post-compaction user text"
4732        );
4733        assert!(
4734            messages_contain_text(messages, "epsilon assistant"),
4735            "should contain post-compaction assistant text"
4736        );
4737
4738        // Pre-compaction messages must NOT appear.
4739        for (user_text, assistant_text) in &pre_compaction_texts {
4740            assert!(
4741                !messages_contain_text(messages, user_text),
4742                "CallModel messages must not contain pre-compaction text '{user_text}'"
4743            );
4744            assert!(
4745                !messages_contain_text(messages, assistant_text),
4746                "CallModel messages must not contain pre-compaction text '{assistant_text}'"
4747            );
4748        }
4749    }
4750}