// steer_core/app/domain/reduce.rs

1use crate::agents::default_agent_spec_id;
2use crate::api::provider::TokenUsage;
3use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
4
5use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
6
7use crate::app::domain::effect::{Effect, McpServerConfig};
8use crate::app::domain::event::{
9    CancellationInfo, ContextWindowUsage, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
10};
11use crate::app::domain::state::{
12    AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
13};
14use crate::primary_agents::{
15    default_primary_agent_id, primary_agent_spec, resolve_effective_config,
16};
17use crate::session::state::{BackendConfig, ToolDecision};
18
19use crate::app::domain::event::CompactTrigger;
20use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
21use serde_json::Value;
22use steer_tools::ToolError;
23use steer_tools::result::ToolResult;
24use steer_tools::tools::BASH_TOOL_NAME;
25use thiserror::Error;
26
/// Message-count threshold associated with conversation compaction.
///
/// NOTE(review): consumed outside this excerpt (presumably by the compaction
/// request handler) — confirm whether the comparison there is `<` or `<=`.
const MIN_MESSAGES_FOR_COMPACT: usize = 3;

/// Machine-readable reason attached to [`ReduceError::InvalidAction`].
///
/// The human-readable text lives in the error's `message`; this kind lets
/// callers branch on the cause without inspecting that text.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InvalidActionKind {
    /// The action requires that no operation is currently running
    /// (e.g. editing a message while the agent loop is active).
    OperationInFlight,
    /// NOTE(review): raised outside this excerpt; presumably the session has
    /// no configuration loaded.
    MissingSessionConfig,
    /// NOTE(review): raised outside this excerpt; presumably the requested
    /// primary agent id could not be resolved.
    UnknownPrimaryAgent,
    /// NOTE(review): raised outside this excerpt; presumably there was no
    /// queued work to act on.
    QueueEmpty,
}

37#[derive(Debug, Error)]
38pub enum ReduceError {
39    #[error("{message}")]
40    InvalidAction {
41        message: String,
42        kind: InvalidActionKind,
43    },
44    #[error("Invariant violated: {message}")]
45    Invariant { message: String },
46}
47
48fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
49    ReduceError::InvalidAction {
50        message: message.into(),
51        kind,
52    }
53}
54
55fn derive_context_window_usage(
56    total_tokens: u32,
57    context_window_tokens: Option<u32>,
58) -> Option<ContextWindowUsage> {
59    context_window_tokens.map(|max_context_tokens| {
60        let remaining_tokens = max_context_tokens.saturating_sub(total_tokens);
61        let (utilization_ratio, estimated) = if max_context_tokens > 0 {
62            let ratio = (f64::from(total_tokens) / f64::from(max_context_tokens)).clamp(0.0, 1.0);
63            (Some(ratio), false)
64        } else {
65            (None, true)
66        };
67
68        ContextWindowUsage {
69            max_context_tokens: Some(max_context_tokens),
70            remaining_tokens: Some(remaining_tokens),
71            utilization_ratio,
72            estimated,
73        }
74    })
75}
76
77pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
78    match action {
79        Action::UserInput {
80            session_id,
81            content,
82            op_id,
83            message_id,
84            model,
85            timestamp,
86        } => Ok(handle_user_input(
87            state, session_id, content, op_id, message_id, model, timestamp,
88        )),
89
90        Action::UserEditedMessage {
91            session_id,
92            message_id,
93            new_content,
94            op_id,
95            new_message_id,
96            model,
97            timestamp,
98        } => handle_user_edited_message(
99            state,
100            session_id,
101            UserEditedMessageParams {
102                original_message_id: message_id,
103                new_content,
104                op_id,
105                new_message_id,
106                model,
107                timestamp,
108            },
109        ),
110
111        Action::ToolApprovalRequested {
112            session_id,
113            request_id,
114            tool_call,
115        } => Ok(handle_tool_approval_requested(
116            state, session_id, request_id, tool_call,
117        )),
118
119        Action::ToolApprovalDecided {
120            session_id,
121            request_id,
122            decision,
123            remember,
124        } => Ok(handle_tool_approval_decided(
125            state, session_id, request_id, decision, remember,
126        )),
127
128        Action::ToolExecutionStarted {
129            session_id,
130            tool_call_id,
131            tool_name,
132            tool_parameters,
133        } => Ok(handle_tool_execution_started(
134            state,
135            session_id,
136            tool_call_id,
137            tool_name,
138            tool_parameters,
139        )),
140
141        Action::ToolResult {
142            session_id,
143            tool_call_id,
144            tool_name,
145            result,
146        } => Ok(handle_tool_result(
147            state,
148            session_id,
149            tool_call_id,
150            tool_name,
151            result,
152        )),
153
154        Action::ModelResponseComplete {
155            session_id,
156            op_id,
157            message_id,
158            content,
159            usage,
160            context_window_tokens,
161            timestamp,
162        } => Ok(handle_model_response_complete(
163            state,
164            session_id,
165            ModelResponseCompleteParams {
166                op_id,
167                message_id,
168                content,
169                usage,
170                context_window_tokens,
171                timestamp,
172            },
173        )),
174
175        Action::ModelResponseError {
176            session_id,
177            op_id,
178            error,
179        } => Ok(handle_model_response_error(
180            state, session_id, op_id, &error,
181        )),
182
183        Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
184
185        Action::DirectBashCommand {
186            session_id,
187            op_id,
188            message_id,
189            command,
190            timestamp,
191        } => Ok(handle_direct_bash(
192            state, session_id, op_id, message_id, command, timestamp,
193        )),
194
195        Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
196
197        Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
198
199        Action::RequestCompaction {
200            session_id,
201            op_id,
202            model,
203        } => handle_request_compaction(state, session_id, op_id, model),
204
205        Action::Hydrate {
206            session_id,
207            events,
208            starting_sequence,
209        } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
210
211        Action::WorkspaceFilesListed { files, .. } => {
212            state.workspace_files = files;
213            Ok(vec![])
214        }
215
216        Action::ToolSchemasAvailable { tools, .. } => {
217            state.tools = tools;
218            Ok(vec![])
219        }
220
221        Action::ToolSchemasUpdated { schemas, .. } => {
222            state.tools = schemas;
223            Ok(vec![])
224        }
225
226        Action::SwitchPrimaryAgent {
227            session_id,
228            agent_id,
229        } => handle_switch_primary_agent(state, session_id, agent_id),
230
231        Action::McpServerStateChanged {
232            session_id,
233            server_name,
234            state: new_state,
235        } => {
236            // When connected, merge MCP tools into state.tools
237            if let McpServerState::Connected { tools } = &new_state {
238                let tools = state.session_config.as_ref().map_or_else(
239                    || tools.clone(),
240                    |config| config.filter_tools_by_visibility(tools.clone()),
241                );
242
243                // Add MCP tools that aren't already present (by name)
244                for tool in tools {
245                    if !state.tools.iter().any(|t| t.name == tool.name) {
246                        state.tools.push(tool.clone());
247                    }
248                }
249            }
250
251            // When disconnected or failed, remove tools from that server
252            if matches!(
253                &new_state,
254                McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
255            ) {
256                let prefix = format!("mcp__{server_name}__");
257                state.tools.retain(|t| !t.name.starts_with(&prefix));
258            }
259
260            state
261                .mcp_servers
262                .insert(server_name.clone(), new_state.clone());
263            Ok(vec![Effect::EmitEvent {
264                session_id,
265                event: SessionEvent::McpServerStateChanged {
266                    server_name,
267                    state: new_state,
268                },
269            }])
270        }
271
272        Action::CompactionComplete {
273            session_id,
274            op_id,
275            compaction_id,
276            summary_message_id,
277            summary,
278            compacted_head_message_id,
279            previous_active_message_id,
280            model,
281            timestamp,
282        } => Ok(handle_compaction_complete(
283            state,
284            session_id,
285            CompactionCompleteParams {
286                op_id,
287                compaction_id,
288                summary_message_id,
289                summary,
290                compacted_head_message_id,
291                previous_active_message_id,
292                model_name: model,
293                timestamp,
294            },
295        )),
296
297        Action::CompactionFailed {
298            session_id,
299            op_id,
300            error,
301        } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
302
303        Action::Shutdown => Ok(vec![]),
304    }
305}
306
307fn handle_user_input(
308    state: &mut AppState,
309    session_id: crate::app::domain::types::SessionId,
310    content: Vec<UserContent>,
311    op_id: crate::app::domain::types::OpId,
312    message_id: crate::app::domain::types::MessageId,
313    model: crate::config::model::ModelId,
314    timestamp: u64,
315) -> Vec<Effect> {
316    let mut effects = Vec::new();
317
318    if state.has_active_operation() {
319        state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
320            content,
321            op_id,
322            message_id,
323            model,
324            queued_at: timestamp,
325        });
326        effects.push(Effect::EmitEvent {
327            session_id,
328            event: SessionEvent::QueueUpdated {
329                queue: snapshot_queue(state),
330            },
331        });
332        return effects;
333    }
334
335    let parent_id = state.message_graph.active_message_id.clone();
336
337    let message = Message {
338        data: MessageData::User { content },
339        timestamp,
340        id: message_id.0.clone(),
341        parent_message_id: parent_id,
342    };
343
344    state.message_graph.add_message(message.clone());
345    state.message_graph.active_message_id = Some(message_id.0.clone());
346
347    state.start_operation(op_id, OperationKind::AgentLoop);
348    state.operation_models.insert(op_id, model.clone());
349
350    effects.push(Effect::EmitEvent {
351        session_id,
352        event: SessionEvent::UserMessageAdded {
353            message: message.clone(),
354        },
355    });
356
357    effects.push(Effect::EmitEvent {
358        session_id,
359        event: SessionEvent::OperationStarted {
360            op_id,
361            kind: OperationKind::AgentLoop,
362        },
363    });
364
365    effects.push(Effect::CallModel {
366        session_id,
367        op_id,
368        model,
369        messages: state
370            .message_graph
371            .get_thread_messages()
372            .into_iter()
373            .cloned()
374            .collect(),
375        system_context: state.cached_system_context.clone(),
376        tools: state.tools.clone(),
377    });
378
379    effects
380}
381
382struct UserEditedMessageParams {
383    original_message_id: crate::app::domain::types::MessageId,
384    new_content: Vec<UserContent>,
385    op_id: crate::app::domain::types::OpId,
386    new_message_id: crate::app::domain::types::MessageId,
387    model: crate::config::model::ModelId,
388    timestamp: u64,
389}
390
391fn handle_user_edited_message(
392    state: &mut AppState,
393    session_id: crate::app::domain::types::SessionId,
394    params: UserEditedMessageParams,
395) -> Result<Vec<Effect>, ReduceError> {
396    let UserEditedMessageParams {
397        original_message_id,
398        new_content,
399        op_id,
400        new_message_id,
401        model,
402        timestamp,
403    } = params;
404    let mut effects = Vec::new();
405
406    if state.has_active_operation() {
407        return Err(invalid_action(
408            InvalidActionKind::OperationInFlight,
409            "Cannot edit message while an operation is active.",
410        ));
411    }
412
413    let parent_id = state
414        .message_graph
415        .messages
416        .iter()
417        .find(|m| m.id() == original_message_id.0)
418        .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
419
420    let message = Message {
421        data: MessageData::User {
422            content: new_content,
423        },
424        timestamp,
425        id: new_message_id.0.clone(),
426        parent_message_id: parent_id,
427    };
428
429    state.message_graph.add_message(message.clone());
430    state.message_graph.active_message_id = Some(new_message_id.0.clone());
431
432    state.start_operation(op_id, OperationKind::AgentLoop);
433    state.operation_models.insert(op_id, model.clone());
434
435    effects.push(Effect::EmitEvent {
436        session_id,
437        event: SessionEvent::UserMessageAdded {
438            message: message.clone(),
439        },
440    });
441
442    effects.push(Effect::EmitEvent {
443        session_id,
444        event: SessionEvent::OperationStarted {
445            op_id,
446            kind: OperationKind::AgentLoop,
447        },
448    });
449
450    effects.push(Effect::CallModel {
451        session_id,
452        op_id,
453        model,
454        messages: state
455            .message_graph
456            .get_thread_messages()
457            .into_iter()
458            .cloned()
459            .collect(),
460        system_context: state.cached_system_context.clone(),
461        tools: state.tools.clone(),
462    });
463
464    Ok(effects)
465}
466
467fn handle_tool_approval_requested(
468    state: &mut AppState,
469    session_id: crate::app::domain::types::SessionId,
470    request_id: crate::app::domain::types::RequestId,
471    tool_call: steer_tools::ToolCall,
472) -> Vec<Effect> {
473    let mut effects = Vec::new();
474
475    if let Err(error) = validate_tool_call(state, &tool_call) {
476        let error_message = error.to_string();
477        return fail_tool_call_without_execution(
478            state,
479            session_id,
480            tool_call,
481            error,
482            error_message,
483            "invalid",
484            true,
485        );
486    }
487
488    let decision = get_tool_decision(state, &tool_call);
489
490    match decision {
491        ToolDecision::Allow => {
492            let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
493                return vec![Effect::EmitEvent {
494                    session_id,
495                    event: SessionEvent::Error {
496                        message: "Tool approval requested without active operation".to_string(),
497                    },
498                }];
499            };
500            state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
501                &tool_call.id,
502            ));
503
504            effects.push(Effect::ExecuteTool {
505                session_id,
506                op_id,
507                tool_call,
508            });
509        }
510        ToolDecision::Deny => {
511            let error = ToolError::DeniedByPolicy(tool_call.name.clone());
512            let tool_name = tool_call.name.clone();
513            effects.extend(fail_tool_call_without_execution(
514                state,
515                session_id,
516                tool_call,
517                error,
518                format!("Tool '{tool_name}' denied by policy"),
519                "denied",
520                true,
521            ));
522        }
523        ToolDecision::Ask => {
524            if state.pending_approval.is_some() {
525                state.approval_queue.push_back(QueuedApproval { tool_call });
526                return effects;
527            }
528
529            state.pending_approval = Some(PendingApproval {
530                request_id,
531                tool_call: tool_call.clone(),
532            });
533
534            effects.push(Effect::EmitEvent {
535                session_id,
536                event: SessionEvent::ApprovalRequested {
537                    request_id,
538                    tool_call: tool_call.clone(),
539                },
540            });
541
542            effects.push(Effect::RequestUserApproval {
543                session_id,
544                request_id,
545                tool_call,
546            });
547        }
548    }
549
550    effects
551}
552
553fn validate_tool_call(
554    state: &AppState,
555    tool_call: &steer_tools::ToolCall,
556) -> Result<(), ToolError> {
557    if tool_call.name.trim().is_empty() {
558        return Err(ToolError::invalid_params(
559            "unknown",
560            "Malformed tool call: missing tool name",
561        ));
562    }
563
564    if tool_call.id.trim().is_empty() {
565        return Err(ToolError::invalid_params(
566            tool_call.name.clone(),
567            "Malformed tool call: missing tool call id",
568        ));
569    }
570
571    if state.tools.is_empty() {
572        return Ok(());
573    }
574
575    let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
576        return Ok(());
577    };
578
579    validate_against_json_schema(
580        &tool_call.name,
581        schema.input_schema.as_value(),
582        &tool_call.parameters,
583    )
584}
585
586fn validate_against_json_schema(
587    tool_name: &str,
588    schema: &Value,
589    params: &Value,
590) -> Result<(), ToolError> {
591    let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
592        ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
593    })?;
594
595    if let Err(errors) = validator.validate(params) {
596        let message = errors
597            .into_iter()
598            .map(|error| error.to_string())
599            .next()
600            .unwrap_or_else(|| "Parameters do not match schema".to_string());
601        return Err(ToolError::invalid_params(tool_name.to_string(), message));
602    }
603
604    Ok(())
605}
606
607fn emit_tool_failure_message(
608    state: &mut AppState,
609    session_id: crate::app::domain::types::SessionId,
610    tool_call_id: &str,
611    tool_name: &str,
612    tool_error: ToolError,
613    event_error: String,
614    message_id_prefix: &str,
615) -> Vec<Effect> {
616    let mut effects = Vec::new();
617
618    let tool_result = ToolResult::Error(tool_error);
619    let parent_id = state.message_graph.active_message_id.clone();
620    let tool_message = Message {
621        data: MessageData::Tool {
622            tool_use_id: tool_call_id.to_string(),
623            result: tool_result,
624        },
625        timestamp: 0,
626        id: format!("{message_id_prefix}_{tool_call_id}"),
627        parent_message_id: parent_id,
628    };
629    state.message_graph.add_message(tool_message.clone());
630
631    if let Some(model) = state
632        .current_operation
633        .as_ref()
634        .and_then(|op| state.operation_models.get(&op.op_id).cloned())
635    {
636        effects.push(Effect::EmitEvent {
637            session_id,
638            event: SessionEvent::ToolCallFailed {
639                id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
640                name: tool_name.to_string(),
641                error: event_error,
642                model,
643            },
644        });
645    }
646
647    effects.push(Effect::EmitEvent {
648        session_id,
649        event: SessionEvent::ToolMessageAdded {
650            message: tool_message,
651        },
652    });
653
654    effects
655}
656
657fn fail_tool_call_without_execution(
658    state: &mut AppState,
659    session_id: crate::app::domain::types::SessionId,
660    tool_call: steer_tools::ToolCall,
661    tool_error: ToolError,
662    event_error: String,
663    message_id_prefix: &str,
664    call_model_if_ready: bool,
665) -> Vec<Effect> {
666    let mut effects = emit_tool_failure_message(
667        state,
668        session_id,
669        &tool_call.id,
670        &tool_call.name,
671        tool_error,
672        event_error,
673        message_id_prefix,
674    );
675
676    if !call_model_if_ready {
677        return effects;
678    }
679
680    let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
681        effects.push(Effect::EmitEvent {
682            session_id,
683            event: SessionEvent::Error {
684                message: "Tool failure recorded without active operation".to_string(),
685            },
686        });
687        return effects;
688    };
689    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
690        model
691    } else {
692        effects.push(Effect::EmitEvent {
693            session_id,
694            event: SessionEvent::Error {
695                message: format!("Missing model for operation {op_id}"),
696            },
697        });
698        return effects;
699    };
700
701    let all_tools_complete = state
702        .current_operation
703        .as_ref()
704        .is_none_or(|op| op.pending_tool_calls.is_empty());
705    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
706
707    if all_tools_complete && no_pending_approvals {
708        effects.push(Effect::CallModel {
709            session_id,
710            op_id,
711            model,
712            messages: state
713                .message_graph
714                .get_thread_messages()
715                .into_iter()
716                .cloned()
717                .collect(),
718            system_context: state.cached_system_context.clone(),
719            tools: state.tools.clone(),
720        });
721    }
722
723    effects
724}
725
726fn handle_tool_approval_decided(
727    state: &mut AppState,
728    session_id: crate::app::domain::types::SessionId,
729    request_id: crate::app::domain::types::RequestId,
730    decision: ApprovalDecision,
731    remember: Option<ApprovalMemory>,
732) -> Vec<Effect> {
733    let mut effects = Vec::new();
734
735    let pending = match state.pending_approval.take() {
736        Some(p) if p.request_id == request_id => p,
737        other => {
738            state.pending_approval = other;
739            return effects;
740        }
741    };
742
743    let resolved_memory = if decision == ApprovalDecision::Approved {
744        match remember {
745            Some(ApprovalMemory::PendingTool) => {
746                Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
747            }
748            Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
749            Some(ApprovalMemory::BashPattern(pattern)) => {
750                Some(ApprovalMemory::BashPattern(pattern))
751            }
752            None => None,
753        }
754    } else {
755        None
756    };
757
758    effects.push(Effect::EmitEvent {
759        session_id,
760        event: SessionEvent::ApprovalDecided {
761            request_id,
762            decision,
763            remember: resolved_memory.clone(),
764        },
765    });
766
767    if decision == ApprovalDecision::Approved {
768        if let Some(ref memory) = resolved_memory {
769            match memory {
770                ApprovalMemory::Tool(name) => {
771                    state.approved_tools.insert(name.clone());
772                }
773                ApprovalMemory::BashPattern(pattern) => {
774                    state.approved_bash_patterns.insert(pattern.clone());
775                }
776                ApprovalMemory::PendingTool => {}
777            }
778        }
779
780        let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
781            effects.push(Effect::EmitEvent {
782                session_id,
783                event: SessionEvent::Error {
784                    message: "Tool approval decided without active operation".to_string(),
785                },
786            });
787            return effects;
788        };
789        state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
790            &pending.tool_call.id,
791        ));
792
793        effects.push(Effect::ExecuteTool {
794            session_id,
795            op_id,
796            tool_call: pending.tool_call,
797        });
798    } else {
799        let tool_name = pending.tool_call.name.clone();
800        let error = ToolError::DeniedByUser(tool_name.clone());
801        effects.extend(fail_tool_call_without_execution(
802            state,
803            session_id,
804            pending.tool_call,
805            error,
806            format!("Tool '{tool_name}' denied by user"),
807            "denied",
808            false,
809        ));
810    }
811
812    effects.extend(process_next_queued_approval(state, session_id));
813
814    effects
815}
816
817fn process_next_queued_approval(
818    state: &mut AppState,
819    session_id: crate::app::domain::types::SessionId,
820) -> Vec<Effect> {
821    let mut effects = Vec::new();
822
823    while let Some(queued) = state.approval_queue.pop_front() {
824        let decision = get_tool_decision(state, &queued.tool_call);
825
826        match decision {
827            ToolDecision::Allow => {
828                let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
829                    effects.push(Effect::EmitEvent {
830                        session_id,
831                        event: SessionEvent::Error {
832                            message: "Queued tool approval processed without active operation"
833                                .to_string(),
834                        },
835                    });
836                    state.approval_queue.push_front(queued);
837                    break;
838                };
839                state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
840                    &queued.tool_call.id,
841                ));
842
843                effects.push(Effect::ExecuteTool {
844                    session_id,
845                    op_id,
846                    tool_call: queued.tool_call,
847                });
848            }
849            ToolDecision::Deny => {
850                let tool_name = queued.tool_call.name.clone();
851                let error = ToolError::DeniedByPolicy(tool_name.clone());
852                effects.extend(fail_tool_call_without_execution(
853                    state,
854                    session_id,
855                    queued.tool_call,
856                    error,
857                    format!("Tool '{tool_name}' denied by policy"),
858                    "denied",
859                    false,
860                ));
861            }
862            ToolDecision::Ask => {
863                let request_id = crate::app::domain::types::RequestId::new();
864                state.pending_approval = Some(PendingApproval {
865                    request_id,
866                    tool_call: queued.tool_call.clone(),
867                });
868
869                effects.push(Effect::EmitEvent {
870                    session_id,
871                    event: SessionEvent::ApprovalRequested {
872                        request_id,
873                        tool_call: queued.tool_call.clone(),
874                    },
875                });
876
877                effects.push(Effect::RequestUserApproval {
878                    session_id,
879                    request_id,
880                    tool_call: queued.tool_call,
881                });
882
883                break;
884            }
885        }
886    }
887
888    let all_tools_complete = state
889        .current_operation
890        .as_ref()
891        .is_none_or(|op| op.pending_tool_calls.is_empty());
892    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
893
894    if all_tools_complete
895        && no_pending_approvals
896        && let Some(op) = &state.current_operation
897    {
898        let op_id = op.op_id;
899        if let Some(model) = state.operation_models.get(&op_id).cloned() {
900            effects.push(Effect::CallModel {
901                session_id,
902                op_id,
903                model,
904                messages: state
905                    .message_graph
906                    .get_thread_messages()
907                    .into_iter()
908                    .cloned()
909                    .collect(),
910                system_context: state.cached_system_context.clone(),
911                tools: state.tools.clone(),
912            });
913        }
914    }
915
916    effects
917}
918
919fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
920    if state.approved_tools.contains(&tool_call.name) {
921        return ToolDecision::Allow;
922    }
923
924    if tool_call.name == DISPATCH_AGENT_TOOL_NAME
925        && let Ok(params) =
926            serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
927        && let Some(config) = state.session_config.as_ref()
928    {
929        let policy = &config.tool_config.approval_policy;
930        match params.target {
931            DispatchAgentTarget::Resume { .. } => {
932                return ToolDecision::Allow;
933            }
934            DispatchAgentTarget::New { agent, .. } => {
935                let agent_id = agent
936                    .as_deref()
937                    .filter(|value| !value.trim().is_empty())
938                    .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
939                if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
940                    return ToolDecision::Allow;
941                }
942            }
943        }
944    }
945
946    if tool_call.name == BASH_TOOL_NAME
947        && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
948            tool_call.parameters.clone(),
949        )
950        && state.is_bash_pattern_approved(&params.command)
951    {
952        return ToolDecision::Allow;
953    }
954
955    state
956        .session_config
957        .as_ref()
958        .map_or(ToolDecision::Ask, |config| {
959            config
960                .tool_config
961                .approval_policy
962                .tool_decision(&tool_call.name)
963        })
964}
965
966fn handle_tool_execution_started(
967    state: &mut AppState,
968    session_id: crate::app::domain::types::SessionId,
969    tool_call_id: crate::app::domain::types::ToolCallId,
970    tool_name: String,
971    tool_parameters: serde_json::Value,
972) -> Vec<Effect> {
973    state.add_pending_tool_call(tool_call_id.clone());
974
975    let op_id = match state.current_operation.as_ref() {
976        Some(op) => op.op_id,
977        None => {
978            return vec![Effect::EmitEvent {
979                session_id,
980                event: SessionEvent::Error {
981                    message: "Tool call started without active operation".to_string(),
982                },
983            }];
984        }
985    };
986
987    let is_direct_bash = matches!(
988        state.current_operation.as_ref().map(|op| &op.kind),
989        Some(OperationKind::DirectBash { .. })
990    );
991
992    if is_direct_bash {
993        return vec![];
994    }
995
996    let model = match state.operation_models.get(&op_id).cloned() {
997        Some(model) => model,
998        None => {
999            return vec![Effect::EmitEvent {
1000                session_id,
1001                event: SessionEvent::Error {
1002                    message: format!("Missing model for tool call on operation {op_id}"),
1003                },
1004            }];
1005        }
1006    };
1007
1008    vec![Effect::EmitEvent {
1009        session_id,
1010        event: SessionEvent::ToolCallStarted {
1011            id: tool_call_id,
1012            name: tool_name,
1013            parameters: tool_parameters,
1014            model,
1015        },
1016    }]
1017}
1018
1019fn handle_tool_result(
1020    state: &mut AppState,
1021    session_id: crate::app::domain::types::SessionId,
1022    tool_call_id: crate::app::domain::types::ToolCallId,
1023    tool_name: String,
1024    result: Result<ToolResult, ToolError>,
1025) -> Vec<Effect> {
1026    let mut effects = Vec::new();
1027
1028    let op = match &state.current_operation {
1029        Some(op) => {
1030            if state.cancelled_ops.contains(&op.op_id) {
1031                tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
1032                return effects;
1033            }
1034            op.clone()
1035        }
1036        None => return effects,
1037    };
1038    let op_id = op.op_id;
1039
1040    state.remove_pending_tool_call(&tool_call_id);
1041
1042    let tool_result = match result {
1043        Ok(r) => r,
1044        Err(e) => ToolResult::Error(e),
1045    };
1046
1047    let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1048
1049    if is_direct_bash {
1050        let command = match &op.kind {
1051            OperationKind::DirectBash { command } => command.clone(),
1052            _ => tool_name,
1053        };
1054
1055        let (stdout, stderr, exit_code) = match &tool_result {
1056            ToolResult::Bash(result) => (
1057                result.stdout.clone(),
1058                result.stderr.clone(),
1059                result.exit_code,
1060            ),
1061            ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1062            other => (format!("{other:?}"), String::new(), 0),
1063        };
1064
1065        let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1066            state
1067                .message_graph
1068                .update_command_execution(
1069                    id.as_str(),
1070                    command.clone(),
1071                    stdout.clone(),
1072                    stderr.clone(),
1073                    exit_code,
1074                )
1075                .or_else(|| {
1076                    let parent_id = state.message_graph.active_message_id.clone();
1077                    let timestamp = Message::current_timestamp();
1078                    Some(Message {
1079                        data: MessageData::User {
1080                            content: vec![UserContent::CommandExecution {
1081                                command: command.clone(),
1082                                stdout: stdout.clone(),
1083                                stderr: stderr.clone(),
1084                                exit_code,
1085                            }],
1086                        },
1087                        timestamp,
1088                        id: id.to_string(),
1089                        parent_message_id: parent_id,
1090                    })
1091                })
1092        });
1093
1094        state.complete_operation(op_id);
1095
1096        if let Some(message) = updated {
1097            effects.push(Effect::EmitEvent {
1098                session_id,
1099                event: SessionEvent::MessageUpdated { message },
1100            });
1101        }
1102
1103        effects.push(Effect::EmitEvent {
1104            session_id,
1105            event: SessionEvent::OperationCompleted { op_id },
1106        });
1107
1108        effects.extend(maybe_start_queued_work(state, session_id));
1109
1110        return effects;
1111    }
1112
1113    let model = match state.operation_models.get(&op_id).cloned() {
1114        Some(model) => model,
1115        None => {
1116            return vec![Effect::EmitEvent {
1117                session_id,
1118                event: SessionEvent::Error {
1119                    message: format!("Missing model for tool result on operation {op_id}"),
1120                },
1121            }];
1122        }
1123    };
1124
1125    let event = match &tool_result {
1126        ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1127            id: tool_call_id.clone(),
1128            name: tool_name.clone(),
1129            error: e.to_string(),
1130            model: model.clone(),
1131        },
1132        _ => SessionEvent::ToolCallCompleted {
1133            id: tool_call_id.clone(),
1134            name: tool_name,
1135            result: tool_result.clone(),
1136            model: model.clone(),
1137        },
1138    };
1139
1140    effects.push(Effect::EmitEvent { session_id, event });
1141
1142    let parent_id = state.message_graph.active_message_id.clone();
1143    let tool_message = Message {
1144        data: MessageData::Tool {
1145            tool_use_id: tool_call_id.0.clone(),
1146            result: tool_result,
1147        },
1148        timestamp: 0,
1149        id: format!("tool_result_{}", tool_call_id.0),
1150        parent_message_id: parent_id,
1151    };
1152    state.message_graph.add_message(tool_message.clone());
1153
1154    effects.push(Effect::EmitEvent {
1155        session_id,
1156        event: SessionEvent::ToolMessageAdded {
1157            message: tool_message,
1158        },
1159    });
1160
1161    let all_tools_complete = state
1162        .current_operation
1163        .as_ref()
1164        .is_none_or(|op| op.pending_tool_calls.is_empty());
1165    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1166
1167    if all_tools_complete && no_pending_approvals {
1168        effects.push(Effect::CallModel {
1169            session_id,
1170            op_id,
1171            model,
1172            messages: state
1173                .message_graph
1174                .get_thread_messages()
1175                .into_iter()
1176                .cloned()
1177                .collect(),
1178            system_context: state.cached_system_context.clone(),
1179            tools: state.tools.clone(),
1180        });
1181    }
1182
1183    effects
1184}
1185
1186struct ModelResponseCompleteParams {
1187    op_id: crate::app::domain::types::OpId,
1188    message_id: crate::app::domain::types::MessageId,
1189    content: Vec<AssistantContent>,
1190    usage: Option<TokenUsage>,
1191    context_window_tokens: Option<u32>,
1192    timestamp: u64,
1193}
1194
1195fn handle_model_response_complete(
1196    state: &mut AppState,
1197    session_id: crate::app::domain::types::SessionId,
1198    params: ModelResponseCompleteParams,
1199) -> Vec<Effect> {
1200    let ModelResponseCompleteParams {
1201        op_id,
1202        message_id,
1203        content,
1204        usage,
1205        context_window_tokens,
1206        timestamp,
1207    } = params;
1208    let mut effects = Vec::new();
1209
1210    if state.cancelled_ops.contains(&op_id) {
1211        tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1212        return effects;
1213    }
1214
1215    let tool_calls: Vec<_> = content
1216        .iter()
1217        .filter_map(|c| {
1218            if let AssistantContent::ToolCall { tool_call, .. } = c {
1219                Some(tool_call.clone())
1220            } else {
1221                None
1222            }
1223        })
1224        .collect();
1225
1226    let parent_id = state.message_graph.active_message_id.clone();
1227
1228    let message = Message {
1229        data: MessageData::Assistant {
1230            content: content.clone(),
1231        },
1232        timestamp,
1233        id: message_id.0.clone(),
1234        parent_message_id: parent_id,
1235    };
1236
1237    state.message_graph.add_message(message.clone());
1238    state.message_graph.active_message_id = Some(message_id.0.clone());
1239
1240    let model = match state.operation_models.get(&op_id).cloned() {
1241        Some(model) => model,
1242        None => {
1243            return vec![Effect::EmitEvent {
1244                session_id,
1245                event: SessionEvent::Error {
1246                    message: format!("Missing model for operation {op_id}"),
1247                },
1248            }];
1249        }
1250    };
1251
1252    effects.push(Effect::EmitEvent {
1253        session_id,
1254        event: SessionEvent::AssistantMessageAdded {
1255            message,
1256            model: model.clone(),
1257        },
1258    });
1259
1260    // Capture usage before shadowing – TokenUsage is Copy.
1261    let outer_usage = usage;
1262
1263    if let Some(usage) = outer_usage {
1264        let context_window = derive_context_window_usage(usage.total_tokens, context_window_tokens);
1265        state.record_llm_usage(op_id, model.clone(), usage, context_window.clone());
1266        effects.push(Effect::EmitEvent {
1267            session_id,
1268            event: SessionEvent::LlmUsageUpdated {
1269                op_id,
1270                model: model.clone(),
1271                usage,
1272                context_window,
1273            },
1274        });
1275    }
1276
1277    if tool_calls.is_empty() {
1278        state.complete_operation(op_id);
1279        effects.push(Effect::EmitEvent {
1280            session_id,
1281            event: SessionEvent::OperationCompleted { op_id },
1282        });
1283        // Try auto-compact first; if it doesn't fire, drain queued work.
1284        let auto = maybe_auto_compact(
1285            state,
1286            session_id,
1287            outer_usage,
1288            context_window_tokens,
1289            &model,
1290        );
1291        if auto.is_empty() {
1292            effects.extend(maybe_start_queued_work(state, session_id));
1293        } else {
1294            effects.extend(auto);
1295        }
1296    } else {
1297        for tool_call in tool_calls {
1298            let request_id = crate::app::domain::types::RequestId::new();
1299            effects.extend(handle_tool_approval_requested(
1300                state, session_id, request_id, tool_call,
1301            ));
1302        }
1303    }
1304
1305    effects
1306}
1307
1308fn handle_model_response_error(
1309    state: &mut AppState,
1310    session_id: crate::app::domain::types::SessionId,
1311    op_id: crate::app::domain::types::OpId,
1312    error: &str,
1313) -> Vec<Effect> {
1314    let mut effects = Vec::new();
1315
1316    if state.cancelled_ops.contains(&op_id) {
1317        return effects;
1318    }
1319
1320    state.complete_operation(op_id);
1321
1322    effects.push(Effect::EmitEvent {
1323        session_id,
1324        event: SessionEvent::Error {
1325            message: error.to_string(),
1326        },
1327    });
1328
1329    effects.push(Effect::EmitEvent {
1330        session_id,
1331        event: SessionEvent::OperationCompleted { op_id },
1332    });
1333
1334    effects.extend(maybe_start_queued_work(state, session_id));
1335
1336    effects
1337}
1338
1339fn handle_direct_bash(
1340    state: &mut AppState,
1341    session_id: crate::app::domain::types::SessionId,
1342    op_id: crate::app::domain::types::OpId,
1343    message_id: crate::app::domain::types::MessageId,
1344    command: String,
1345    timestamp: u64,
1346) -> Vec<Effect> {
1347    let mut effects = Vec::new();
1348
1349    if state.has_active_operation() {
1350        state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1351            command,
1352            op_id,
1353            message_id,
1354            queued_at: timestamp,
1355        });
1356        effects.push(Effect::EmitEvent {
1357            session_id,
1358            event: SessionEvent::QueueUpdated {
1359                queue: snapshot_queue(state),
1360            },
1361        });
1362        return effects;
1363    }
1364
1365    let parent_id = state.message_graph.active_message_id.clone();
1366    let message = Message {
1367        data: MessageData::User {
1368            content: vec![UserContent::CommandExecution {
1369                command: command.clone(),
1370                stdout: String::new(),
1371                stderr: String::new(),
1372                exit_code: 0,
1373            }],
1374        },
1375        timestamp,
1376        id: message_id.0.clone(),
1377        parent_message_id: parent_id,
1378    };
1379
1380    state.message_graph.add_message(message.clone());
1381    state.message_graph.active_message_id = Some(message_id.0.clone());
1382
1383    state.start_operation(
1384        op_id,
1385        OperationKind::DirectBash {
1386            command: command.clone(),
1387        },
1388    );
1389    state.operation_messages.insert(op_id, message_id);
1390
1391    effects.push(Effect::EmitEvent {
1392        session_id,
1393        event: SessionEvent::UserMessageAdded { message },
1394    });
1395
1396    effects.push(Effect::EmitEvent {
1397        session_id,
1398        event: SessionEvent::OperationStarted {
1399            op_id,
1400            kind: OperationKind::DirectBash {
1401                command: command.clone(),
1402            },
1403        },
1404    });
1405
1406    let tool_call = steer_tools::ToolCall {
1407        id: format!("direct_bash_{op_id}"),
1408        name: BASH_TOOL_NAME.to_string(),
1409        parameters: serde_json::json!({ "command": command }),
1410    };
1411
1412    effects.push(Effect::ExecuteTool {
1413        session_id,
1414        op_id,
1415        tool_call,
1416    });
1417
1418    effects
1419}
1420
1421fn handle_dequeue_queued_item(
1422    state: &mut AppState,
1423    session_id: crate::app::domain::types::SessionId,
1424) -> Result<Vec<Effect>, ReduceError> {
1425    if state.pop_next_queued_work().is_some() {
1426        Ok(vec![Effect::EmitEvent {
1427            session_id,
1428            event: SessionEvent::QueueUpdated {
1429                queue: snapshot_queue(state),
1430            },
1431        }])
1432    } else {
1433        Err(invalid_action(
1434            InvalidActionKind::QueueEmpty,
1435            "No queued item to remove.",
1436        ))
1437    }
1438}
1439
1440fn maybe_auto_compact(
1441    state: &mut AppState,
1442    session_id: crate::app::domain::types::SessionId,
1443    usage: Option<TokenUsage>,
1444    context_window_tokens: Option<u32>,
1445    model: &crate::config::model::ModelId,
1446) -> Vec<Effect> {
1447    // Guard 1: config exists and auto-compaction is enabled.
1448    let config = match &state.session_config {
1449        Some(c) if c.auto_compaction.enabled => c,
1450        _ => return vec![],
1451    };
1452    let threshold = config.auto_compaction.threshold_ratio();
1453
1454    // Guard 2: we have usage data.
1455    let usage = match usage {
1456        Some(u) => u,
1457        None => return vec![],
1458    };
1459
1460    // Guard 3: context window utilization is above threshold (non-estimated).
1461    let cw = match derive_context_window_usage(usage.total_tokens, context_window_tokens) {
1462        Some(cw) if !cw.estimated => cw,
1463        _ => return vec![],
1464    };
1465    let ratio = match cw.utilization_ratio {
1466        Some(r) if r >= threshold => r,
1467        _ => return vec![],
1468    };
1469    let _ = ratio; // used only in guard
1470
1471    // Guard 4: enough messages to compact.
1472    if state.message_graph.get_thread_messages().len() < MIN_MESSAGES_FOR_COMPACT {
1473        return vec![];
1474    }
1475
1476    // Guard 5: no queued work pending.
1477    if !state.queued_work.is_empty() {
1478        return vec![];
1479    }
1480
1481    // Guard 6: no active operation (operation was completed before this call).
1482    if state.has_active_operation() {
1483        return vec![];
1484    }
1485
1486    let op_id = crate::app::domain::types::OpId::new();
1487    let kind = OperationKind::Compact {
1488        trigger: CompactTrigger::Auto,
1489    };
1490    state.start_operation(op_id, kind.clone());
1491    state.operation_models.insert(op_id, model.clone());
1492
1493    vec![
1494        Effect::EmitEvent {
1495            session_id,
1496            event: SessionEvent::OperationStarted { op_id, kind },
1497        },
1498        Effect::RequestCompaction {
1499            session_id,
1500            op_id,
1501            model: model.clone(),
1502        },
1503    ]
1504}
1505
1506fn maybe_start_queued_work(
1507    state: &mut AppState,
1508    session_id: crate::app::domain::types::SessionId,
1509) -> Vec<Effect> {
1510    if state.has_active_operation() {
1511        return vec![];
1512    }
1513
1514    let Some(next) = state.pop_next_queued_work() else {
1515        return vec![];
1516    };
1517
1518    let mut effects = vec![Effect::EmitEvent {
1519        session_id,
1520        event: SessionEvent::QueueUpdated {
1521            queue: snapshot_queue(state),
1522        },
1523    }];
1524
1525    match next {
1526        QueuedWorkItem::UserMessage(item) => {
1527            effects.extend(handle_user_input(
1528                state,
1529                session_id,
1530                item.content,
1531                item.op_id,
1532                item.message_id,
1533                item.model,
1534                item.queued_at,
1535            ));
1536        }
1537        QueuedWorkItem::DirectBash(item) => {
1538            effects.extend(handle_direct_bash(
1539                state,
1540                session_id,
1541                item.op_id,
1542                item.message_id,
1543                item.command,
1544                item.queued_at,
1545            ));
1546        }
1547    }
1548
1549    effects
1550}
1551
1552fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1553    state
1554        .queued_work
1555        .iter()
1556        .map(snapshot_queued_work_item)
1557        .collect()
1558}
1559
1560fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1561    match item {
1562        QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1563            kind: Some(QueuedWorkKind::UserMessage),
1564            content: message
1565                .content
1566                .iter()
1567                .filter_map(|item| match item {
1568                    UserContent::Text { text } => Some(text.as_str()),
1569                    _ => None,
1570                })
1571                .collect::<String>(),
1572            queued_at: message.queued_at,
1573            model: Some(message.model.clone()),
1574            op_id: message.op_id,
1575            message_id: message.message_id.clone(),
1576            attachment_count: message
1577                .content
1578                .iter()
1579                .filter(|item| matches!(item, UserContent::Image { .. }))
1580                .count() as u32,
1581        },
1582        QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1583            kind: Some(QueuedWorkKind::DirectBash),
1584            content: command.command.clone(),
1585            queued_at: command.queued_at,
1586            model: None,
1587            op_id: command.op_id,
1588            message_id: command.message_id.clone(),
1589            attachment_count: 0,
1590        },
1591    }
1592}
1593
1594fn handle_request_compaction(
1595    state: &mut AppState,
1596    session_id: crate::app::domain::types::SessionId,
1597    op_id: crate::app::domain::types::OpId,
1598    model: crate::config::model::ModelId,
1599) -> Result<Vec<Effect>, ReduceError> {
1600    let message_count = state.message_graph.get_thread_messages().len();
1601
1602    if state.has_active_operation() {
1603        return Err(invalid_action(
1604            InvalidActionKind::OperationInFlight,
1605            "Cannot compact while an operation is active.",
1606        ));
1607    }
1608
1609    if message_count < MIN_MESSAGES_FOR_COMPACT {
1610        return Ok(vec![Effect::EmitEvent {
1611            session_id,
1612            event: SessionEvent::CompactResult {
1613                result: crate::app::domain::event::CompactResult::InsufficientMessages,
1614                trigger: CompactTrigger::Manual,
1615            },
1616        }]);
1617    }
1618
1619    let kind = OperationKind::Compact {
1620        trigger: CompactTrigger::Manual,
1621    };
1622    state.start_operation(op_id, kind.clone());
1623    state.operation_models.insert(op_id, model.clone());
1624
1625    Ok(vec![
1626        Effect::EmitEvent {
1627            session_id,
1628            event: SessionEvent::OperationStarted { op_id, kind },
1629        },
1630        Effect::RequestCompaction {
1631            session_id,
1632            op_id,
1633            model,
1634        },
1635    ])
1636}
1637
/// Cancels the current operation when it matches `target_op` (or `target_op` is `None`).
///
/// Nothing happens if no operation is active, or if a different operation is active than the one
/// targeted. Otherwise:
/// - the operation id is recorded via `record_cancelled_op`;
/// - a `Compact` operation reports `CompactResult::Cancelled` with its trigger;
/// - the pending approval and the approval queue are cleared. For an `AgentLoop` operation,
///   each of those approvals, plus every tool call still pending on the operation, gets a
///   "cancelled" failure via `emit_tool_failure_message`;
/// - the operation's entry in `active_streams` is removed;
/// - the next queued work item is popped (but not started) and its snapshot is attached to the
///   `OperationCancelled` event;
/// - a `CancelOperation` effect is emitted and the operation is completed;
/// - if an item was popped, `QueueUpdated` is emitted with the remaining queue.
fn handle_cancel(
    state: &mut AppState,
    session_id: crate::app::domain::types::SessionId,
    target_op: Option<crate::app::domain::types::OpId>,
) -> Vec<Effect> {
    let mut effects = Vec::new();

    let op = match &state.current_operation {
        Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
        _ => return effects,
    };

    // Result handlers check `cancelled_ops` and drop late results for this op.
    state.record_cancelled_op(op.op_id);

    if let OperationKind::Compact { trigger } = op.kind {
        effects.push(Effect::EmitEvent {
            session_id,
            event: SessionEvent::CompactResult {
                result: crate::app::domain::event::CompactResult::Cancelled,
                trigger,
            },
        });
    }

    // Approvals are cleared for every operation kind; only AgentLoop reports them as failed calls.
    let pending_approval = state.pending_approval.take();
    let queued_approvals = std::mem::take(&mut state.approval_queue);

    if matches!(op.kind, OperationKind::AgentLoop) {
        if let Some(pending) = pending_approval {
            let tool_name = pending.tool_call.name.clone();
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                &pending.tool_call.id,
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                format!("Tool '{tool_name}' cancelled"),
                "cancelled",
            ));
        }

        for queued in queued_approvals {
            let tool_name = queued.tool_call.name.clone();
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                &queued.tool_call.id,
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                format!("Tool '{tool_name}' cancelled"),
                "cancelled",
            ));
        }

        for tool_call_id in &op.pending_tool_calls {
            // Fall back to the raw call id when the graph has no name for it, and word the
            // event error accordingly.
            let tool_name = state
                .message_graph
                .find_tool_name_by_id(tool_call_id.as_str())
                .unwrap_or_else(|| tool_call_id.as_str().to_string());
            let event_error = if tool_name == tool_call_id.as_str() {
                format!("Tool call '{tool_call_id}' cancelled")
            } else {
                format!("Tool '{tool_name}' cancelled")
            };
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                tool_call_id.as_str(),
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                event_error,
                "cancelled",
            ));
        }
    }
    state.active_streams.remove(&op.op_id);

    // The next queued item is removed from the queue but not started here; its snapshot travels
    // with `OperationCancelled`. NOTE(review): presumably so a client can restore it — confirm.
    let dequeued_item = state.pop_next_queued_work();
    let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);

    effects.push(Effect::EmitEvent {
        session_id,
        event: SessionEvent::OperationCancelled {
            op_id: op.op_id,
            info: CancellationInfo {
                pending_tool_calls: op.pending_tool_calls.len(),
                popped_queued_item,
            },
        },
    });

    effects.push(Effect::CancelOperation {
        session_id,
        op_id: op.op_id,
    });

    state.complete_operation(op.op_id);

    if dequeued_item.is_some() {
        effects.push(Effect::EmitEvent {
            session_id,
            event: SessionEvent::QueueUpdated {
                queue: snapshot_queue(state),
            },
        });
    }

    effects
}
1747
1748fn handle_hydrate(
1749    state: &mut AppState,
1750    session_id: crate::app::domain::types::SessionId,
1751    events: Vec<SessionEvent>,
1752    starting_sequence: u64,
1753) -> Vec<Effect> {
1754    for event in events {
1755        apply_event_to_state(state, &event);
1756    }
1757
1758    state.event_sequence = starting_sequence;
1759
1760    emit_mcp_connect_effects(state, session_id)
1761}
1762
1763fn handle_switch_primary_agent(
1764    state: &mut AppState,
1765    session_id: crate::app::domain::types::SessionId,
1766    agent_id: String,
1767) -> Result<Vec<Effect>, ReduceError> {
1768    if state.current_operation.is_some() {
1769        return Err(invalid_action(
1770            InvalidActionKind::OperationInFlight,
1771            "Cannot switch primary agent while an operation is active.",
1772        ));
1773    }
1774
1775    let Some(base_config) = state
1776        .base_session_config
1777        .as_ref()
1778        .or(state.session_config.as_ref())
1779    else {
1780        return Err(invalid_action(
1781            InvalidActionKind::MissingSessionConfig,
1782            "Cannot switch primary agent without session config.",
1783        ));
1784    };
1785
1786    let Some(_spec) = primary_agent_spec(&agent_id) else {
1787        return Err(invalid_action(
1788            InvalidActionKind::UnknownPrimaryAgent,
1789            format!("Unknown primary agent '{agent_id}'."),
1790        ));
1791    };
1792
1793    let mut updated_config = base_config.clone();
1794    updated_config.primary_agent_id = Some(agent_id.clone());
1795    let new_config = resolve_effective_config(&updated_config);
1796    let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1797
1798    apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1799
1800    let mut effects = Vec::new();
1801    effects.push(Effect::EmitEvent {
1802        session_id,
1803        event: SessionEvent::SessionConfigUpdated {
1804            config: Box::new(new_config),
1805            primary_agent_id: agent_id,
1806        },
1807    });
1808    effects.extend(backend_effects);
1809    effects.push(Effect::ReloadToolSchemas { session_id });
1810
1811    Ok(effects)
1812}
1813
1814fn apply_session_config_state(
1815    state: &mut AppState,
1816    config: &crate::session::state::SessionConfig,
1817    primary_agent_id: Option<String>,
1818    update_base: bool,
1819) {
1820    state.apply_session_config(config, primary_agent_id, update_base);
1821}
1822
1823fn mcp_backend_diff_effects(
1824    session_id: crate::app::domain::types::SessionId,
1825    old_config: &crate::session::state::SessionConfig,
1826    new_config: &crate::session::state::SessionConfig,
1827) -> Vec<Effect> {
1828    let old_map = collect_mcp_backends(old_config);
1829    let new_map = collect_mcp_backends(new_config);
1830
1831    let mut effects = Vec::new();
1832
1833    for (server_name, (old_transport, old_filter)) in &old_map {
1834        match new_map.get(server_name) {
1835            None => {
1836                effects.push(Effect::DisconnectMcpServer {
1837                    session_id,
1838                    server_name: server_name.clone(),
1839                });
1840            }
1841            Some((new_transport, new_filter)) => {
1842                if new_transport != old_transport || new_filter != old_filter {
1843                    effects.push(Effect::DisconnectMcpServer {
1844                        session_id,
1845                        server_name: server_name.clone(),
1846                    });
1847                    effects.push(Effect::ConnectMcpServer {
1848                        session_id,
1849                        config: McpServerConfig {
1850                            server_name: server_name.clone(),
1851                            transport: new_transport.clone(),
1852                            tool_filter: new_filter.clone(),
1853                        },
1854                    });
1855                }
1856            }
1857        }
1858    }
1859
1860    for (server_name, (new_transport, new_filter)) in &new_map {
1861        if !old_map.contains_key(server_name) {
1862            effects.push(Effect::ConnectMcpServer {
1863                session_id,
1864                config: McpServerConfig {
1865                    server_name: server_name.clone(),
1866                    transport: new_transport.clone(),
1867                    tool_filter: new_filter.clone(),
1868                },
1869            });
1870        }
1871    }
1872
1873    effects
1874}
1875
1876fn collect_mcp_backends(
1877    config: &crate::session::state::SessionConfig,
1878) -> std::collections::HashMap<
1879    String,
1880    (
1881        crate::tools::McpTransport,
1882        crate::session::state::ToolFilter,
1883    ),
1884> {
1885    let mut map = std::collections::HashMap::new();
1886
1887    for backend_config in &config.tool_config.backends {
1888        let BackendConfig::Mcp {
1889            server_name,
1890            transport,
1891            tool_filter,
1892        } = backend_config;
1893
1894        map.insert(
1895            server_name.clone(),
1896            (transport.clone(), tool_filter.clone()),
1897        );
1898    }
1899
1900    map
1901}
1902
1903pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1904    match event {
1905        SessionEvent::SessionCreated { config, .. } => {
1906            let primary_agent_id = config
1907                .primary_agent_id
1908                .clone()
1909                .unwrap_or_else(|| default_primary_agent_id().to_string());
1910            apply_session_config_state(state, config, Some(primary_agent_id), true);
1911        }
1912        SessionEvent::SessionConfigUpdated {
1913            config,
1914            primary_agent_id,
1915        } => {
1916            apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1917        }
1918        SessionEvent::AssistantMessageAdded { message, .. }
1919        | SessionEvent::UserMessageAdded { message }
1920        | SessionEvent::ToolMessageAdded { message } => {
1921            state.message_graph.add_message(message.clone());
1922            state.message_graph.active_message_id = Some(message.id().to_string());
1923        }
1924        SessionEvent::MessageUpdated { message } => {
1925            state.message_graph.replace_message(message.clone());
1926        }
1927        SessionEvent::ApprovalDecided {
1928            decision, remember, ..
1929        } => {
1930            if *decision == ApprovalDecision::Approved
1931                && let Some(memory) = remember
1932            {
1933                match memory {
1934                    ApprovalMemory::Tool(name) => {
1935                        state.approved_tools.insert(name.clone());
1936                    }
1937                    ApprovalMemory::BashPattern(pattern) => {
1938                        state.approved_bash_patterns.insert(pattern.clone());
1939                    }
1940                    ApprovalMemory::PendingTool => {}
1941                }
1942            }
1943            state.pending_approval = None;
1944        }
1945        SessionEvent::OperationCompleted { op_id } => {
1946            state.complete_operation(*op_id);
1947        }
1948        SessionEvent::OperationCancelled { op_id, .. } => {
1949            state.record_cancelled_op(*op_id);
1950            state.complete_operation(*op_id);
1951        }
1952        SessionEvent::LlmUsageUpdated {
1953            op_id,
1954            model,
1955            usage,
1956            context_window,
1957        } => {
1958            state.record_llm_usage(*op_id, model.clone(), *usage, context_window.clone());
1959        }
1960        SessionEvent::McpServerStateChanged {
1961            server_name,
1962            state: mcp_state,
1963        } => {
1964            state
1965                .mcp_servers
1966                .insert(server_name.clone(), mcp_state.clone());
1967        }
1968        SessionEvent::QueueUpdated { queue } => {
1969            let parse_content = |content: &str| {
1970                if content.trim().is_empty() {
1971                    None
1972                } else {
1973                    Some(vec![UserContent::Text {
1974                        text: content.to_string(),
1975                    }])
1976                }
1977            };
1978
1979            state.queued_work = queue
1980                .iter()
1981                .filter_map(|item| match item.kind {
1982                    Some(QueuedWorkKind::UserMessage) => {
1983                        let content = parse_content(item.content.as_str())?;
1984                        Some(QueuedWorkItem::UserMessage(
1985                            crate::app::domain::state::QueuedUserMessage {
1986                                content,
1987                                op_id: item.op_id,
1988                                message_id: item.message_id.clone(),
1989                                model: item.model.clone().unwrap_or_else(
1990                                    crate::config::model::builtin::claude_sonnet_4_5,
1991                                ),
1992                                queued_at: item.queued_at,
1993                            },
1994                        ))
1995                    }
1996                    Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1997                        crate::app::domain::state::QueuedBashCommand {
1998                            command: item.content.clone(),
1999                            op_id: item.op_id,
2000                            message_id: item.message_id.clone(),
2001                            queued_at: item.queued_at,
2002                        },
2003                    )),
2004                    None => {
2005                        let content = parse_content(item.content.as_str())?;
2006                        Some(QueuedWorkItem::UserMessage(
2007                            crate::app::domain::state::QueuedUserMessage {
2008                                content,
2009                                op_id: item.op_id,
2010                                message_id: item.message_id.clone(),
2011                                model: item.model.clone().unwrap_or_else(
2012                                    crate::config::model::builtin::claude_sonnet_4_5,
2013                                ),
2014                                queued_at: item.queued_at,
2015                            },
2016                        ))
2017                    }
2018                })
2019                .collect();
2020        }
2021        SessionEvent::ConversationCompacted { record } => {
2022            state
2023                .compaction_summary_ids
2024                .insert(record.summary_message_id.to_string());
2025            state
2026                .message_graph
2027                .mark_compaction_summary(record.summary_message_id.to_string());
2028        }
2029        _ => {}
2030    }
2031
2032    state.event_sequence += 1;
2033}
2034
2035struct CompactionCompleteParams {
2036    op_id: crate::app::domain::types::OpId,
2037    compaction_id: crate::app::domain::types::CompactionId,
2038    summary_message_id: crate::app::domain::types::MessageId,
2039    summary: String,
2040    compacted_head_message_id: crate::app::domain::types::MessageId,
2041    previous_active_message_id: Option<crate::app::domain::types::MessageId>,
2042    model_name: String,
2043    timestamp: u64,
2044}
2045
2046fn handle_compaction_complete(
2047    state: &mut AppState,
2048    session_id: crate::app::domain::types::SessionId,
2049    params: CompactionCompleteParams,
2050) -> Vec<Effect> {
2051    use crate::app::conversation::{AssistantContent, Message, MessageData};
2052    use crate::app::domain::types::CompactionRecord;
2053
2054    let CompactionCompleteParams {
2055        op_id,
2056        compaction_id,
2057        summary_message_id,
2058        summary,
2059        compacted_head_message_id,
2060        previous_active_message_id,
2061        model_name,
2062        timestamp,
2063    } = params;
2064
2065    let summary_message = Message {
2066        data: MessageData::Assistant {
2067            content: vec![AssistantContent::Text {
2068                text: summary.clone(),
2069            }],
2070        },
2071        id: summary_message_id.to_string(),
2072        parent_message_id: None,
2073        timestamp,
2074    };
2075
2076    state.message_graph.add_message(summary_message.clone());
2077
2078    // Mark the summary so get_active_thread() stops here (LLM won't see older messages).
2079    state
2080        .compaction_summary_ids
2081        .insert(summary_message_id.to_string());
2082    state
2083        .message_graph
2084        .mark_compaction_summary(summary_message_id.to_string());
2085
2086    let record = CompactionRecord::with_timestamp(
2087        compaction_id,
2088        summary_message_id,
2089        compacted_head_message_id,
2090        previous_active_message_id,
2091        model_name,
2092        timestamp,
2093    );
2094
2095    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
2096        model
2097    } else {
2098        state.complete_operation(op_id);
2099        return vec![Effect::EmitEvent {
2100            session_id,
2101            event: SessionEvent::Error {
2102                message: format!("Missing model for compaction operation {op_id}"),
2103            },
2104        }];
2105    };
2106
2107    let trigger = state
2108        .current_operation
2109        .as_ref()
2110        .and_then(|op| match op.kind {
2111            OperationKind::Compact { trigger } => Some(trigger),
2112            _ => None,
2113        })
2114        .unwrap_or(CompactTrigger::Manual);
2115
2116    state.complete_operation(op_id);
2117
2118    let mut effects = vec![
2119        Effect::EmitEvent {
2120            session_id,
2121            event: SessionEvent::AssistantMessageAdded {
2122                message: summary_message,
2123                model,
2124            },
2125        },
2126        Effect::EmitEvent {
2127            session_id,
2128            event: SessionEvent::CompactResult {
2129                result: crate::app::domain::event::CompactResult::Success(summary),
2130                trigger,
2131            },
2132        },
2133        Effect::EmitEvent {
2134            session_id,
2135            event: SessionEvent::ConversationCompacted { record },
2136        },
2137        Effect::EmitEvent {
2138            session_id,
2139            event: SessionEvent::OperationCompleted { op_id },
2140        },
2141    ];
2142
2143    effects.extend(maybe_start_queued_work(state, session_id));
2144
2145    effects
2146}
2147
2148fn handle_compaction_failed(
2149    state: &mut AppState,
2150    session_id: crate::app::domain::types::SessionId,
2151    op_id: crate::app::domain::types::OpId,
2152    error: String,
2153) -> Vec<Effect> {
2154    let trigger = state
2155        .current_operation
2156        .as_ref()
2157        .and_then(|op| match op.kind {
2158            OperationKind::Compact { trigger } => Some(trigger),
2159            _ => None,
2160        })
2161        .unwrap_or(CompactTrigger::Manual);
2162
2163    state.complete_operation(op_id);
2164
2165    let mut effects = vec![
2166        Effect::EmitEvent {
2167            session_id,
2168            event: SessionEvent::CompactResult {
2169                result: crate::app::domain::event::CompactResult::Failed(error),
2170                trigger,
2171            },
2172        },
2173        Effect::EmitEvent {
2174            session_id,
2175            event: SessionEvent::OperationCompleted { op_id },
2176        },
2177    ];
2178
2179    effects.extend(maybe_start_queued_work(state, session_id));
2180
2181    effects
2182}
2183
2184fn emit_mcp_connect_effects(
2185    state: &AppState,
2186    session_id: crate::app::domain::types::SessionId,
2187) -> Vec<Effect> {
2188    let mut effects = Vec::new();
2189
2190    let Some(ref config) = state.session_config else {
2191        return effects;
2192    };
2193
2194    for backend_config in &config.tool_config.backends {
2195        let BackendConfig::Mcp {
2196            server_name,
2197            transport,
2198            tool_filter,
2199        } = backend_config;
2200
2201        let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
2202            matches!(
2203                s,
2204                McpServerState::Connecting | McpServerState::Connected { .. }
2205            )
2206        });
2207
2208        if !already_connected {
2209            effects.push(Effect::ConnectMcpServer {
2210                session_id,
2211                config: McpServerConfig {
2212                    server_name: server_name.clone(),
2213                    transport: transport.clone(),
2214                    tool_filter: tool_filter.clone(),
2215                },
2216            });
2217        }
2218    }
2219
2220    effects
2221}
2222
2223#[cfg(test)]
2224mod tests {
2225    use super::*;
2226    use crate::api::provider::TokenUsage;
2227    use crate::app::domain::event::ContextWindowUsage;
2228    use crate::app::domain::state::{OperationState, PendingApproval};
2229    use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId, ToolCallId};
2230    use crate::config::model::builtin;
2231    use crate::primary_agents::resolve_effective_config;
2232    use crate::session::state::{
2233        ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2234        ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2235    };
2236    use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2237    use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2238    use schemars::schema_for;
2239    use serde_json::json;
2240    use std::collections::HashSet;
2241    use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2242
2243    fn test_state() -> AppState {
2244        AppState::new(SessionId::new())
2245    }
2246
2247    fn test_schema(name: &str) -> ToolSchema {
2248        ToolSchema {
2249            name: name.to_string(),
2250            display_name: name.to_string(),
2251            description: String::new(),
2252            input_schema: InputSchema::empty_object(),
2253        }
2254    }
2255
2256    fn base_session_config() -> SessionConfig {
2257        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2258        config.primary_agent_id = Some("normal".to_string());
2259        config.policy_overrides = SessionPolicyOverrides::empty();
2260        resolve_effective_config(&config)
2261    }
2262
2263    fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2264        super::reduce(state, action).expect("reduce failed")
2265    }
2266
2267    #[test]
2268    fn test_user_input_starts_operation() {
2269        let mut state = test_state();
2270        let session_id = state.session_id;
2271        let op_id = OpId::new();
2272        let message_id = MessageId::new();
2273        let model = builtin::claude_sonnet_4_5();
2274
2275        let effects = reduce(
2276            &mut state,
2277            Action::UserInput {
2278                session_id,
2279                content: vec![UserContent::Text {
2280                    text: "Hello".to_string(),
2281                }],
2282                op_id,
2283                message_id,
2284                model,
2285                timestamp: 1_234_567_890,
2286            },
2287        );
2288
2289        assert_eq!(state.message_graph.messages.len(), 1);
2290        assert!(state.current_operation.is_some());
2291        assert!(
2292            effects
2293                .iter()
2294                .any(|e| matches!(e, Effect::CallModel { .. }))
2295        );
2296    }
2297
2298    #[test]
2299    fn test_switch_primary_agent_updates_visibility() {
2300        let mut state = test_state();
2301        let session_id = state.session_id;
2302        let config = base_session_config();
2303        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2304
2305        let effects = reduce(
2306            &mut state,
2307            Action::SwitchPrimaryAgent {
2308                session_id,
2309                agent_id: "plan".to_string(),
2310            },
2311        );
2312
2313        let updated = state.session_config.as_ref().expect("config");
2314        match &updated.tool_config.visibility {
2315            ToolVisibility::Whitelist(allowed) => {
2316                assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2317                for name in READ_ONLY_TOOL_NAMES {
2318                    assert!(allowed.contains(*name));
2319                }
2320                assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2321            }
2322            other => panic!("Unexpected tool visibility: {other:?}"),
2323        }
2324        assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2325        assert!(effects.iter().any(|e| matches!(
2326            e,
2327            Effect::EmitEvent {
2328                event: SessionEvent::SessionConfigUpdated { .. },
2329                ..
2330            }
2331        )));
2332        assert!(
2333            effects
2334                .iter()
2335                .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2336        );
2337    }
2338
2339    #[test]
2340    fn test_switch_primary_agent_yolo_auto_approves() {
2341        let mut state = test_state();
2342        let session_id = state.session_id;
2343        let config = base_session_config();
2344        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2345
2346        let _ = reduce(
2347            &mut state,
2348            Action::SwitchPrimaryAgent {
2349                session_id,
2350                agent_id: "yolo".to_string(),
2351            },
2352        );
2353
2354        let updated = state.session_config.as_ref().expect("config");
2355        assert_eq!(
2356            updated.tool_config.approval_policy.default_behavior,
2357            UnapprovedBehavior::Allow
2358        );
2359    }
2360
2361    #[test]
2362    fn test_switch_primary_agent_preserves_policy_overrides() {
2363        let mut state = test_state();
2364        let session_id = state.session_id;
2365
2366        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2367        config.primary_agent_id = Some("normal".to_string());
2368        config.policy_overrides = SessionPolicyOverrides {
2369            default_model: None,
2370            tool_visibility: None,
2371            approval_policy: ToolApprovalPolicyOverrides {
2372                default_behavior: Some(UnapprovedBehavior::Deny),
2373                preapproved: ApprovalRulesOverrides::empty(),
2374            },
2375        };
2376        let config = resolve_effective_config(&config);
2377        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2378
2379        let _ = reduce(
2380            &mut state,
2381            Action::SwitchPrimaryAgent {
2382                session_id,
2383                agent_id: "yolo".to_string(),
2384            },
2385        );
2386
2387        let updated = state.session_config.as_ref().expect("config");
2388        assert_eq!(
2389            updated.tool_config.approval_policy.default_behavior,
2390            UnapprovedBehavior::Deny
2391        );
2392        assert_eq!(
2393            updated.policy_overrides.approval_policy.default_behavior,
2394            Some(UnapprovedBehavior::Deny)
2395        );
2396    }
2397
2398    #[test]
2399    fn dispatch_agent_resume_is_auto_approved() {
2400        let mut state = test_state();
2401        let session_id = state.session_id;
2402        let config = base_session_config();
2403        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2404
2405        let tool_call = ToolCall {
2406            id: "tc_dispatch_resume".to_string(),
2407            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2408            parameters: json!({
2409                "prompt": "resume work",
2410                "target": {
2411                    "session": "resume",
2412                    "session_id": SessionId::new().to_string()
2413                }
2414            }),
2415        };
2416
2417        let decision = get_tool_decision(&state, &tool_call);
2418        assert_eq!(decision, ToolDecision::Allow);
2419        assert_eq!(state.session_id, session_id);
2420    }
2421
2422    #[test]
2423    fn test_switch_primary_agent_restores_base_prompt() {
2424        let mut state = test_state();
2425        let session_id = state.session_id;
2426        let mut config = base_session_config();
2427        config.system_prompt = Some("base prompt".to_string());
2428        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2429
2430        let _ = reduce(
2431            &mut state,
2432            Action::SwitchPrimaryAgent {
2433                session_id,
2434                agent_id: "plan".to_string(),
2435            },
2436        );
2437
2438        let _ = reduce(
2439            &mut state,
2440            Action::SwitchPrimaryAgent {
2441                session_id,
2442                agent_id: "normal".to_string(),
2443            },
2444        );
2445
2446        let updated = state.session_config.as_ref().expect("config");
2447        assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2448    }
2449
2450    #[test]
2451    fn test_switch_primary_agent_blocked_during_operation() {
2452        let mut state = test_state();
2453        let session_id = state.session_id;
2454        let config = base_session_config();
2455        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2456
2457        state.current_operation = Some(OperationState {
2458            op_id: OpId::new(),
2459            kind: OperationKind::AgentLoop,
2460            pending_tool_calls: HashSet::new(),
2461        });
2462
2463        let result = super::reduce(
2464            &mut state,
2465            Action::SwitchPrimaryAgent {
2466                session_id,
2467                agent_id: "plan".to_string(),
2468            },
2469        );
2470
2471        assert!(matches!(
2472            result,
2473            Err(ReduceError::InvalidAction {
2474                kind: InvalidActionKind::OperationInFlight,
2475                ..
2476            })
2477        ));
2478        assert!(state.primary_agent_id.as_deref() == Some("normal"));
2479    }
2480
2481    #[test]
2482    fn test_late_result_ignored_after_cancel() {
2483        let mut state = test_state();
2484        let session_id = state.session_id;
2485        let op_id = OpId::new();
2486        let tool_call_id = ToolCallId::from_string("tc_1");
2487
2488        state.current_operation = Some(OperationState {
2489            op_id,
2490            kind: OperationKind::AgentLoop,
2491            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2492        });
2493
2494        let _ = reduce(
2495            &mut state,
2496            Action::Cancel {
2497                session_id,
2498                op_id: None,
2499            },
2500        );
2501
2502        state.current_operation = Some(OperationState {
2503            op_id,
2504            kind: OperationKind::AgentLoop,
2505            pending_tool_calls: HashSet::new(),
2506        });
2507        state
2508            .operation_models
2509            .insert(op_id, builtin::claude_sonnet_4_5());
2510        state
2511            .operation_models
2512            .insert(op_id, builtin::claude_sonnet_4_5());
2513        state
2514            .operation_models
2515            .insert(op_id, builtin::claude_sonnet_4_5());
2516
2517        let effects = reduce(
2518            &mut state,
2519            Action::ToolResult {
2520                session_id,
2521                tool_call_id,
2522                tool_name: "test".to_string(),
2523                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2524                    tool_name: "test".to_string(),
2525                    payload: "done".to_string(),
2526                })),
2527            },
2528        );
2529
2530        assert!(effects.is_empty());
2531    }
2532
2533    #[test]
2534    fn test_pre_approved_tool_executes_immediately() {
2535        let mut state = test_state();
2536        let session_id = state.session_id;
2537        let op_id = OpId::new();
2538
2539        state.approved_tools.insert("test_tool".to_string());
2540        state.current_operation = Some(OperationState {
2541            op_id,
2542            kind: OperationKind::AgentLoop,
2543            pending_tool_calls: HashSet::new(),
2544        });
2545        state
2546            .operation_models
2547            .insert(op_id, builtin::claude_sonnet_4_5());
2548        state
2549            .operation_models
2550            .insert(op_id, builtin::claude_sonnet_4_5());
2551        state
2552            .operation_models
2553            .insert(op_id, builtin::claude_sonnet_4_5());
2554
2555        let tool_call = steer_tools::ToolCall {
2556            id: "tc_1".to_string(),
2557            name: "test_tool".to_string(),
2558            parameters: serde_json::json!({}),
2559        };
2560
2561        let effects = reduce(
2562            &mut state,
2563            Action::ToolApprovalRequested {
2564                session_id,
2565                request_id: RequestId::new(),
2566                tool_call,
2567            },
2568        );
2569
2570        assert!(
2571            effects
2572                .iter()
2573                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2574        );
2575        assert!(state.pending_approval.is_none());
2576    }
2577
2578    #[test]
2579    fn test_denied_tool_request_emits_failure_message() {
2580        let mut state = test_state();
2581        let session_id = state.session_id;
2582        let op_id = OpId::new();
2583
2584        state.current_operation = Some(OperationState {
2585            op_id,
2586            kind: OperationKind::AgentLoop,
2587            pending_tool_calls: HashSet::new(),
2588        });
2589
2590        state
2591            .operation_models
2592            .insert(op_id, builtin::claude_sonnet_4_5());
2593
2594        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2595        config.tool_config.approval_policy = ToolApprovalPolicy {
2596            default_behavior: UnapprovedBehavior::Deny,
2597            preapproved: ApprovalRules::default(),
2598        };
2599        state.session_config = Some(config);
2600
2601        let tool_call = steer_tools::ToolCall {
2602            id: "tc_1".to_string(),
2603            name: "test_tool".to_string(),
2604            parameters: serde_json::json!({}),
2605        };
2606
2607        let effects = reduce(
2608            &mut state,
2609            Action::ToolApprovalRequested {
2610                session_id,
2611                request_id: RequestId::new(),
2612                tool_call,
2613            },
2614        );
2615
2616        assert!(effects.iter().any(|e| matches!(
2617            e,
2618            Effect::EmitEvent {
2619                event: SessionEvent::ToolCallFailed { .. },
2620                ..
2621            }
2622        )));
2623        assert!(effects.iter().any(|e| matches!(
2624            e,
2625            Effect::EmitEvent {
2626                event: SessionEvent::ToolMessageAdded { .. },
2627                ..
2628            }
2629        )));
2630        assert!(
2631            !effects
2632                .iter()
2633                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2634        );
2635        assert!(
2636            !effects
2637                .iter()
2638                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2639        );
2640        assert!(state.pending_approval.is_none());
2641        assert!(state.approval_queue.is_empty());
2642        assert_eq!(state.message_graph.messages.len(), 1);
2643
2644        match &state.message_graph.messages[0].data {
2645            MessageData::Tool { result, .. } => match result {
2646                ToolResult::Error(error) => {
2647                    assert!(
2648                        matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2649                    );
2650                }
2651                _ => panic!("expected denied tool error"),
2652            },
2653            _ => panic!("expected tool message"),
2654        }
2655    }
2656
2657    #[test]
2658    fn test_user_denied_tool_request_emits_failure_message() {
2659        let mut state = test_state();
2660        let session_id = state.session_id;
2661        let op_id = OpId::new();
2662
2663        state.current_operation = Some(OperationState {
2664            op_id,
2665            kind: OperationKind::AgentLoop,
2666            pending_tool_calls: HashSet::new(),
2667        });
2668        state
2669            .operation_models
2670            .insert(op_id, builtin::claude_sonnet_4_5());
2671
2672        let tool_call = steer_tools::ToolCall {
2673            id: "tc_1".to_string(),
2674            name: "test_tool".to_string(),
2675            parameters: serde_json::json!({}),
2676        };
2677        let request_id = RequestId::new();
2678        state.pending_approval = Some(PendingApproval {
2679            request_id,
2680            tool_call: tool_call.clone(),
2681        });
2682
2683        let effects = reduce(
2684            &mut state,
2685            Action::ToolApprovalDecided {
2686                session_id,
2687                request_id,
2688                decision: ApprovalDecision::Denied,
2689                remember: None,
2690            },
2691        );
2692
2693        assert!(effects.iter().any(|e| matches!(
2694            e,
2695            Effect::EmitEvent {
2696                event: SessionEvent::ToolCallFailed { .. },
2697                ..
2698            }
2699        )));
2700        assert!(effects.iter().any(|e| matches!(
2701            e,
2702            Effect::EmitEvent {
2703                event: SessionEvent::ToolMessageAdded { .. },
2704                ..
2705            }
2706        )));
2707        assert!(
2708            !effects
2709                .iter()
2710                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2711        );
2712        assert!(state.pending_approval.is_none());
2713        assert!(state.approval_queue.is_empty());
2714        assert_eq!(state.message_graph.messages.len(), 1);
2715
2716        match &state.message_graph.messages[0].data {
2717            MessageData::Tool { result, .. } => match result {
2718                ToolResult::Error(error) => {
2719                    assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2720                }
2721                _ => panic!("expected denied tool error"),
2722            },
2723            _ => panic!("expected tool message"),
2724        }
2725    }
2726
2727    #[test]
2728    fn test_cancel_pops_queued_item_without_auto_start() {
2729        let mut state = test_state();
2730        let session_id = state.session_id;
2731        let op_id = OpId::new();
2732
2733        state.current_operation = Some(OperationState {
2734            op_id,
2735            kind: OperationKind::AgentLoop,
2736            pending_tool_calls: HashSet::new(),
2737        });
2738        state
2739            .operation_models
2740            .insert(op_id, builtin::claude_sonnet_4_5());
2741
2742        let queued_op = OpId::new();
2743        let queued_message_id = MessageId::from_string("queued_msg");
2744        let _ = reduce(
2745            &mut state,
2746            Action::UserInput {
2747                session_id,
2748                content: vec![UserContent::Text {
2749                    text: "Queued message".to_string(),
2750                }],
2751                op_id: queued_op,
2752                message_id: queued_message_id.clone(),
2753                model: builtin::claude_sonnet_4_5(),
2754                timestamp: 1,
2755            },
2756        );
2757
2758        let effects = reduce(
2759            &mut state,
2760            Action::Cancel {
2761                session_id,
2762                op_id: None,
2763            },
2764        );
2765
2766        assert!(state.current_operation.is_none());
2767        assert!(state.queued_work.is_empty());
2768
2769        let cancellation_info = effects.iter().find_map(|effect| match effect {
2770            Effect::EmitEvent {
2771                event: SessionEvent::OperationCancelled { info, .. },
2772                ..
2773            } => Some(info),
2774            _ => None,
2775        });
2776        let info = cancellation_info.expect("expected OperationCancelled event");
2777        let popped = info
2778            .popped_queued_item
2779            .as_ref()
2780            .expect("expected popped queued item");
2781        assert_eq!(popped.content, "Queued message");
2782        assert_eq!(popped.op_id, queued_op);
2783        assert_eq!(popped.message_id, queued_message_id);
2784
2785        assert!(
2786            !effects.iter().any(|effect| matches!(
2787                effect,
2788                Effect::EmitEvent {
2789                    event: SessionEvent::OperationStarted { .. },
2790                    ..
2791                }
2792            )),
2793            "queued work should not auto-start on cancel"
2794        );
2795    }
2796
2797    #[test]
2798    fn test_cancel_injects_tool_results_for_pending_calls() {
2799        let mut state = test_state();
2800        let session_id = state.session_id;
2801        let op_id = OpId::new();
2802
2803        let tool_call = ToolCall {
2804            id: "tc_1".to_string(),
2805            name: "test_tool".to_string(),
2806            parameters: serde_json::json!({}),
2807        };
2808
2809        state.message_graph.add_message(Message {
2810            data: MessageData::Assistant {
2811                content: vec![AssistantContent::ToolCall {
2812                    tool_call: tool_call.clone(),
2813                    thought_signature: None,
2814                }],
2815            },
2816            timestamp: 0,
2817            id: "msg_1".to_string(),
2818            parent_message_id: None,
2819        });
2820
2821        state.current_operation = Some(OperationState {
2822            op_id,
2823            kind: OperationKind::AgentLoop,
2824            pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2825        });
2826        state
2827            .operation_models
2828            .insert(op_id, builtin::claude_sonnet_4_5());
2829
2830        let effects = reduce(
2831            &mut state,
2832            Action::Cancel {
2833                session_id,
2834                op_id: None,
2835            },
2836        );
2837
2838        assert!(effects.iter().any(|e| matches!(
2839            e,
2840            Effect::EmitEvent {
2841                event: SessionEvent::ToolMessageAdded { .. },
2842                ..
2843            }
2844        )));
2845
2846        let tool_message = state
2847            .message_graph
2848            .messages
2849            .iter()
2850            .find(|message| matches!(message.data, MessageData::Tool { .. }))
2851            .expect("tool result should be injected on cancel");
2852
2853        match &tool_message.data {
2854            MessageData::Tool { result, .. } => match result {
2855                ToolResult::Error(error) => {
2856                    assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2857                }
2858                _ => panic!("expected cancelled tool error"),
2859            },
2860            _ => panic!("expected tool message"),
2861        }
2862    }
2863
2864    #[test]
2865    fn test_malformed_tool_call_auto_denies() {
2866        let mut state = test_state();
2867        let session_id = state.session_id;
2868        let op_id = OpId::new();
2869
2870        state.current_operation = Some(OperationState {
2871            op_id,
2872            kind: OperationKind::AgentLoop,
2873            pending_tool_calls: HashSet::new(),
2874        });
2875
2876        state
2877            .operation_models
2878            .insert(op_id, builtin::claude_sonnet_4_5());
2879
2880        let mut properties = serde_json::Map::new();
2881        properties.insert("command".to_string(), json!({ "type": "string" }));
2882
2883        state.tools.push(ToolSchema {
2884            name: "test_tool".to_string(),
2885            display_name: "test_tool".to_string(),
2886            description: String::new(),
2887            input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2888        });
2889
2890        let tool_call = ToolCall {
2891            id: "tc_1".to_string(),
2892            name: "test_tool".to_string(),
2893            parameters: json!({}),
2894        };
2895
2896        let effects = reduce(
2897            &mut state,
2898            Action::ToolApprovalRequested {
2899                session_id,
2900                request_id: RequestId::new(),
2901                tool_call,
2902            },
2903        );
2904
2905        assert!(effects.iter().any(|e| matches!(
2906            e,
2907            Effect::EmitEvent {
2908                event: SessionEvent::ToolCallFailed { .. },
2909                ..
2910            }
2911        )));
2912        assert!(effects.iter().any(|e| matches!(
2913            e,
2914            Effect::EmitEvent {
2915                event: SessionEvent::ToolMessageAdded { .. },
2916                ..
2917            }
2918        )));
2919        assert!(
2920            !effects
2921                .iter()
2922                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2923        );
2924        assert!(
2925            !effects
2926                .iter()
2927                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2928        );
2929        assert!(state.pending_approval.is_none());
2930        assert!(state.approval_queue.is_empty());
2931        assert_eq!(state.message_graph.messages.len(), 1);
2932
2933        match &state.message_graph.messages[0].data {
2934            MessageData::Tool { result, .. } => match result {
2935                ToolResult::Error(error) => {
2936                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2937                }
2938                _ => panic!("expected invalid params tool error"),
2939            },
2940            _ => panic!("expected tool message"),
2941        }
2942    }
2943
2944    #[test]
2945    fn test_approval_queuing() {
2946        let mut state = test_state();
2947        let session_id = state.session_id;
2948        let op_id = OpId::new();
2949
2950        state.current_operation = Some(OperationState {
2951            op_id,
2952            kind: OperationKind::AgentLoop,
2953            pending_tool_calls: HashSet::new(),
2954        });
2955
2956        let tool_call_1 = steer_tools::ToolCall {
2957            id: "tc_1".to_string(),
2958            name: "tool_1".to_string(),
2959            parameters: serde_json::json!({}),
2960        };
2961        let tool_call_2 = steer_tools::ToolCall {
2962            id: "tc_2".to_string(),
2963            name: "tool_2".to_string(),
2964            parameters: serde_json::json!({}),
2965        };
2966
2967        let _ = reduce(
2968            &mut state,
2969            Action::ToolApprovalRequested {
2970                session_id,
2971                request_id: RequestId::new(),
2972                tool_call: tool_call_1,
2973            },
2974        );
2975
2976        assert!(state.pending_approval.is_some());
2977
2978        let _ = reduce(
2979            &mut state,
2980            Action::ToolApprovalRequested {
2981                session_id,
2982                request_id: RequestId::new(),
2983                tool_call: tool_call_2,
2984            },
2985        );
2986
2987        assert_eq!(state.approval_queue.len(), 1);
2988    }
2989
2990    #[test]
2991    fn test_dispatch_agent_missing_target_auto_denies() {
2992        let mut state = test_state();
2993        let session_id = state.session_id;
2994        let op_id = OpId::new();
2995
2996        state.current_operation = Some(OperationState {
2997            op_id,
2998            kind: OperationKind::AgentLoop,
2999            pending_tool_calls: HashSet::new(),
3000        });
3001
3002        state
3003            .operation_models
3004            .insert(op_id, builtin::claude_sonnet_4_5());
3005
3006        let input_schema: InputSchema =
3007            schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
3008        state.tools.push(ToolSchema {
3009            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3010            display_name: "Dispatch Agent".to_string(),
3011            description: String::new(),
3012            input_schema,
3013        });
3014
3015        let tool_call = ToolCall {
3016            id: "tc_dispatch".to_string(),
3017            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3018            parameters: json!({ "prompt": "hello world" }),
3019        };
3020
3021        let effects = reduce(
3022            &mut state,
3023            Action::ToolApprovalRequested {
3024                session_id,
3025                request_id: RequestId::new(),
3026                tool_call,
3027            },
3028        );
3029
3030        assert!(effects.iter().any(|e| matches!(
3031            e,
3032            Effect::EmitEvent {
3033                event: SessionEvent::ToolCallFailed { .. },
3034                ..
3035            }
3036        )));
3037        assert!(effects.iter().any(|e| matches!(
3038            e,
3039            Effect::EmitEvent {
3040                event: SessionEvent::ToolMessageAdded { .. },
3041                ..
3042            }
3043        )));
3044        assert!(
3045            !effects
3046                .iter()
3047                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3048        );
3049        assert!(state.pending_approval.is_none());
3050        assert!(state.approval_queue.is_empty());
3051
3052        match &state.message_graph.messages[0].data {
3053            MessageData::Tool { result, .. } => match result {
3054                ToolResult::Error(error) => {
3055                    assert!(matches!(error, ToolError::InvalidParams { .. }));
3056                }
3057                _ => panic!("expected invalid params tool error"),
3058            },
3059            _ => panic!("expected tool message"),
3060        }
3061    }
3062
3063    #[test]
3064    fn test_model_response_with_tool_calls_requests_approval() {
3065        let mut state = test_state();
3066        let session_id = state.session_id;
3067        let op_id = OpId::new();
3068        let message_id = MessageId::new();
3069
3070        state.current_operation = Some(OperationState {
3071            op_id,
3072            kind: OperationKind::AgentLoop,
3073            pending_tool_calls: HashSet::new(),
3074        });
3075        state
3076            .operation_models
3077            .insert(op_id, builtin::claude_sonnet_4_5());
3078
3079        let tool_call = steer_tools::ToolCall {
3080            id: "tc_1".to_string(),
3081            name: "bash".to_string(),
3082            parameters: serde_json::json!({"command": "ls"}),
3083        };
3084
3085        let content = vec![
3086            AssistantContent::Text {
3087                text: "Let me list the files.".to_string(),
3088            },
3089            AssistantContent::ToolCall {
3090                tool_call: tool_call.clone(),
3091                thought_signature: None,
3092            },
3093        ];
3094
3095        let effects = reduce(
3096            &mut state,
3097            Action::ModelResponseComplete {
3098                session_id,
3099                op_id,
3100                message_id,
3101                content,
3102                usage: None,
3103                context_window_tokens: None,
3104                timestamp: 12345,
3105            },
3106        );
3107
3108        assert!(state.pending_approval.is_some());
3109        assert!(
3110            effects
3111                .iter()
3112                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3113        );
3114        assert!(state.current_operation.is_some());
3115    }
3116
3117    #[test]
3118    fn test_model_response_no_tools_completes_operation() {
3119        let mut state = test_state();
3120        let session_id = state.session_id;
3121        let op_id = OpId::new();
3122        let message_id = MessageId::new();
3123
3124        state.current_operation = Some(OperationState {
3125            op_id,
3126            kind: OperationKind::AgentLoop,
3127            pending_tool_calls: HashSet::new(),
3128        });
3129        state
3130            .operation_models
3131            .insert(op_id, builtin::claude_sonnet_4_5());
3132
3133        let content = vec![AssistantContent::Text {
3134            text: "Hello! How can I help?".to_string(),
3135        }];
3136
3137        let effects = reduce(
3138            &mut state,
3139            Action::ModelResponseComplete {
3140                session_id,
3141                op_id,
3142                message_id,
3143                content,
3144                usage: None,
3145                context_window_tokens: None,
3146                timestamp: 12345,
3147            },
3148        );
3149
3150        assert!(state.current_operation.is_none());
3151        assert!(effects.iter().any(|e| matches!(
3152            e,
3153            Effect::EmitEvent {
3154                event: SessionEvent::OperationCompleted { .. },
3155                ..
3156            }
3157        )));
3158        assert!(!effects.iter().any(|e| matches!(
3159            e,
3160            Effect::EmitEvent {
3161                event: SessionEvent::LlmUsageUpdated { .. },
3162                ..
3163            }
3164        )));
3165    }
3166
    /// When a model response carries token usage but no context window size, the
    /// reducer emits `LlmUsageUpdated` with `context_window: None`. It also records
    /// a per-op usage snapshot and updates the session-wide usage totals.
    #[test]
    fn test_model_response_with_usage_emits_usage_event_and_updates_state() {
        let mut state = test_state();
        let session_id = state.session_id;
        let op_id = OpId::new();
        let message_id = MessageId::new();
        let model = builtin::claude_sonnet_4_5();
        let usage = TokenUsage::new(10, 20, 30);

        state.current_operation = Some(OperationState {
            op_id,
            kind: OperationKind::AgentLoop,
            pending_tool_calls: HashSet::new(),
        });
        // The reducer looks up the model for this op when reporting usage.
        state.operation_models.insert(op_id, model.clone());

        let effects = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id,
                message_id,
                content: vec![AssistantContent::Text {
                    text: "Done".to_string(),
                }],
                usage: Some(usage),
                context_window_tokens: None,
                timestamp: 12345,
            },
        );

        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::AssistantMessageAdded { .. },
                ..
            }
        )));
        // The usage event must echo the op, model and usage unchanged; with no
        // window size supplied there is nothing to derive utilization from.
        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::LlmUsageUpdated {
                    op_id: usage_op_id,
                    model: usage_model,
                    usage: usage_payload,
                    context_window,
                },
                ..
            } if *usage_op_id == op_id && usage_model == &model && *usage_payload == usage && context_window.is_none()
        )));

        // State side: per-op snapshot plus totals (a single op, so totals == usage).
        let snapshot = state
            .llm_usage_by_op
            .get(&op_id)
            .expect("usage snapshot should be recorded");
        assert_eq!(snapshot.model, model);
        assert_eq!(snapshot.usage, usage);
        assert!(snapshot.context_window.is_none());
        assert_eq!(state.llm_usage_totals, usage);
    }
3227
3228    #[test]
3229    fn test_model_response_with_usage_and_context_window_emits_utilization() {
3230        let mut state = test_state();
3231        let session_id = state.session_id;
3232        let op_id = OpId::new();
3233        let message_id = MessageId::new();
3234        let model = builtin::claude_sonnet_4_5();
3235        let usage = TokenUsage::new(10, 20, 30);
3236        let context_window_tokens = Some(100u32);
3237
3238        state.current_operation = Some(OperationState {
3239            op_id,
3240            kind: OperationKind::AgentLoop,
3241            pending_tool_calls: HashSet::new(),
3242        });
3243        state.operation_models.insert(op_id, model.clone());
3244
3245        let effects = reduce(
3246            &mut state,
3247            Action::ModelResponseComplete {
3248                session_id,
3249                op_id,
3250                message_id,
3251                content: vec![AssistantContent::Text {
3252                    text: "Done".to_string(),
3253                }],
3254                usage: Some(usage),
3255                context_window_tokens,
3256                timestamp: 12345,
3257            },
3258        );
3259
3260        assert!(effects.iter().any(|e| matches!(
3261            e,
3262            Effect::EmitEvent {
3263                event: SessionEvent::LlmUsageUpdated {
3264                    op_id: usage_op_id,
3265                    model: usage_model,
3266                    usage: usage_payload,
3267                    context_window,
3268                },
3269                ..
3270            } if *usage_op_id == op_id
3271                && usage_model == &model
3272                && *usage_payload == usage
3273                && matches!(context_window, Some(ContextWindowUsage {
3274                    max_context_tokens: Some(100),
3275                    remaining_tokens: Some(70),
3276                    utilization_ratio: Some(ratio),
3277                    estimated: false,
3278                }) if (*ratio - 0.3).abs() < 1e-9)
3279        )));
3280
3281        let snapshot = state
3282            .llm_usage_by_op
3283            .get(&op_id)
3284            .expect("usage snapshot should be recorded");
3285        assert_eq!(snapshot.model, model);
3286        assert_eq!(snapshot.usage, usage);
3287        assert_eq!(
3288            snapshot.context_window,
3289            Some(ContextWindowUsage {
3290                max_context_tokens: Some(100),
3291                remaining_tokens: Some(70),
3292                utilization_ratio: Some(0.3),
3293                estimated: false,
3294            })
3295        );
3296        assert_eq!(state.llm_usage_totals, usage);
3297    }
3298
    /// A reported context window of zero tokens must not yield a utilization
    /// ratio. The event carries `max_context_tokens: Some(0)`, `remaining_tokens:
    /// Some(0)`, `utilization_ratio: None`, and is flagged `estimated: true`.
    #[test]
    fn test_model_response_with_zero_context_window_marks_estimated() {
        let mut state = test_state();
        let session_id = state.session_id;
        let op_id = OpId::new();
        let message_id = MessageId::new();
        let model = builtin::claude_sonnet_4_5();
        let usage = TokenUsage::new(10, 20, 30);

        state.current_operation = Some(OperationState {
            op_id,
            kind: OperationKind::AgentLoop,
            pending_tool_calls: HashSet::new(),
        });
        state.operation_models.insert(op_id, model);

        let effects = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id,
                message_id,
                content: vec![AssistantContent::Text {
                    text: "Done".to_string(),
                }],
                usage: Some(usage),
                // Degenerate window size; presumably guarded to avoid a divide-by-zero
                // ratio (NOTE(review): confirm against derive_context_window_usage).
                context_window_tokens: Some(0),
                timestamp: 12345,
            },
        );

        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::LlmUsageUpdated {
                    context_window,
                    ..
                },
                ..
            } if matches!(context_window, Some(ContextWindowUsage {
                max_context_tokens: Some(0),
                remaining_tokens: Some(0),
                utilization_ratio: None,
                estimated: true,
            }))
        )));
    }
3346
3347    #[test]
3348    fn test_apply_usage_event_updates_replay_state_and_totals() {
3349        let mut state = test_state();
3350        let op_a = OpId::new();
3351        let op_b = OpId::new();
3352        let model = builtin::claude_sonnet_4_5();
3353
3354        apply_event_to_state(
3355            &mut state,
3356            &SessionEvent::LlmUsageUpdated {
3357                op_id: op_a,
3358                model: model.clone(),
3359                usage: TokenUsage::new(3, 5, 8),
3360                context_window: None,
3361            },
3362        );
3363
3364        assert_eq!(state.llm_usage_totals, TokenUsage::new(3, 5, 8));
3365
3366        apply_event_to_state(
3367            &mut state,
3368            &SessionEvent::LlmUsageUpdated {
3369                op_id: op_a,
3370                model: model.clone(),
3371                usage: TokenUsage::new(7, 11, 18),
3372                context_window: None,
3373            },
3374        );
3375
3376        assert_eq!(state.llm_usage_totals, TokenUsage::new(7, 11, 18));
3377
3378        apply_event_to_state(
3379            &mut state,
3380            &SessionEvent::LlmUsageUpdated {
3381                op_id: op_b,
3382                model,
3383                usage: TokenUsage::new(2, 4, 6),
3384                context_window: None,
3385            },
3386        );
3387
3388        assert_eq!(state.llm_usage_totals, TokenUsage::new(9, 15, 24));
3389    }
3390
    /// Two user inputs (op_a, then op_b) are submitted back to back. When op_a's
    /// model response completes, op_b must remain the current operation with its
    /// model mapping intact, while op_a's mapping is dropped. Completing op_b
    /// afterwards must emit `OperationCompleted` for op_b and must not report a
    /// "Missing model for operation" error.
    #[test]
    fn test_out_of_order_completion_preserves_newer_operation() {
        let mut state = test_state();
        let session_id = state.session_id;
        let model = builtin::claude_sonnet_4_5();

        let op_a = OpId::new();
        let op_b = OpId::new();

        let _ = reduce(
            &mut state,
            Action::UserInput {
                session_id,
                content: vec![UserContent::Text {
                    text: "first".to_string(),
                }],
                op_id: op_a,
                message_id: MessageId::new(),
                model: model.clone(),
                timestamp: 1,
            },
        );

        // Submitted while op_a is still in flight.
        let _ = reduce(
            &mut state,
            Action::UserInput {
                session_id,
                content: vec![UserContent::Text {
                    text: "second".to_string(),
                }],
                op_id: op_b,
                message_id: MessageId::new(),
                model: model.clone(),
                timestamp: 2,
            },
        );

        let _ = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id: op_a,
                message_id: MessageId::new(),
                content: vec![AssistantContent::Text {
                    text: "done A".to_string(),
                }],
                usage: None,
                context_window_tokens: None,
                timestamp: 3,
            },
        );

        // op_a's completion must not clobber op_b's bookkeeping.
        assert!(
            state
                .current_operation
                .as_ref()
                .is_some_and(|op| op.op_id == op_b)
        );
        assert!(state.operation_models.contains_key(&op_b));
        assert!(!state.operation_models.contains_key(&op_a));

        let effects = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id: op_b,
                message_id: MessageId::new(),
                content: vec![AssistantContent::Text {
                    text: "done B".to_string(),
                }],
                usage: None,
                context_window_tokens: None,
                timestamp: 4,
            },
        );

        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::OperationCompleted { op_id },
                ..
            } if *op_id == op_b
        )));
        // Regression guard: op_b's model must still be resolvable at this point.
        assert!(!effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::Error { message },
                ..
            } if message.contains("Missing model for operation")
        )));
    }
3482
3483    #[test]
3484    fn test_tool_approval_does_not_call_model_before_result() {
3485        let mut state = test_state();
3486        let session_id = state.session_id;
3487        let op_id = OpId::new();
3488
3489        state.current_operation = Some(OperationState {
3490            op_id,
3491            kind: OperationKind::AgentLoop,
3492            pending_tool_calls: HashSet::new(),
3493        });
3494        state
3495            .operation_models
3496            .insert(op_id, builtin::claude_sonnet_4_5());
3497
3498        let tool_call = steer_tools::ToolCall {
3499            id: "tc_1".to_string(),
3500            name: "bash".to_string(),
3501            parameters: serde_json::json!({"command": "ls"}),
3502        };
3503        let request_id = RequestId::new();
3504        state.pending_approval = Some(PendingApproval {
3505            request_id,
3506            tool_call: tool_call.clone(),
3507        });
3508
3509        let effects = reduce(
3510            &mut state,
3511            Action::ToolApprovalDecided {
3512                session_id,
3513                request_id,
3514                decision: ApprovalDecision::Approved,
3515                remember: None,
3516            },
3517        );
3518
3519        assert!(
3520            effects
3521                .iter()
3522                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3523        );
3524        assert!(
3525            !effects
3526                .iter()
3527                .any(|e| matches!(e, Effect::CallModel { .. }))
3528        );
3529        assert!(state.current_operation.as_ref().is_some_and(|op| {
3530            op.pending_tool_calls
3531                .contains(&ToolCallId::from_string("tc_1"))
3532        }));
3533    }
3534
3535    #[test]
3536    fn test_mcp_tool_visibility_and_disconnect_removal() {
3537        let mut state = test_state();
3538        let session_id = state.session_id;
3539
3540        let mut allowed = HashSet::new();
3541        allowed.insert("mcp__alpha__allowed".to_string());
3542
3543        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3544        config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3545        state.session_config = Some(config);
3546
3547        state.tools.push(test_schema("bash"));
3548
3549        let _ = reduce(
3550            &mut state,
3551            Action::McpServerStateChanged {
3552                session_id,
3553                server_name: "alpha".to_string(),
3554                state: McpServerState::Connected {
3555                    tools: vec![
3556                        test_schema("mcp__alpha__allowed"),
3557                        test_schema("mcp__alpha__blocked"),
3558                    ],
3559                },
3560            },
3561        );
3562
3563        assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3564        assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3565
3566        let _ = reduce(
3567            &mut state,
3568            Action::McpServerStateChanged {
3569                session_id,
3570                server_name: "alpha".to_string(),
3571                state: McpServerState::Disconnected { error: None },
3572            },
3573        );
3574
3575        assert!(
3576            !state
3577                .tools
3578                .iter()
3579                .any(|t| t.name.starts_with("mcp__alpha__"))
3580        );
3581        assert!(state.tools.iter().any(|t| t.name == "bash"));
3582    }
3583
3584    #[test]
3585    fn test_tool_result_continues_agent_loop() {
3586        let mut state = test_state();
3587        let session_id = state.session_id;
3588        let op_id = OpId::new();
3589        let tool_call_id = ToolCallId::from_string("tc_1");
3590
3591        state.current_operation = Some(OperationState {
3592            op_id,
3593            kind: OperationKind::AgentLoop,
3594            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3595        });
3596        state
3597            .operation_models
3598            .insert(op_id, builtin::claude_sonnet_4_5());
3599
3600        let effects = reduce(
3601            &mut state,
3602            Action::ToolResult {
3603                session_id,
3604                tool_call_id,
3605                tool_name: "bash".to_string(),
3606                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3607                    tool_name: "bash".to_string(),
3608                    payload: "file1.txt\nfile2.txt".to_string(),
3609                })),
3610            },
3611        );
3612
3613        assert!(
3614            effects
3615                .iter()
3616                .any(|e| matches!(e, Effect::CallModel { .. }))
3617        );
3618    }
3619
3620    #[test]
3621    fn test_tool_result_waits_for_pending_tools() {
3622        let mut state = test_state();
3623        let session_id = state.session_id;
3624        let op_id = OpId::new();
3625        let tool_call_id_1 = ToolCallId::from_string("tc_1");
3626        let tool_call_id_2 = ToolCallId::from_string("tc_2");
3627
3628        state.current_operation = Some(OperationState {
3629            op_id,
3630            kind: OperationKind::AgentLoop,
3631            pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3632                .into_iter()
3633                .collect(),
3634        });
3635        state
3636            .operation_models
3637            .insert(op_id, builtin::claude_sonnet_4_5());
3638
3639        let effects = reduce(
3640            &mut state,
3641            Action::ToolResult {
3642                session_id,
3643                tool_call_id: tool_call_id_1,
3644                tool_name: "bash".to_string(),
3645                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3646                    tool_name: "bash".to_string(),
3647                    payload: "done".to_string(),
3648                })),
3649            },
3650        );
3651
3652        assert!(
3653            !effects
3654                .iter()
3655                .any(|e| matches!(e, Effect::CallModel { .. }))
3656        );
3657
3658        let effects = reduce(
3659            &mut state,
3660            Action::ToolResult {
3661                session_id,
3662                tool_call_id: tool_call_id_2,
3663                tool_name: "bash".to_string(),
3664                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3665                    tool_name: "bash".to_string(),
3666                    payload: "done".to_string(),
3667                })),
3668            },
3669        );
3670
3671        assert!(
3672            effects
3673                .iter()
3674                .any(|e| matches!(e, Effect::CallModel { .. }))
3675        );
3676    }
3677
3678    // ── Auto-compaction reducer tests ──────────────────────────────────
3679
3680    fn setup_auto_compact_state(
3681        enabled: bool,
3682        threshold_percent: u32,
3683        message_count: usize,
3684    ) -> AppState {
3685        use crate::session::state::AutoCompactionConfig;
3686
3687        let mut state = test_state();
3688
3689        // Apply session config with auto-compaction settings.
3690        let mut config = base_session_config();
3691        config.auto_compaction = AutoCompactionConfig {
3692            enabled,
3693            threshold_percent,
3694        };
3695        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
3696
3697        // Add the requested number of messages, linked via parent_message_id so
3698        // that `get_thread_messages` returns them all.
3699        let mut prev_id: Option<String> = None;
3700        for i in 0..message_count {
3701            let id = format!("msg_{}", i);
3702            state.message_graph.add_message(Message {
3703                data: MessageData::User {
3704                    content: vec![UserContent::Text {
3705                        text: format!("message {}", i),
3706                    }],
3707                },
3708                timestamp: i as u64,
3709                id: id.clone(),
3710                parent_message_id: prev_id.clone(),
3711            });
3712            prev_id = Some(id);
3713        }
3714
3715        state
3716    }
3717
3718    #[test]
3719    fn test_maybe_auto_compact_triggers_when_all_guards_pass() {
3720        let mut state = setup_auto_compact_state(true, 90, 4);
3721        let session_id = state.session_id;
3722        let model = builtin::claude_sonnet_4_5();
3723
3724        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3725        let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3726
3727        assert!(!effects.is_empty(), "expected non-empty effects");
3728        assert!(
3729            effects
3730                .iter()
3731                .any(|e| matches!(e, Effect::RequestCompaction { .. })),
3732            "expected Effect::RequestCompaction"
3733        );
3734        assert!(
3735            effects.iter().any(|e| matches!(
3736                e,
3737                Effect::EmitEvent {
3738                    event: SessionEvent::OperationStarted { .. },
3739                    ..
3740                }
3741            )),
3742            "expected OperationStarted event"
3743        );
3744    }
3745
3746    #[test]
3747    fn test_maybe_auto_compact_disabled_config() {
3748        let mut state = setup_auto_compact_state(false, 90, 4);
3749        let session_id = state.session_id;
3750        let model = builtin::claude_sonnet_4_5();
3751
3752        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3753        let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3754
3755        assert!(
3756            effects.is_empty(),
3757            "expected empty effects when auto-compaction is disabled"
3758        );
3759    }
3760
3761    #[test]
3762    fn test_maybe_auto_compact_below_threshold() {
3763        let mut state = setup_auto_compact_state(true, 90, 4);
3764        let session_id = state.session_id;
3765        let model = builtin::claude_sonnet_4_5();
3766
3767        // 50% utilization, well below 90% threshold.
3768        let usage = Some(TokenUsage::new(40_000, 10_000, 50_000));
3769        let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3770
3771        assert!(
3772            effects.is_empty(),
3773            "expected empty effects when utilization is below threshold"
3774        );
3775    }
3776
3777    #[test]
3778    fn test_maybe_auto_compact_insufficient_messages() {
3779        // Only 2 messages — below MIN_MESSAGES_FOR_COMPACT (3).
3780        let mut state = setup_auto_compact_state(true, 90, 2);
3781        let session_id = state.session_id;
3782        let model = builtin::claude_sonnet_4_5();
3783
3784        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3785        let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3786
3787        assert!(
3788            effects.is_empty(),
3789            "expected empty effects with insufficient messages"
3790        );
3791    }
3792
3793    #[test]
3794    fn test_maybe_auto_compact_queued_work_blocks() {
3795        use crate::app::domain::state::QueuedUserMessage;
3796
3797        let mut state = setup_auto_compact_state(true, 90, 4);
3798        let session_id = state.session_id;
3799        let model = builtin::claude_sonnet_4_5();
3800
3801        // Add queued work so guard 5 fires.
3802        state.queue_user_message(QueuedUserMessage {
3803            op_id: OpId::new(),
3804            message_id: MessageId::new(),
3805            content: vec![UserContent::Text {
3806                text: "queued msg".to_string(),
3807            }],
3808            model: builtin::claude_sonnet_4_5(),
3809            queued_at: 0,
3810        });
3811
3812        let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3813        let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3814
3815        assert!(
3816            effects.is_empty(),
3817            "expected empty effects when there is queued work"
3818        );
3819    }
3820
3821    #[test]
3822    fn test_handle_compaction_failed_emits_compact_result() {
3823        use crate::app::domain::event::{CompactResult, CompactTrigger};
3824
3825        let mut state = test_state();
3826        let session_id = state.session_id;
3827        let op_id = OpId::new();
3828
3829        // Set up an active Compact { trigger: Auto } operation.
3830        state.start_operation(
3831            op_id,
3832            OperationKind::Compact {
3833                trigger: CompactTrigger::Auto,
3834            },
3835        );
3836        state
3837            .operation_models
3838            .insert(op_id, builtin::claude_sonnet_4_5());
3839
3840        let effects = reduce(
3841            &mut state,
3842            Action::CompactionFailed {
3843                session_id,
3844                op_id,
3845                error: "test error".into(),
3846            },
3847        );
3848
3849        // Should contain CompactResult::Failed with the right trigger.
3850        let has_compact_result = effects.iter().any(|e| {
3851            matches!(
3852                e,
3853                Effect::EmitEvent {
3854                    event: SessionEvent::CompactResult {
3855                        result: CompactResult::Failed(msg),
3856                        trigger: CompactTrigger::Auto,
3857                    },
3858                    ..
3859                } if msg == "test error"
3860            )
3861        });
3862        assert!(
3863            has_compact_result,
3864            "expected CompactResult::Failed event with Auto trigger"
3865        );
3866
3867        // Should NOT contain a SessionEvent::Error.
3868        let has_error_event = effects.iter().any(|e| {
3869            matches!(
3870                e,
3871                Effect::EmitEvent {
3872                    event: SessionEvent::Error { .. },
3873                    ..
3874                }
3875            )
3876        });
3877        assert!(
3878            !has_error_event,
3879            "should not emit SessionEvent::Error for compaction failure"
3880        );
3881    }
3882
3883    /// Helper: extract the `messages` field from the first `Effect::CallModel` in a list of effects.
3884    fn extract_callmodel_messages(effects: &[Effect]) -> Option<&Vec<Message>> {
3885        effects.iter().find_map(|e| match e {
3886            Effect::CallModel { messages, .. } => Some(messages),
3887            _ => None,
3888        })
3889    }
3890
3891    /// Helper: check whether any message in the list contains the given text substring.
3892    fn messages_contain_text(messages: &[Message], needle: &str) -> bool {
3893        messages.iter().any(|m| match &m.data {
3894            MessageData::User { content } => content.iter().any(|c| match c {
3895                UserContent::Text { text } => text.contains(needle),
3896                _ => false,
3897            }),
3898            MessageData::Assistant { content } => content.iter().any(|c| match c {
3899                AssistantContent::Text { text } => text.contains(needle),
3900                _ => false,
3901            }),
3902            _ => false,
3903        })
3904    }
3905
3906    #[test]
3907    fn test_compaction_full_cycle_callmodel_filters_messages() {
3908        use crate::app::domain::types::CompactionId;
3909
3910        let mut state = test_state();
3911        let session_id = state.session_id;
3912        let model = builtin::claude_sonnet_4_5();
3913
3914        // Build 2 user/assistant turns (4 messages, exceeds MIN_MESSAGES_FOR_COMPACT=3).
3915        // Turn 1:
3916        let op_id_1 = OpId::new();
3917        let _ = reduce(
3918            &mut state,
3919            Action::UserInput {
3920                session_id,
3921                content: vec![UserContent::Text {
3922                    text: "hello".to_string(),
3923                }],
3924                op_id: op_id_1,
3925                message_id: MessageId::new(),
3926                model: model.clone(),
3927                timestamp: 1,
3928            },
3929        );
3930        let _ = reduce(
3931            &mut state,
3932            Action::ModelResponseComplete {
3933                session_id,
3934                op_id: op_id_1,
3935                message_id: MessageId::new(),
3936                content: vec![AssistantContent::Text {
3937                    text: "hi there".to_string(),
3938                }],
3939                usage: None,
3940                context_window_tokens: None,
3941                timestamp: 2,
3942            },
3943        );
3944
3945        // Turn 2:
3946        let op_id_1b = OpId::new();
3947        let _ = reduce(
3948            &mut state,
3949            Action::UserInput {
3950                session_id,
3951                content: vec![UserContent::Text {
3952                    text: "how are you".to_string(),
3953                }],
3954                op_id: op_id_1b,
3955                message_id: MessageId::new(),
3956                model: model.clone(),
3957                timestamp: 3,
3958            },
3959        );
3960        let assistant_msg_id = MessageId::new();
3961        let _ = reduce(
3962            &mut state,
3963            Action::ModelResponseComplete {
3964                session_id,
3965                op_id: op_id_1b,
3966                message_id: assistant_msg_id.clone(),
3967                content: vec![AssistantContent::Text {
3968                    text: "doing well".to_string(),
3969                }],
3970                usage: None,
3971                context_window_tokens: None,
3972                timestamp: 4,
3973            },
3974        );
3975        assert!(
3976            state.current_operation.is_none(),
3977            "operation should be complete after text-only response"
3978        );
3979        assert!(state.message_graph.messages.len() >= 3);
3980
3981        // RequestCompaction — starts Compact operation.
3982        let op_id_2 = OpId::new();
3983        let compact_effects = reduce(
3984            &mut state,
3985            Action::RequestCompaction {
3986                session_id,
3987                op_id: op_id_2,
3988                model: model.clone(),
3989            },
3990        );
3991        assert!(
3992            compact_effects
3993                .iter()
3994                .any(|e| matches!(e, Effect::RequestCompaction { .. })),
3995            "expected Effect::RequestCompaction"
3996        );
3997
3998        // Step 4: CompactionComplete — summary replaces old messages as boundary.
3999        let compaction_id = CompactionId::new();
4000        let summary_message_id = MessageId::new();
4001        let active_before = state.message_graph.active_message_id.clone();
4002        let _ = reduce(
4003            &mut state,
4004            Action::CompactionComplete {
4005                session_id,
4006                op_id: op_id_2,
4007                compaction_id,
4008                summary_message_id: summary_message_id.clone(),
4009                summary: "Summary of conversation.".to_string(),
4010                compacted_head_message_id: assistant_msg_id,
4011                previous_active_message_id: active_before.map(MessageId::from_string),
4012                model: "claude-sonnet-4-5-20250929".to_string(),
4013                timestamp: 3,
4014            },
4015        );
4016        assert!(
4017            state.compaction_summary_ids.contains(&summary_message_id.0),
4018            "compaction_summary_ids should contain the summary id"
4019        );
4020
4021        // Step 5: New UserInput("new question") — triggers CallModel.
4022        let op_id_3 = OpId::new();
4023        let effects = reduce(
4024            &mut state,
4025            Action::UserInput {
4026                session_id,
4027                content: vec![UserContent::Text {
4028                    text: "new question".to_string(),
4029                }],
4030                op_id: op_id_3,
4031                message_id: MessageId::new(),
4032                model: model.clone(),
4033                timestamp: 10,
4034            },
4035        );
4036
4037        // Assertions on CallModel messages.
4038        let messages = extract_callmodel_messages(&effects)
4039            .expect("expected Effect::CallModel from UserInput after compaction");
4040
4041        assert_eq!(
4042            messages.len(),
4043            2,
4044            "CallModel should contain exactly 2 messages (summary + new user msg), got: {:?}",
4045            messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4046        );
4047
4048        // First message is the summary (assistant).
4049        assert!(
4050            matches!(&messages[0].data, MessageData::Assistant { content } if content.iter().any(|c| matches!(c, AssistantContent::Text { text } if text == "Summary of conversation."))),
4051            "messages[0] should be the compaction summary"
4052        );
4053
4054        // Second message is the new user message.
4055        assert!(
4056            matches!(&messages[1].data, MessageData::User { content } if content.iter().any(|c| matches!(c, UserContent::Text { text } if text == "new question"))),
4057            "messages[1] should be the new user message"
4058        );
4059
4060        // Pre-compaction messages must NOT appear.
4061        assert!(
4062            !messages_contain_text(messages, "hello"),
4063            "CallModel messages must not contain pre-compaction user text 'hello'"
4064        );
4065        assert!(
4066            !messages_contain_text(messages, "hi there"),
4067            "CallModel messages must not contain pre-compaction assistant text 'hi there'"
4068        );
4069        assert!(
4070            !messages_contain_text(messages, "how are you"),
4071            "CallModel messages must not contain pre-compaction user text 'how are you'"
4072        );
4073        assert!(
4074            !messages_contain_text(messages, "doing well"),
4075            "CallModel messages must not contain pre-compaction assistant text 'doing well'"
4076        );
4077    }
4078
4079    #[test]
4080    fn test_compaction_queued_message_callmodel_filters_messages() {
4081        use crate::app::domain::types::CompactionId;
4082
4083        let mut state = test_state();
4084        let session_id = state.session_id;
4085        let model = builtin::claude_sonnet_4_5();
4086
4087        // Build initial conversation with 2 turns (4 msgs, exceeds MIN_MESSAGES_FOR_COMPACT=3).
4088        // Turn 1:
4089        let op_id_1 = OpId::new();
4090        let _ = reduce(
4091            &mut state,
4092            Action::UserInput {
4093                session_id,
4094                content: vec![UserContent::Text {
4095                    text: "msg A".to_string(),
4096                }],
4097                op_id: op_id_1,
4098                message_id: MessageId::new(),
4099                model: model.clone(),
4100                timestamp: 1,
4101            },
4102        );
4103        let _ = reduce(
4104            &mut state,
4105            Action::ModelResponseComplete {
4106                session_id,
4107                op_id: op_id_1,
4108                message_id: MessageId::new(),
4109                content: vec![AssistantContent::Text {
4110                    text: "response A".to_string(),
4111                }],
4112                usage: None,
4113                context_window_tokens: None,
4114                timestamp: 2,
4115            },
4116        );
4117
4118        // Turn 2:
4119        let op_id_1b = OpId::new();
4120        let _ = reduce(
4121            &mut state,
4122            Action::UserInput {
4123                session_id,
4124                content: vec![UserContent::Text {
4125                    text: "msg A2".to_string(),
4126                }],
4127                op_id: op_id_1b,
4128                message_id: MessageId::new(),
4129                model: model.clone(),
4130                timestamp: 3,
4131            },
4132        );
4133        let assistant_msg_id = MessageId::new();
4134        let _ = reduce(
4135            &mut state,
4136            Action::ModelResponseComplete {
4137                session_id,
4138                op_id: op_id_1b,
4139                message_id: assistant_msg_id.clone(),
4140                content: vec![AssistantContent::Text {
4141                    text: "response A2".to_string(),
4142                }],
4143                usage: None,
4144                context_window_tokens: None,
4145                timestamp: 4,
4146            },
4147        );
4148        assert!(state.current_operation.is_none());
4149
4150        // Start compaction — Compact operation is now active.
4151        let op_id_2 = OpId::new();
4152        let _ = reduce(
4153            &mut state,
4154            Action::RequestCompaction {
4155                session_id,
4156                op_id: op_id_2,
4157                model: model.clone(),
4158            },
4159        );
4160        assert!(
4161            state.has_active_operation(),
4162            "Compact operation should be active"
4163        );
4164
4165        // UserInput("msg B") while compaction is in flight — should be queued.
4166        let op_id_3 = OpId::new();
4167        let queued_effects = reduce(
4168            &mut state,
4169            Action::UserInput {
4170                session_id,
4171                content: vec![UserContent::Text {
4172                    text: "msg B".to_string(),
4173                }],
4174                op_id: op_id_3,
4175                message_id: MessageId::new(),
4176                model: model.clone(),
4177                timestamp: 5,
4178            },
4179        );
4180        assert!(
4181            queued_effects.iter().any(|e| matches!(
4182                e,
4183                Effect::EmitEvent {
4184                    event: SessionEvent::QueueUpdated { .. },
4185                    ..
4186                }
4187            )),
4188            "queued user message should emit QueueUpdated"
4189        );
4190
4191        // CompactionComplete — completes compaction AND dequeues "msg B".
4192        let compaction_id = CompactionId::new();
4193        let summary_message_id = MessageId::new();
4194        let active_before = state.message_graph.active_message_id.clone();
4195        let completion_effects = reduce(
4196            &mut state,
4197            Action::CompactionComplete {
4198                session_id,
4199                op_id: op_id_2,
4200                compaction_id,
4201                summary_message_id: summary_message_id.clone(),
4202                summary: "Summary of A.".to_string(),
4203                compacted_head_message_id: assistant_msg_id,
4204                previous_active_message_id: active_before.map(MessageId::from_string),
4205                model: "claude-sonnet-4-5-20250929".to_string(),
4206                timestamp: 6,
4207            },
4208        );
4209
4210        // The dequeued "msg B" should produce a CallModel effect.
4211        let messages = extract_callmodel_messages(&completion_effects)
4212            .expect("CompactionComplete should dequeue 'msg B' and produce CallModel");
4213
4214        assert_eq!(
4215            messages.len(),
4216            2,
4217            "CallModel should contain exactly 2 messages (summary + 'msg B'), got: {:?}",
4218            messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4219        );
4220
4221        // Summary message.
4222        assert!(
4223            messages_contain_text(messages, "Summary of A."),
4224            "CallModel messages should contain the compaction summary"
4225        );
4226
4227        // Queued user message.
4228        assert!(
4229            messages_contain_text(messages, "msg B"),
4230            "CallModel messages should contain the dequeued 'msg B'"
4231        );
4232
4233        // Pre-compaction messages must NOT appear.
4234        assert!(
4235            !messages_contain_text(messages, "response A"),
4236            "CallModel messages must not contain pre-compaction 'response A'"
4237        );
4238        assert!(
4239            !messages_contain_text(messages, "response A2"),
4240            "CallModel messages must not contain pre-compaction 'response A2'"
4241        );
4242    }
4243
4244    #[test]
4245    fn test_compaction_multi_round_conversation_then_compact() {
4246        use crate::app::domain::types::CompactionId;
4247
4248        let mut state = test_state();
4249        let session_id = state.session_id;
4250        let model = builtin::claude_sonnet_4_5();
4251
4252        // Build 3 user/assistant turns (6 messages) via reducer actions.
4253        let pre_compaction_texts = [
4254            ("alpha user", "alpha assistant"),
4255            ("beta user", "beta assistant"),
4256            ("gamma user", "gamma assistant"),
4257        ];
4258        let mut last_assistant_msg_id = MessageId::new();
4259        for (i, (user_text, assistant_text)) in pre_compaction_texts.iter().enumerate() {
4260            let op = OpId::new();
4261            let _ = reduce(
4262                &mut state,
4263                Action::UserInput {
4264                    session_id,
4265                    content: vec![UserContent::Text {
4266                        text: (*user_text).to_string(),
4267                    }],
4268                    op_id: op,
4269                    message_id: MessageId::new(),
4270                    model: model.clone(),
4271                    timestamp: (i * 2 + 1) as u64,
4272                },
4273            );
4274            last_assistant_msg_id = MessageId::new();
4275            let _ = reduce(
4276                &mut state,
4277                Action::ModelResponseComplete {
4278                    session_id,
4279                    op_id: op,
4280                    message_id: last_assistant_msg_id.clone(),
4281                    content: vec![AssistantContent::Text {
4282                        text: assistant_text.to_string(),
4283                    }],
4284                    usage: None,
4285                    context_window_tokens: None,
4286                    timestamp: (i * 2 + 2) as u64,
4287                },
4288            );
4289        }
4290        assert_eq!(state.message_graph.messages.len(), 6);
4291        assert!(state.current_operation.is_none());
4292
4293        // Compact the conversation.
4294        let op_compact = OpId::new();
4295        let _ = reduce(
4296            &mut state,
4297            Action::RequestCompaction {
4298                session_id,
4299                op_id: op_compact,
4300                model: model.clone(),
4301            },
4302        );
4303
4304        let compaction_id = CompactionId::new();
4305        let summary_message_id = MessageId::new();
4306        let active_before = state.message_graph.active_message_id.clone();
4307        let _ = reduce(
4308            &mut state,
4309            Action::CompactionComplete {
4310                session_id,
4311                op_id: op_compact,
4312                compaction_id,
4313                summary_message_id: summary_message_id.clone(),
4314                summary: "Summary of greek turns.".to_string(),
4315                compacted_head_message_id: last_assistant_msg_id,
4316                previous_active_message_id: active_before.map(MessageId::from_string),
4317                model: "claude-sonnet-4-5-20250929".to_string(),
4318                timestamp: 100,
4319            },
4320        );
4321
4322        // Add 2 post-compaction turns.
4323        let post_compaction_texts = [
4324            ("delta user", "delta assistant"),
4325            ("epsilon user", "epsilon assistant"),
4326        ];
4327        for (i, (user_text, assistant_text)) in post_compaction_texts.iter().enumerate() {
4328            let op = OpId::new();
4329            let _ = reduce(
4330                &mut state,
4331                Action::UserInput {
4332                    session_id,
4333                    content: vec![UserContent::Text {
4334                        text: (*user_text).to_string(),
4335                    }],
4336                    op_id: op,
4337                    message_id: MessageId::new(),
4338                    model: model.clone(),
4339                    timestamp: (101 + i * 2) as u64,
4340                },
4341            );
4342            let _ = reduce(
4343                &mut state,
4344                Action::ModelResponseComplete {
4345                    session_id,
4346                    op_id: op,
4347                    message_id: MessageId::new(),
4348                    content: vec![AssistantContent::Text {
4349                        text: assistant_text.to_string(),
4350                    }],
4351                    usage: None,
4352                    context_window_tokens: None,
4353                    timestamp: (102 + i * 2) as u64,
4354                },
4355            );
4356        }
4357
4358        // Final UserInput to trigger CallModel.
4359        let final_op = OpId::new();
4360        let effects = reduce(
4361            &mut state,
4362            Action::UserInput {
4363                session_id,
4364                content: vec![UserContent::Text {
4365                    text: "final question".to_string(),
4366                }],
4367                op_id: final_op,
4368                message_id: MessageId::new(),
4369                model: model.clone(),
4370                timestamp: 200,
4371            },
4372        );
4373
4374        let messages =
4375            extract_callmodel_messages(&effects).expect("expected CallModel from final UserInput");
4376
4377        // Expected: summary + 4 post-compaction messages + final user = 6.
4378        assert_eq!(
4379            messages.len(),
4380            6,
4381            "CallModel should have 6 messages (summary + 4 post-compaction + final), got: {:?}",
4382            messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4383        );
4384
4385        // First message is the compaction summary.
4386        assert!(
4387            messages_contain_text(&messages[..1], "Summary of greek turns."),
4388            "first message should be the compaction summary"
4389        );
4390
4391        // Final message is the new user question.
4392        assert!(
4393            messages_contain_text(&messages[5..], "final question"),
4394            "last message should be the final user question"
4395        );
4396
4397        // Post-compaction messages should be present.
4398        assert!(
4399            messages_contain_text(messages, "delta user"),
4400            "should contain post-compaction user text"
4401        );
4402        assert!(
4403            messages_contain_text(messages, "epsilon assistant"),
4404            "should contain post-compaction assistant text"
4405        );
4406
4407        // Pre-compaction messages must NOT appear.
4408        for (user_text, assistant_text) in &pre_compaction_texts {
4409            assert!(
4410                !messages_contain_text(messages, user_text),
4411                "CallModel messages must not contain pre-compaction text '{user_text}'"
4412            );
4413            assert!(
4414                !messages_contain_text(messages, assistant_text),
4415                "CallModel messages must not contain pre-compaction text '{assistant_text}'"
4416            );
4417        }
4418    }
4419}