Skip to main content

steer_core/app/domain/
reduce.rs

1use crate::agents::default_agent_spec_id;
2use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
3use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
4use crate::app::domain::effect::{Effect, McpServerConfig};
5use crate::app::domain::event::{
6    CancellationInfo, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
7};
8use crate::app::domain::state::{
9    AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
10};
11use crate::app::domain::types::NonEmptyString;
12use crate::primary_agents::{
13    default_primary_agent_id, primary_agent_spec, resolve_effective_config,
14};
15use crate::session::state::{BackendConfig, ToolDecision};
16use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
17use serde_json::Value;
18use steer_tools::ToolError;
19use steer_tools::result::ToolResult;
20use steer_tools::tools::BASH_TOOL_NAME;
21use thiserror::Error;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum InvalidActionKind {
25    OperationInFlight,
26    MissingSessionConfig,
27    UnknownPrimaryAgent,
28    QueueEmpty,
29}
30
31#[derive(Debug, Error)]
32pub enum ReduceError {
33    #[error("{message}")]
34    InvalidAction {
35        message: String,
36        kind: InvalidActionKind,
37    },
38    #[error("Invariant violated: {message}")]
39    Invariant { message: String },
40}
41
42fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
43    ReduceError::InvalidAction {
44        message: message.into(),
45        kind,
46    }
47}
48
49pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
50    match action {
51        Action::UserInput {
52            session_id,
53            text,
54            op_id,
55            message_id,
56            model,
57            timestamp,
58        } => Ok(handle_user_input(
59            state, session_id, text, op_id, message_id, model, timestamp,
60        )),
61
62        Action::UserEditedMessage {
63            session_id,
64            message_id,
65            new_content,
66            op_id,
67            new_message_id,
68            model,
69            timestamp,
70        } => handle_user_edited_message(
71            state,
72            session_id,
73            UserEditedMessageParams {
74                original_message_id: message_id,
75                new_content,
76                op_id,
77                new_message_id,
78                model,
79                timestamp,
80            },
81        ),
82
83        Action::ToolApprovalRequested {
84            session_id,
85            request_id,
86            tool_call,
87        } => Ok(handle_tool_approval_requested(
88            state, session_id, request_id, tool_call,
89        )),
90
91        Action::ToolApprovalDecided {
92            session_id,
93            request_id,
94            decision,
95            remember,
96        } => Ok(handle_tool_approval_decided(
97            state, session_id, request_id, decision, remember,
98        )),
99
100        Action::ToolExecutionStarted {
101            session_id,
102            tool_call_id,
103            tool_name,
104            tool_parameters,
105        } => Ok(handle_tool_execution_started(
106            state,
107            session_id,
108            tool_call_id,
109            tool_name,
110            tool_parameters,
111        )),
112
113        Action::ToolResult {
114            session_id,
115            tool_call_id,
116            tool_name,
117            result,
118        } => Ok(handle_tool_result(
119            state,
120            session_id,
121            tool_call_id,
122            tool_name,
123            result,
124        )),
125
126        Action::ModelResponseComplete {
127            session_id,
128            op_id,
129            message_id,
130            content,
131            timestamp,
132        } => Ok(handle_model_response_complete(
133            state, session_id, op_id, message_id, content, timestamp,
134        )),
135
136        Action::ModelResponseError {
137            session_id,
138            op_id,
139            error,
140        } => Ok(handle_model_response_error(
141            state, session_id, op_id, &error,
142        )),
143
144        Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
145
146        Action::DirectBashCommand {
147            session_id,
148            op_id,
149            message_id,
150            command,
151            timestamp,
152        } => Ok(handle_direct_bash(
153            state, session_id, op_id, message_id, command, timestamp,
154        )),
155
156        Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
157
158        Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
159
160        Action::RequestCompaction {
161            session_id,
162            op_id,
163            model,
164        } => handle_request_compaction(state, session_id, op_id, model),
165
166        Action::Hydrate {
167            session_id,
168            events,
169            starting_sequence,
170        } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
171
172        Action::WorkspaceFilesListed { files, .. } => {
173            state.workspace_files = files;
174            Ok(vec![])
175        }
176
177        Action::ToolSchemasAvailable { tools, .. } => {
178            state.tools = tools;
179            Ok(vec![])
180        }
181
182        Action::ToolSchemasUpdated { schemas, .. } => {
183            state.tools = schemas;
184            Ok(vec![])
185        }
186
187        Action::SwitchPrimaryAgent {
188            session_id,
189            agent_id,
190        } => handle_switch_primary_agent(state, session_id, agent_id),
191
192        Action::McpServerStateChanged {
193            session_id,
194            server_name,
195            state: new_state,
196        } => {
197            // When connected, merge MCP tools into state.tools
198            if let McpServerState::Connected { tools } = &new_state {
199                let tools = state.session_config.as_ref().map_or_else(
200                    || tools.clone(),
201                    |config| config.filter_tools_by_visibility(tools.clone()),
202                );
203
204                // Add MCP tools that aren't already present (by name)
205                for tool in tools {
206                    if !state.tools.iter().any(|t| t.name == tool.name) {
207                        state.tools.push(tool.clone());
208                    }
209                }
210            }
211
212            // When disconnected or failed, remove tools from that server
213            if matches!(
214                &new_state,
215                McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
216            ) {
217                let prefix = format!("mcp__{server_name}__");
218                state.tools.retain(|t| !t.name.starts_with(&prefix));
219            }
220
221            state
222                .mcp_servers
223                .insert(server_name.clone(), new_state.clone());
224            Ok(vec![Effect::EmitEvent {
225                session_id,
226                event: SessionEvent::McpServerStateChanged {
227                    server_name,
228                    state: new_state,
229                },
230            }])
231        }
232
233        Action::CompactionComplete {
234            session_id,
235            op_id,
236            compaction_id,
237            summary_message_id,
238            summary,
239            compacted_head_message_id,
240            previous_active_message_id,
241            model,
242            timestamp,
243        } => Ok(handle_compaction_complete(
244            state,
245            session_id,
246            CompactionCompleteParams {
247                op_id,
248                compaction_id,
249                summary_message_id,
250                summary,
251                compacted_head_message_id,
252                previous_active_message_id,
253                model_name: model,
254                timestamp,
255            },
256        )),
257
258        Action::CompactionFailed {
259            session_id,
260            op_id,
261            error,
262        } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
263
264        Action::Shutdown => Ok(vec![]),
265    }
266}
267
268fn handle_user_input(
269    state: &mut AppState,
270    session_id: crate::app::domain::types::SessionId,
271    text: crate::app::domain::types::NonEmptyString,
272    op_id: crate::app::domain::types::OpId,
273    message_id: crate::app::domain::types::MessageId,
274    model: crate::config::model::ModelId,
275    timestamp: u64,
276) -> Vec<Effect> {
277    let mut effects = Vec::new();
278
279    if state.has_active_operation() {
280        state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
281            text,
282            op_id,
283            message_id,
284            model,
285            queued_at: timestamp,
286        });
287        effects.push(Effect::EmitEvent {
288            session_id,
289            event: SessionEvent::QueueUpdated {
290                queue: snapshot_queue(state),
291            },
292        });
293        return effects;
294    }
295
296    let parent_id = state.message_graph.active_message_id.clone();
297
298    let message = Message {
299        data: MessageData::User {
300            content: vec![UserContent::Text {
301                text: text.as_str().to_string(),
302            }],
303        },
304        timestamp,
305        id: message_id.0.clone(),
306        parent_message_id: parent_id,
307    };
308
309    state.message_graph.add_message(message.clone());
310    state.message_graph.active_message_id = Some(message_id.0.clone());
311
312    state.start_operation(op_id, OperationKind::AgentLoop);
313    state.operation_models.insert(op_id, model.clone());
314
315    effects.push(Effect::EmitEvent {
316        session_id,
317        event: SessionEvent::UserMessageAdded {
318            message: message.clone(),
319        },
320    });
321
322    effects.push(Effect::EmitEvent {
323        session_id,
324        event: SessionEvent::OperationStarted {
325            op_id,
326            kind: OperationKind::AgentLoop,
327        },
328    });
329
330    effects.push(Effect::CallModel {
331        session_id,
332        op_id,
333        model,
334        messages: state
335            .message_graph
336            .get_thread_messages()
337            .into_iter()
338            .cloned()
339            .collect(),
340        system_context: state.cached_system_context.clone(),
341        tools: state.tools.clone(),
342    });
343
344    effects
345}
346
347struct UserEditedMessageParams {
348    original_message_id: crate::app::domain::types::MessageId,
349    new_content: String,
350    op_id: crate::app::domain::types::OpId,
351    new_message_id: crate::app::domain::types::MessageId,
352    model: crate::config::model::ModelId,
353    timestamp: u64,
354}
355
356fn handle_user_edited_message(
357    state: &mut AppState,
358    session_id: crate::app::domain::types::SessionId,
359    params: UserEditedMessageParams,
360) -> Result<Vec<Effect>, ReduceError> {
361    let UserEditedMessageParams {
362        original_message_id,
363        new_content,
364        op_id,
365        new_message_id,
366        model,
367        timestamp,
368    } = params;
369    let mut effects = Vec::new();
370
371    if state.has_active_operation() {
372        return Err(invalid_action(
373            InvalidActionKind::OperationInFlight,
374            "Cannot edit message while an operation is active.",
375        ));
376    }
377
378    let parent_id = state
379        .message_graph
380        .messages
381        .iter()
382        .find(|m| m.id() == original_message_id.0)
383        .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
384
385    let message = Message {
386        data: MessageData::User {
387            content: vec![UserContent::Text { text: new_content }],
388        },
389        timestamp,
390        id: new_message_id.0.clone(),
391        parent_message_id: parent_id,
392    };
393
394    state.message_graph.add_message(message.clone());
395    state.message_graph.active_message_id = Some(new_message_id.0.clone());
396
397    state.start_operation(op_id, OperationKind::AgentLoop);
398    state.operation_models.insert(op_id, model.clone());
399
400    effects.push(Effect::EmitEvent {
401        session_id,
402        event: SessionEvent::UserMessageAdded {
403            message: message.clone(),
404        },
405    });
406
407    effects.push(Effect::EmitEvent {
408        session_id,
409        event: SessionEvent::OperationStarted {
410            op_id,
411            kind: OperationKind::AgentLoop,
412        },
413    });
414
415    effects.push(Effect::CallModel {
416        session_id,
417        op_id,
418        model,
419        messages: state
420            .message_graph
421            .get_thread_messages()
422            .into_iter()
423            .cloned()
424            .collect(),
425        system_context: state.cached_system_context.clone(),
426        tools: state.tools.clone(),
427    });
428
429    Ok(effects)
430}
431
432fn handle_tool_approval_requested(
433    state: &mut AppState,
434    session_id: crate::app::domain::types::SessionId,
435    request_id: crate::app::domain::types::RequestId,
436    tool_call: steer_tools::ToolCall,
437) -> Vec<Effect> {
438    let mut effects = Vec::new();
439
440    if let Err(error) = validate_tool_call(state, &tool_call) {
441        let error_message = error.to_string();
442        return fail_tool_call_without_execution(
443            state,
444            session_id,
445            tool_call,
446            error,
447            error_message,
448            "invalid",
449            true,
450        );
451    }
452
453    let decision = get_tool_decision(state, &tool_call);
454
455    match decision {
456        ToolDecision::Allow => {
457            let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
458                return vec![Effect::EmitEvent {
459                    session_id,
460                    event: SessionEvent::Error {
461                        message: "Tool approval requested without active operation".to_string(),
462                    },
463                }];
464            };
465            state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
466                &tool_call.id,
467            ));
468
469            effects.push(Effect::ExecuteTool {
470                session_id,
471                op_id,
472                tool_call,
473            });
474        }
475        ToolDecision::Deny => {
476            let error = ToolError::DeniedByPolicy(tool_call.name.clone());
477            let tool_name = tool_call.name.clone();
478            effects.extend(fail_tool_call_without_execution(
479                state,
480                session_id,
481                tool_call,
482                error,
483                format!("Tool '{tool_name}' denied by policy"),
484                "denied",
485                true,
486            ));
487        }
488        ToolDecision::Ask => {
489            if state.pending_approval.is_some() {
490                state.approval_queue.push_back(QueuedApproval { tool_call });
491                return effects;
492            }
493
494            state.pending_approval = Some(PendingApproval {
495                request_id,
496                tool_call: tool_call.clone(),
497            });
498
499            effects.push(Effect::EmitEvent {
500                session_id,
501                event: SessionEvent::ApprovalRequested {
502                    request_id,
503                    tool_call: tool_call.clone(),
504                },
505            });
506
507            effects.push(Effect::RequestUserApproval {
508                session_id,
509                request_id,
510                tool_call,
511            });
512        }
513    }
514
515    effects
516}
517
518fn validate_tool_call(
519    state: &AppState,
520    tool_call: &steer_tools::ToolCall,
521) -> Result<(), ToolError> {
522    if tool_call.name.trim().is_empty() {
523        return Err(ToolError::invalid_params(
524            "unknown",
525            "Malformed tool call: missing tool name",
526        ));
527    }
528
529    if tool_call.id.trim().is_empty() {
530        return Err(ToolError::invalid_params(
531            tool_call.name.clone(),
532            "Malformed tool call: missing tool call id",
533        ));
534    }
535
536    if state.tools.is_empty() {
537        return Ok(());
538    }
539
540    let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
541        return Ok(());
542    };
543
544    validate_against_json_schema(
545        &tool_call.name,
546        schema.input_schema.as_value(),
547        &tool_call.parameters,
548    )
549}
550
551fn validate_against_json_schema(
552    tool_name: &str,
553    schema: &Value,
554    params: &Value,
555) -> Result<(), ToolError> {
556    let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
557        ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
558    })?;
559
560    if let Err(errors) = validator.validate(params) {
561        let message = errors
562            .into_iter()
563            .map(|error| error.to_string())
564            .next()
565            .unwrap_or_else(|| "Parameters do not match schema".to_string());
566        return Err(ToolError::invalid_params(tool_name.to_string(), message));
567    }
568
569    Ok(())
570}
571
572fn emit_tool_failure_message(
573    state: &mut AppState,
574    session_id: crate::app::domain::types::SessionId,
575    tool_call_id: &str,
576    tool_name: &str,
577    tool_error: ToolError,
578    event_error: String,
579    message_id_prefix: &str,
580) -> Vec<Effect> {
581    let mut effects = Vec::new();
582
583    let tool_result = ToolResult::Error(tool_error);
584    let parent_id = state.message_graph.active_message_id.clone();
585    let tool_message = Message {
586        data: MessageData::Tool {
587            tool_use_id: tool_call_id.to_string(),
588            result: tool_result,
589        },
590        timestamp: 0,
591        id: format!("{message_id_prefix}_{tool_call_id}"),
592        parent_message_id: parent_id,
593    };
594    state.message_graph.add_message(tool_message.clone());
595
596    if let Some(model) = state
597        .current_operation
598        .as_ref()
599        .and_then(|op| state.operation_models.get(&op.op_id).cloned())
600    {
601        effects.push(Effect::EmitEvent {
602            session_id,
603            event: SessionEvent::ToolCallFailed {
604                id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
605                name: tool_name.to_string(),
606                error: event_error,
607                model,
608            },
609        });
610    }
611
612    effects.push(Effect::EmitEvent {
613        session_id,
614        event: SessionEvent::ToolMessageAdded {
615            message: tool_message,
616        },
617    });
618
619    effects
620}
621
622fn fail_tool_call_without_execution(
623    state: &mut AppState,
624    session_id: crate::app::domain::types::SessionId,
625    tool_call: steer_tools::ToolCall,
626    tool_error: ToolError,
627    event_error: String,
628    message_id_prefix: &str,
629    call_model_if_ready: bool,
630) -> Vec<Effect> {
631    let mut effects = emit_tool_failure_message(
632        state,
633        session_id,
634        &tool_call.id,
635        &tool_call.name,
636        tool_error,
637        event_error,
638        message_id_prefix,
639    );
640
641    if !call_model_if_ready {
642        return effects;
643    }
644
645    let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
646        effects.push(Effect::EmitEvent {
647            session_id,
648            event: SessionEvent::Error {
649                message: "Tool failure recorded without active operation".to_string(),
650            },
651        });
652        return effects;
653    };
654    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
655        model
656    } else {
657        effects.push(Effect::EmitEvent {
658            session_id,
659            event: SessionEvent::Error {
660                message: format!("Missing model for operation {op_id}"),
661            },
662        });
663        return effects;
664    };
665
666    let all_tools_complete = state
667        .current_operation
668        .as_ref()
669        .is_none_or(|op| op.pending_tool_calls.is_empty());
670    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
671
672    if all_tools_complete && no_pending_approvals {
673        effects.push(Effect::CallModel {
674            session_id,
675            op_id,
676            model,
677            messages: state
678                .message_graph
679                .get_thread_messages()
680                .into_iter()
681                .cloned()
682                .collect(),
683            system_context: state.cached_system_context.clone(),
684            tools: state.tools.clone(),
685        });
686    }
687
688    effects
689}
690
691fn handle_tool_approval_decided(
692    state: &mut AppState,
693    session_id: crate::app::domain::types::SessionId,
694    request_id: crate::app::domain::types::RequestId,
695    decision: ApprovalDecision,
696    remember: Option<ApprovalMemory>,
697) -> Vec<Effect> {
698    let mut effects = Vec::new();
699
700    let pending = match state.pending_approval.take() {
701        Some(p) if p.request_id == request_id => p,
702        other => {
703            state.pending_approval = other;
704            return effects;
705        }
706    };
707
708    let resolved_memory = if decision == ApprovalDecision::Approved {
709        match remember {
710            Some(ApprovalMemory::PendingTool) => {
711                Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
712            }
713            Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
714            Some(ApprovalMemory::BashPattern(pattern)) => {
715                Some(ApprovalMemory::BashPattern(pattern))
716            }
717            None => None,
718        }
719    } else {
720        None
721    };
722
723    effects.push(Effect::EmitEvent {
724        session_id,
725        event: SessionEvent::ApprovalDecided {
726            request_id,
727            decision,
728            remember: resolved_memory.clone(),
729        },
730    });
731
732    if decision == ApprovalDecision::Approved {
733        if let Some(ref memory) = resolved_memory {
734            match memory {
735                ApprovalMemory::Tool(name) => {
736                    state.approved_tools.insert(name.clone());
737                }
738                ApprovalMemory::BashPattern(pattern) => {
739                    state.approved_bash_patterns.insert(pattern.clone());
740                }
741                ApprovalMemory::PendingTool => {}
742            }
743        }
744
745        let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
746            effects.push(Effect::EmitEvent {
747                session_id,
748                event: SessionEvent::Error {
749                    message: "Tool approval decided without active operation".to_string(),
750                },
751            });
752            return effects;
753        };
754        state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
755            &pending.tool_call.id,
756        ));
757
758        effects.push(Effect::ExecuteTool {
759            session_id,
760            op_id,
761            tool_call: pending.tool_call,
762        });
763    } else {
764        let tool_name = pending.tool_call.name.clone();
765        let error = ToolError::DeniedByUser(tool_name.clone());
766        effects.extend(fail_tool_call_without_execution(
767            state,
768            session_id,
769            pending.tool_call,
770            error,
771            format!("Tool '{tool_name}' denied by user"),
772            "denied",
773            false,
774        ));
775    }
776
777    effects.extend(process_next_queued_approval(state, session_id));
778
779    effects
780}
781
782fn process_next_queued_approval(
783    state: &mut AppState,
784    session_id: crate::app::domain::types::SessionId,
785) -> Vec<Effect> {
786    let mut effects = Vec::new();
787
788    while let Some(queued) = state.approval_queue.pop_front() {
789        let decision = get_tool_decision(state, &queued.tool_call);
790
791        match decision {
792            ToolDecision::Allow => {
793                let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
794                    effects.push(Effect::EmitEvent {
795                        session_id,
796                        event: SessionEvent::Error {
797                            message: "Queued tool approval processed without active operation"
798                                .to_string(),
799                        },
800                    });
801                    state.approval_queue.push_front(queued);
802                    break;
803                };
804                state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
805                    &queued.tool_call.id,
806                ));
807
808                effects.push(Effect::ExecuteTool {
809                    session_id,
810                    op_id,
811                    tool_call: queued.tool_call,
812                });
813            }
814            ToolDecision::Deny => {
815                let tool_name = queued.tool_call.name.clone();
816                let error = ToolError::DeniedByPolicy(tool_name.clone());
817                effects.extend(fail_tool_call_without_execution(
818                    state,
819                    session_id,
820                    queued.tool_call,
821                    error,
822                    format!("Tool '{tool_name}' denied by policy"),
823                    "denied",
824                    false,
825                ));
826            }
827            ToolDecision::Ask => {
828                let request_id = crate::app::domain::types::RequestId::new();
829                state.pending_approval = Some(PendingApproval {
830                    request_id,
831                    tool_call: queued.tool_call.clone(),
832                });
833
834                effects.push(Effect::EmitEvent {
835                    session_id,
836                    event: SessionEvent::ApprovalRequested {
837                        request_id,
838                        tool_call: queued.tool_call.clone(),
839                    },
840                });
841
842                effects.push(Effect::RequestUserApproval {
843                    session_id,
844                    request_id,
845                    tool_call: queued.tool_call,
846                });
847
848                break;
849            }
850        }
851    }
852
853    let all_tools_complete = state
854        .current_operation
855        .as_ref()
856        .is_none_or(|op| op.pending_tool_calls.is_empty());
857    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
858
859    if all_tools_complete
860        && no_pending_approvals
861        && let Some(op) = &state.current_operation
862    {
863        let op_id = op.op_id;
864        if let Some(model) = state.operation_models.get(&op_id).cloned() {
865            effects.push(Effect::CallModel {
866                session_id,
867                op_id,
868                model,
869                messages: state
870                    .message_graph
871                    .get_thread_messages()
872                    .into_iter()
873                    .cloned()
874                    .collect(),
875                system_context: state.cached_system_context.clone(),
876                tools: state.tools.clone(),
877            });
878        }
879    }
880
881    effects
882}
883
884fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
885    if state.approved_tools.contains(&tool_call.name) {
886        return ToolDecision::Allow;
887    }
888
889    if tool_call.name == DISPATCH_AGENT_TOOL_NAME
890        && let Ok(params) =
891            serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
892        && let Some(config) = state.session_config.as_ref()
893    {
894        let policy = &config.tool_config.approval_policy;
895        match params.target {
896            DispatchAgentTarget::Resume { .. } => {
897                return ToolDecision::Allow;
898            }
899            DispatchAgentTarget::New { agent, .. } => {
900                let agent_id = agent
901                    .as_deref()
902                    .filter(|value| !value.trim().is_empty())
903                    .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
904                if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
905                    return ToolDecision::Allow;
906                }
907            }
908        }
909    }
910
911    if tool_call.name == BASH_TOOL_NAME
912        && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
913            tool_call.parameters.clone(),
914        )
915        && state.is_bash_pattern_approved(&params.command)
916    {
917        return ToolDecision::Allow;
918    }
919
920    state
921        .session_config
922        .as_ref()
923        .map_or(ToolDecision::Ask, |config| {
924            config
925                .tool_config
926                .approval_policy
927                .tool_decision(&tool_call.name)
928        })
929}
930
931fn handle_tool_execution_started(
932    state: &mut AppState,
933    session_id: crate::app::domain::types::SessionId,
934    tool_call_id: crate::app::domain::types::ToolCallId,
935    tool_name: String,
936    tool_parameters: serde_json::Value,
937) -> Vec<Effect> {
938    state.add_pending_tool_call(tool_call_id.clone());
939
940    let op_id = match state.current_operation.as_ref() {
941        Some(op) => op.op_id,
942        None => {
943            return vec![Effect::EmitEvent {
944                session_id,
945                event: SessionEvent::Error {
946                    message: "Tool call started without active operation".to_string(),
947                },
948            }];
949        }
950    };
951
952    let is_direct_bash = matches!(
953        state.current_operation.as_ref().map(|op| &op.kind),
954        Some(OperationKind::DirectBash { .. })
955    );
956
957    if is_direct_bash {
958        return vec![];
959    }
960
961    let model = match state.operation_models.get(&op_id).cloned() {
962        Some(model) => model,
963        None => {
964            return vec![Effect::EmitEvent {
965                session_id,
966                event: SessionEvent::Error {
967                    message: format!("Missing model for tool call on operation {op_id}"),
968                },
969            }];
970        }
971    };
972
973    vec![Effect::EmitEvent {
974        session_id,
975        event: SessionEvent::ToolCallStarted {
976            id: tool_call_id,
977            name: tool_name,
978            parameters: tool_parameters,
979            model,
980        },
981    }]
982}
983
984fn handle_tool_result(
985    state: &mut AppState,
986    session_id: crate::app::domain::types::SessionId,
987    tool_call_id: crate::app::domain::types::ToolCallId,
988    tool_name: String,
989    result: Result<ToolResult, ToolError>,
990) -> Vec<Effect> {
991    let mut effects = Vec::new();
992
993    let op = match &state.current_operation {
994        Some(op) => {
995            if state.cancelled_ops.contains(&op.op_id) {
996                tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
997                return effects;
998            }
999            op.clone()
1000        }
1001        None => return effects,
1002    };
1003    let op_id = op.op_id;
1004
1005    state.remove_pending_tool_call(&tool_call_id);
1006
1007    let tool_result = match result {
1008        Ok(r) => r,
1009        Err(e) => ToolResult::Error(e),
1010    };
1011
1012    let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1013
1014    if is_direct_bash {
1015        let command = match &op.kind {
1016            OperationKind::DirectBash { command } => command.clone(),
1017            _ => tool_name,
1018        };
1019
1020        let (stdout, stderr, exit_code) = match &tool_result {
1021            ToolResult::Bash(result) => (
1022                result.stdout.clone(),
1023                result.stderr.clone(),
1024                result.exit_code,
1025            ),
1026            ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1027            other => (format!("{other:?}"), String::new(), 0),
1028        };
1029
1030        let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1031            state
1032                .message_graph
1033                .update_command_execution(
1034                    id.as_str(),
1035                    command.clone(),
1036                    stdout.clone(),
1037                    stderr.clone(),
1038                    exit_code,
1039                )
1040                .or_else(|| {
1041                    let parent_id = state.message_graph.active_message_id.clone();
1042                    let timestamp = Message::current_timestamp();
1043                    Some(Message {
1044                        data: MessageData::User {
1045                            content: vec![UserContent::CommandExecution {
1046                                command: command.clone(),
1047                                stdout: stdout.clone(),
1048                                stderr: stderr.clone(),
1049                                exit_code,
1050                            }],
1051                        },
1052                        timestamp,
1053                        id: id.to_string(),
1054                        parent_message_id: parent_id,
1055                    })
1056                })
1057        });
1058
1059        state.complete_operation(op_id);
1060
1061        if let Some(message) = updated {
1062            effects.push(Effect::EmitEvent {
1063                session_id,
1064                event: SessionEvent::MessageUpdated { message },
1065            });
1066        }
1067
1068        effects.push(Effect::EmitEvent {
1069            session_id,
1070            event: SessionEvent::OperationCompleted { op_id },
1071        });
1072
1073        effects.extend(maybe_start_queued_work(state, session_id));
1074
1075        return effects;
1076    }
1077
1078    let model = match state.operation_models.get(&op_id).cloned() {
1079        Some(model) => model,
1080        None => {
1081            return vec![Effect::EmitEvent {
1082                session_id,
1083                event: SessionEvent::Error {
1084                    message: format!("Missing model for tool result on operation {op_id}"),
1085                },
1086            }];
1087        }
1088    };
1089
1090    let event = match &tool_result {
1091        ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1092            id: tool_call_id.clone(),
1093            name: tool_name.clone(),
1094            error: e.to_string(),
1095            model: model.clone(),
1096        },
1097        _ => SessionEvent::ToolCallCompleted {
1098            id: tool_call_id.clone(),
1099            name: tool_name,
1100            result: tool_result.clone(),
1101            model: model.clone(),
1102        },
1103    };
1104
1105    effects.push(Effect::EmitEvent { session_id, event });
1106
1107    let parent_id = state.message_graph.active_message_id.clone();
1108    let tool_message = Message {
1109        data: MessageData::Tool {
1110            tool_use_id: tool_call_id.0.clone(),
1111            result: tool_result,
1112        },
1113        timestamp: 0,
1114        id: format!("tool_result_{}", tool_call_id.0),
1115        parent_message_id: parent_id,
1116    };
1117    state.message_graph.add_message(tool_message.clone());
1118
1119    effects.push(Effect::EmitEvent {
1120        session_id,
1121        event: SessionEvent::ToolMessageAdded {
1122            message: tool_message,
1123        },
1124    });
1125
1126    let all_tools_complete = state
1127        .current_operation
1128        .as_ref()
1129        .is_none_or(|op| op.pending_tool_calls.is_empty());
1130    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1131
1132    if all_tools_complete && no_pending_approvals {
1133        effects.push(Effect::CallModel {
1134            session_id,
1135            op_id,
1136            model,
1137            messages: state
1138                .message_graph
1139                .get_thread_messages()
1140                .into_iter()
1141                .cloned()
1142                .collect(),
1143            system_context: state.cached_system_context.clone(),
1144            tools: state.tools.clone(),
1145        });
1146    }
1147
1148    effects
1149}
1150
1151fn handle_model_response_complete(
1152    state: &mut AppState,
1153    session_id: crate::app::domain::types::SessionId,
1154    op_id: crate::app::domain::types::OpId,
1155    message_id: crate::app::domain::types::MessageId,
1156    content: Vec<AssistantContent>,
1157    timestamp: u64,
1158) -> Vec<Effect> {
1159    let mut effects = Vec::new();
1160
1161    if state.cancelled_ops.contains(&op_id) {
1162        tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1163        return effects;
1164    }
1165
1166    let tool_calls: Vec<_> = content
1167        .iter()
1168        .filter_map(|c| {
1169            if let AssistantContent::ToolCall { tool_call, .. } = c {
1170                Some(tool_call.clone())
1171            } else {
1172                None
1173            }
1174        })
1175        .collect();
1176
1177    let parent_id = state.message_graph.active_message_id.clone();
1178
1179    let message = Message {
1180        data: MessageData::Assistant {
1181            content: content.clone(),
1182        },
1183        timestamp,
1184        id: message_id.0.clone(),
1185        parent_message_id: parent_id,
1186    };
1187
1188    state.message_graph.add_message(message.clone());
1189    state.message_graph.active_message_id = Some(message_id.0.clone());
1190
1191    let model = match state.operation_models.get(&op_id).cloned() {
1192        Some(model) => model,
1193        None => {
1194            return vec![Effect::EmitEvent {
1195                session_id,
1196                event: SessionEvent::Error {
1197                    message: format!("Missing model for operation {op_id}"),
1198                },
1199            }];
1200        }
1201    };
1202
1203    effects.push(Effect::EmitEvent {
1204        session_id,
1205        event: SessionEvent::AssistantMessageAdded { message, model },
1206    });
1207
1208    if tool_calls.is_empty() {
1209        state.complete_operation(op_id);
1210        effects.push(Effect::EmitEvent {
1211            session_id,
1212            event: SessionEvent::OperationCompleted { op_id },
1213        });
1214        effects.extend(maybe_start_queued_work(state, session_id));
1215    } else {
1216        for tool_call in tool_calls {
1217            let request_id = crate::app::domain::types::RequestId::new();
1218            effects.extend(handle_tool_approval_requested(
1219                state, session_id, request_id, tool_call,
1220            ));
1221        }
1222    }
1223
1224    effects
1225}
1226
1227fn handle_model_response_error(
1228    state: &mut AppState,
1229    session_id: crate::app::domain::types::SessionId,
1230    op_id: crate::app::domain::types::OpId,
1231    error: &str,
1232) -> Vec<Effect> {
1233    let mut effects = Vec::new();
1234
1235    if state.cancelled_ops.contains(&op_id) {
1236        return effects;
1237    }
1238
1239    state.complete_operation(op_id);
1240
1241    effects.push(Effect::EmitEvent {
1242        session_id,
1243        event: SessionEvent::Error {
1244            message: error.to_string(),
1245        },
1246    });
1247
1248    effects.push(Effect::EmitEvent {
1249        session_id,
1250        event: SessionEvent::OperationCompleted { op_id },
1251    });
1252
1253    effects.extend(maybe_start_queued_work(state, session_id));
1254
1255    effects
1256}
1257
1258fn handle_direct_bash(
1259    state: &mut AppState,
1260    session_id: crate::app::domain::types::SessionId,
1261    op_id: crate::app::domain::types::OpId,
1262    message_id: crate::app::domain::types::MessageId,
1263    command: String,
1264    timestamp: u64,
1265) -> Vec<Effect> {
1266    let mut effects = Vec::new();
1267
1268    if state.has_active_operation() {
1269        state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1270            command,
1271            op_id,
1272            message_id,
1273            queued_at: timestamp,
1274        });
1275        effects.push(Effect::EmitEvent {
1276            session_id,
1277            event: SessionEvent::QueueUpdated {
1278                queue: snapshot_queue(state),
1279            },
1280        });
1281        return effects;
1282    }
1283
1284    let parent_id = state.message_graph.active_message_id.clone();
1285    let message = Message {
1286        data: MessageData::User {
1287            content: vec![UserContent::CommandExecution {
1288                command: command.clone(),
1289                stdout: String::new(),
1290                stderr: String::new(),
1291                exit_code: 0,
1292            }],
1293        },
1294        timestamp,
1295        id: message_id.0.clone(),
1296        parent_message_id: parent_id,
1297    };
1298
1299    state.message_graph.add_message(message.clone());
1300    state.message_graph.active_message_id = Some(message_id.0.clone());
1301
1302    state.start_operation(
1303        op_id,
1304        OperationKind::DirectBash {
1305            command: command.clone(),
1306        },
1307    );
1308    state.operation_messages.insert(op_id, message_id);
1309
1310    effects.push(Effect::EmitEvent {
1311        session_id,
1312        event: SessionEvent::UserMessageAdded { message },
1313    });
1314
1315    effects.push(Effect::EmitEvent {
1316        session_id,
1317        event: SessionEvent::OperationStarted {
1318            op_id,
1319            kind: OperationKind::DirectBash {
1320                command: command.clone(),
1321            },
1322        },
1323    });
1324
1325    let tool_call = steer_tools::ToolCall {
1326        id: format!("direct_bash_{op_id}"),
1327        name: BASH_TOOL_NAME.to_string(),
1328        parameters: serde_json::json!({ "command": command }),
1329    };
1330
1331    effects.push(Effect::ExecuteTool {
1332        session_id,
1333        op_id,
1334        tool_call,
1335    });
1336
1337    effects
1338}
1339
1340fn handle_dequeue_queued_item(
1341    state: &mut AppState,
1342    session_id: crate::app::domain::types::SessionId,
1343) -> Result<Vec<Effect>, ReduceError> {
1344    if state.pop_next_queued_work().is_some() {
1345        Ok(vec![Effect::EmitEvent {
1346            session_id,
1347            event: SessionEvent::QueueUpdated {
1348                queue: snapshot_queue(state),
1349            },
1350        }])
1351    } else {
1352        Err(invalid_action(
1353            InvalidActionKind::QueueEmpty,
1354            "No queued item to remove.",
1355        ))
1356    }
1357}
1358
1359fn maybe_start_queued_work(
1360    state: &mut AppState,
1361    session_id: crate::app::domain::types::SessionId,
1362) -> Vec<Effect> {
1363    if state.has_active_operation() {
1364        return vec![];
1365    }
1366
1367    let Some(next) = state.pop_next_queued_work() else {
1368        return vec![];
1369    };
1370
1371    let mut effects = vec![Effect::EmitEvent {
1372        session_id,
1373        event: SessionEvent::QueueUpdated {
1374            queue: snapshot_queue(state),
1375        },
1376    }];
1377
1378    match next {
1379        QueuedWorkItem::UserMessage(item) => {
1380            effects.extend(handle_user_input(
1381                state,
1382                session_id,
1383                item.text,
1384                item.op_id,
1385                item.message_id,
1386                item.model,
1387                item.queued_at,
1388            ));
1389        }
1390        QueuedWorkItem::DirectBash(item) => {
1391            effects.extend(handle_direct_bash(
1392                state,
1393                session_id,
1394                item.op_id,
1395                item.message_id,
1396                item.command,
1397                item.queued_at,
1398            ));
1399        }
1400    }
1401
1402    effects
1403}
1404
1405fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1406    state
1407        .queued_work
1408        .iter()
1409        .map(snapshot_queued_work_item)
1410        .collect()
1411}
1412
1413fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1414    match item {
1415        QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1416            kind: Some(QueuedWorkKind::UserMessage),
1417            content: message.text.as_str().to_string(),
1418            queued_at: message.queued_at,
1419            model: Some(message.model.clone()),
1420            op_id: message.op_id,
1421            message_id: message.message_id.clone(),
1422        },
1423        QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1424            kind: Some(QueuedWorkKind::DirectBash),
1425            content: command.command.clone(),
1426            queued_at: command.queued_at,
1427            model: None,
1428            op_id: command.op_id,
1429            message_id: command.message_id.clone(),
1430        },
1431    }
1432}
1433
1434fn handle_request_compaction(
1435    state: &mut AppState,
1436    session_id: crate::app::domain::types::SessionId,
1437    op_id: crate::app::domain::types::OpId,
1438    model: crate::config::model::ModelId,
1439) -> Result<Vec<Effect>, ReduceError> {
1440    const MIN_MESSAGES_FOR_COMPACT: usize = 3;
1441    let message_count = state.message_graph.get_thread_messages().len();
1442
1443    if state.has_active_operation() {
1444        return Err(invalid_action(
1445            InvalidActionKind::OperationInFlight,
1446            "Cannot compact while an operation is active.",
1447        ));
1448    }
1449
1450    if message_count < MIN_MESSAGES_FOR_COMPACT {
1451        return Ok(vec![Effect::EmitEvent {
1452            session_id,
1453            event: SessionEvent::CompactResult {
1454                result: crate::app::domain::event::CompactResult::InsufficientMessages,
1455            },
1456        }]);
1457    }
1458
1459    state.start_operation(op_id, OperationKind::Compact);
1460    state.operation_models.insert(op_id, model.clone());
1461
1462    Ok(vec![
1463        Effect::EmitEvent {
1464            session_id,
1465            event: SessionEvent::OperationStarted {
1466                op_id,
1467                kind: OperationKind::Compact,
1468            },
1469        },
1470        Effect::RequestCompaction {
1471            session_id,
1472            op_id,
1473            model,
1474        },
1475    ])
1476}
1477
1478fn handle_cancel(
1479    state: &mut AppState,
1480    session_id: crate::app::domain::types::SessionId,
1481    target_op: Option<crate::app::domain::types::OpId>,
1482) -> Vec<Effect> {
1483    let mut effects = Vec::new();
1484
1485    let op = match &state.current_operation {
1486        Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
1487        _ => return effects,
1488    };
1489
1490    state.record_cancelled_op(op.op_id);
1491
1492    if matches!(op.kind, OperationKind::Compact) {
1493        effects.push(Effect::EmitEvent {
1494            session_id,
1495            event: SessionEvent::CompactResult {
1496                result: crate::app::domain::event::CompactResult::Cancelled,
1497            },
1498        });
1499    }
1500
1501    let pending_approval = state.pending_approval.take();
1502    let queued_approvals = std::mem::take(&mut state.approval_queue);
1503
1504    if matches!(op.kind, OperationKind::AgentLoop) {
1505        if let Some(pending) = pending_approval {
1506            let tool_name = pending.tool_call.name.clone();
1507            effects.extend(emit_tool_failure_message(
1508                state,
1509                session_id,
1510                &pending.tool_call.id,
1511                &tool_name,
1512                ToolError::Cancelled(tool_name.clone()),
1513                format!("Tool '{tool_name}' cancelled"),
1514                "cancelled",
1515            ));
1516        }
1517
1518        for queued in queued_approvals {
1519            let tool_name = queued.tool_call.name.clone();
1520            effects.extend(emit_tool_failure_message(
1521                state,
1522                session_id,
1523                &queued.tool_call.id,
1524                &tool_name,
1525                ToolError::Cancelled(tool_name.clone()),
1526                format!("Tool '{tool_name}' cancelled"),
1527                "cancelled",
1528            ));
1529        }
1530
1531        for tool_call_id in &op.pending_tool_calls {
1532            let tool_name = state
1533                .message_graph
1534                .find_tool_name_by_id(tool_call_id.as_str())
1535                .unwrap_or_else(|| tool_call_id.as_str().to_string());
1536            let event_error = if tool_name == tool_call_id.as_str() {
1537                format!("Tool call '{tool_call_id}' cancelled")
1538            } else {
1539                format!("Tool '{tool_name}' cancelled")
1540            };
1541            effects.extend(emit_tool_failure_message(
1542                state,
1543                session_id,
1544                tool_call_id.as_str(),
1545                &tool_name,
1546                ToolError::Cancelled(tool_name.clone()),
1547                event_error,
1548                "cancelled",
1549            ));
1550        }
1551    }
1552    state.active_streams.remove(&op.op_id);
1553
1554    let dequeued_item = state.pop_next_queued_work();
1555    let popped_queued_item = dequeued_item
1556        .as_ref()
1557        .map(snapshot_queued_work_item);
1558
1559    effects.push(Effect::EmitEvent {
1560        session_id,
1561        event: SessionEvent::OperationCancelled {
1562            op_id: op.op_id,
1563            info: CancellationInfo {
1564                pending_tool_calls: op.pending_tool_calls.len(),
1565                popped_queued_item,
1566            },
1567        },
1568    });
1569
1570    effects.push(Effect::CancelOperation {
1571        session_id,
1572        op_id: op.op_id,
1573    });
1574
1575    state.complete_operation(op.op_id);
1576
1577    if dequeued_item.is_some() {
1578        effects.push(Effect::EmitEvent {
1579            session_id,
1580            event: SessionEvent::QueueUpdated {
1581                queue: snapshot_queue(state),
1582            },
1583        });
1584    }
1585
1586    effects
1587}
1588
1589fn handle_hydrate(
1590    state: &mut AppState,
1591    session_id: crate::app::domain::types::SessionId,
1592    events: Vec<SessionEvent>,
1593    starting_sequence: u64,
1594) -> Vec<Effect> {
1595    for event in events {
1596        apply_event_to_state(state, &event);
1597    }
1598
1599    state.event_sequence = starting_sequence;
1600
1601    emit_mcp_connect_effects(state, session_id)
1602}
1603
1604fn handle_switch_primary_agent(
1605    state: &mut AppState,
1606    session_id: crate::app::domain::types::SessionId,
1607    agent_id: String,
1608) -> Result<Vec<Effect>, ReduceError> {
1609    if state.current_operation.is_some() {
1610        return Err(invalid_action(
1611            InvalidActionKind::OperationInFlight,
1612            "Cannot switch primary agent while an operation is active.",
1613        ));
1614    }
1615
1616    let Some(base_config) = state
1617        .base_session_config
1618        .as_ref()
1619        .or(state.session_config.as_ref())
1620    else {
1621        return Err(invalid_action(
1622            InvalidActionKind::MissingSessionConfig,
1623            "Cannot switch primary agent without session config.",
1624        ));
1625    };
1626
1627    let Some(_spec) = primary_agent_spec(&agent_id) else {
1628        return Err(invalid_action(
1629            InvalidActionKind::UnknownPrimaryAgent,
1630            format!("Unknown primary agent '{agent_id}'."),
1631        ));
1632    };
1633
1634    let mut updated_config = base_config.clone();
1635    updated_config.primary_agent_id = Some(agent_id.clone());
1636    let new_config = resolve_effective_config(&updated_config);
1637    let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1638
1639    apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1640
1641    let mut effects = Vec::new();
1642    effects.push(Effect::EmitEvent {
1643        session_id,
1644        event: SessionEvent::SessionConfigUpdated {
1645            config: Box::new(new_config),
1646            primary_agent_id: agent_id,
1647        },
1648    });
1649    effects.extend(backend_effects);
1650    effects.push(Effect::ReloadToolSchemas { session_id });
1651
1652    Ok(effects)
1653}
1654
1655fn apply_session_config_state(
1656    state: &mut AppState,
1657    config: &crate::session::state::SessionConfig,
1658    primary_agent_id: Option<String>,
1659    update_base: bool,
1660) {
1661    state.apply_session_config(config, primary_agent_id, update_base);
1662}
1663
1664fn mcp_backend_diff_effects(
1665    session_id: crate::app::domain::types::SessionId,
1666    old_config: &crate::session::state::SessionConfig,
1667    new_config: &crate::session::state::SessionConfig,
1668) -> Vec<Effect> {
1669    let old_map = collect_mcp_backends(old_config);
1670    let new_map = collect_mcp_backends(new_config);
1671
1672    let mut effects = Vec::new();
1673
1674    for (server_name, (old_transport, old_filter)) in &old_map {
1675        match new_map.get(server_name) {
1676            None => {
1677                effects.push(Effect::DisconnectMcpServer {
1678                    session_id,
1679                    server_name: server_name.clone(),
1680                });
1681            }
1682            Some((new_transport, new_filter)) => {
1683                if new_transport != old_transport || new_filter != old_filter {
1684                    effects.push(Effect::DisconnectMcpServer {
1685                        session_id,
1686                        server_name: server_name.clone(),
1687                    });
1688                    effects.push(Effect::ConnectMcpServer {
1689                        session_id,
1690                        config: McpServerConfig {
1691                            server_name: server_name.clone(),
1692                            transport: new_transport.clone(),
1693                            tool_filter: new_filter.clone(),
1694                        },
1695                    });
1696                }
1697            }
1698        }
1699    }
1700
1701    for (server_name, (new_transport, new_filter)) in &new_map {
1702        if !old_map.contains_key(server_name) {
1703            effects.push(Effect::ConnectMcpServer {
1704                session_id,
1705                config: McpServerConfig {
1706                    server_name: server_name.clone(),
1707                    transport: new_transport.clone(),
1708                    tool_filter: new_filter.clone(),
1709                },
1710            });
1711        }
1712    }
1713
1714    effects
1715}
1716
1717fn collect_mcp_backends(
1718    config: &crate::session::state::SessionConfig,
1719) -> std::collections::HashMap<
1720    String,
1721    (
1722        crate::tools::McpTransport,
1723        crate::session::state::ToolFilter,
1724    ),
1725> {
1726    let mut map = std::collections::HashMap::new();
1727
1728    for backend_config in &config.tool_config.backends {
1729        let BackendConfig::Mcp {
1730            server_name,
1731            transport,
1732            tool_filter,
1733        } = backend_config;
1734
1735        map.insert(
1736            server_name.clone(),
1737            (transport.clone(), tool_filter.clone()),
1738        );
1739    }
1740
1741    map
1742}
1743
1744pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1745    match event {
1746        SessionEvent::SessionCreated { config, .. } => {
1747            let primary_agent_id = config
1748                .primary_agent_id
1749                .clone()
1750                .unwrap_or_else(|| default_primary_agent_id().to_string());
1751            apply_session_config_state(state, config, Some(primary_agent_id), true);
1752        }
1753        SessionEvent::SessionConfigUpdated {
1754            config,
1755            primary_agent_id,
1756        } => {
1757            apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1758        }
1759        SessionEvent::AssistantMessageAdded { message, .. }
1760        | SessionEvent::UserMessageAdded { message }
1761        | SessionEvent::ToolMessageAdded { message } => {
1762            state.message_graph.add_message(message.clone());
1763            state.message_graph.active_message_id = Some(message.id().to_string());
1764        }
1765        SessionEvent::MessageUpdated { message } => {
1766            state.message_graph.replace_message(message.clone());
1767        }
1768        SessionEvent::ApprovalDecided {
1769            decision, remember, ..
1770        } => {
1771            if *decision == ApprovalDecision::Approved
1772                && let Some(memory) = remember
1773            {
1774                match memory {
1775                    ApprovalMemory::Tool(name) => {
1776                        state.approved_tools.insert(name.clone());
1777                    }
1778                    ApprovalMemory::BashPattern(pattern) => {
1779                        state.approved_bash_patterns.insert(pattern.clone());
1780                    }
1781                    ApprovalMemory::PendingTool => {}
1782                }
1783            }
1784            state.pending_approval = None;
1785        }
1786        SessionEvent::OperationCompleted { op_id } => {
1787            state.complete_operation(*op_id);
1788        }
1789        SessionEvent::OperationCancelled { op_id, .. } => {
1790            state.record_cancelled_op(*op_id);
1791            state.complete_operation(*op_id);
1792        }
1793        SessionEvent::McpServerStateChanged {
1794            server_name,
1795            state: mcp_state,
1796        } => {
1797            state
1798                .mcp_servers
1799                .insert(server_name.clone(), mcp_state.clone());
1800        }
1801        SessionEvent::QueueUpdated { queue } => {
1802            let normalize_text = |content: &str| {
1803                NonEmptyString::new(content.to_string())
1804                    .or_else(|| NonEmptyString::new("(empty)".to_string()))
1805            };
1806
1807            state.queued_work = queue
1808                .iter()
1809                .filter_map(|item| match item.kind {
1810                    Some(QueuedWorkKind::UserMessage) => {
1811                        let text = normalize_text(item.content.as_str())?;
1812                        Some(QueuedWorkItem::UserMessage(
1813                            crate::app::domain::state::QueuedUserMessage {
1814                                text,
1815                                op_id: item.op_id,
1816                                message_id: item.message_id.clone(),
1817                                model: item.model.clone().unwrap_or_else(
1818                                    crate::config::model::builtin::claude_sonnet_4_5,
1819                                ),
1820                                queued_at: item.queued_at,
1821                            },
1822                        ))
1823                    }
1824                    Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1825                        crate::app::domain::state::QueuedBashCommand {
1826                            command: item.content.clone(),
1827                            op_id: item.op_id,
1828                            message_id: item.message_id.clone(),
1829                            queued_at: item.queued_at,
1830                        },
1831                    )),
1832                    None => {
1833                        let text = normalize_text(item.content.as_str())?;
1834                        Some(QueuedWorkItem::UserMessage(
1835                            crate::app::domain::state::QueuedUserMessage {
1836                                text,
1837                                op_id: item.op_id,
1838                                message_id: item.message_id.clone(),
1839                                model: item.model.clone().unwrap_or_else(
1840                                    crate::config::model::builtin::claude_sonnet_4_5,
1841                                ),
1842                                queued_at: item.queued_at,
1843                            },
1844                        ))
1845                    }
1846                })
1847                .collect();
1848        }
1849        _ => {}
1850    }
1851
1852    state.event_sequence += 1;
1853}
1854
1855struct CompactionCompleteParams {
1856    op_id: crate::app::domain::types::OpId,
1857    compaction_id: crate::app::domain::types::CompactionId,
1858    summary_message_id: crate::app::domain::types::MessageId,
1859    summary: String,
1860    compacted_head_message_id: crate::app::domain::types::MessageId,
1861    previous_active_message_id: Option<crate::app::domain::types::MessageId>,
1862    model_name: String,
1863    timestamp: u64,
1864}
1865
1866fn handle_compaction_complete(
1867    state: &mut AppState,
1868    session_id: crate::app::domain::types::SessionId,
1869    params: CompactionCompleteParams,
1870) -> Vec<Effect> {
1871    use crate::app::conversation::{AssistantContent, Message, MessageData};
1872    use crate::app::domain::types::CompactionRecord;
1873
1874    let CompactionCompleteParams {
1875        op_id,
1876        compaction_id,
1877        summary_message_id,
1878        summary,
1879        compacted_head_message_id,
1880        previous_active_message_id,
1881        model_name,
1882        timestamp,
1883    } = params;
1884
1885    let summary_message = Message {
1886        data: MessageData::Assistant {
1887            content: vec![AssistantContent::Text {
1888                text: summary.clone(),
1889            }],
1890        },
1891        id: summary_message_id.to_string(),
1892        parent_message_id: None,
1893        timestamp,
1894    };
1895
1896    state.message_graph.add_message(summary_message.clone());
1897
1898    let record = CompactionRecord::with_timestamp(
1899        compaction_id,
1900        summary_message_id,
1901        compacted_head_message_id,
1902        previous_active_message_id,
1903        model_name,
1904        timestamp,
1905    );
1906
1907    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
1908        model
1909    } else {
1910        state.complete_operation(op_id);
1911        return vec![Effect::EmitEvent {
1912            session_id,
1913            event: SessionEvent::Error {
1914                message: format!("Missing model for compaction operation {op_id}"),
1915            },
1916        }];
1917    };
1918
1919    state.complete_operation(op_id);
1920
1921    let mut effects = vec![
1922        Effect::EmitEvent {
1923            session_id,
1924            event: SessionEvent::AssistantMessageAdded {
1925                message: summary_message,
1926                model,
1927            },
1928        },
1929        Effect::EmitEvent {
1930            session_id,
1931            event: SessionEvent::CompactResult {
1932                result: crate::app::domain::event::CompactResult::Success(summary),
1933            },
1934        },
1935        Effect::EmitEvent {
1936            session_id,
1937            event: SessionEvent::ConversationCompacted { record },
1938        },
1939        Effect::EmitEvent {
1940            session_id,
1941            event: SessionEvent::OperationCompleted { op_id },
1942        },
1943    ];
1944
1945    effects.extend(maybe_start_queued_work(state, session_id));
1946
1947    effects
1948}
1949
1950fn handle_compaction_failed(
1951    state: &mut AppState,
1952    session_id: crate::app::domain::types::SessionId,
1953    op_id: crate::app::domain::types::OpId,
1954    error: String,
1955) -> Vec<Effect> {
1956    state.complete_operation(op_id);
1957
1958    let mut effects = vec![
1959        Effect::EmitEvent {
1960            session_id,
1961            event: SessionEvent::Error { message: error },
1962        },
1963        Effect::EmitEvent {
1964            session_id,
1965            event: SessionEvent::OperationCompleted { op_id },
1966        },
1967    ];
1968
1969    effects.extend(maybe_start_queued_work(state, session_id));
1970
1971    effects
1972}
1973
1974fn emit_mcp_connect_effects(
1975    state: &AppState,
1976    session_id: crate::app::domain::types::SessionId,
1977) -> Vec<Effect> {
1978    let mut effects = Vec::new();
1979
1980    let Some(ref config) = state.session_config else {
1981        return effects;
1982    };
1983
1984    for backend_config in &config.tool_config.backends {
1985        let BackendConfig::Mcp {
1986            server_name,
1987            transport,
1988            tool_filter,
1989        } = backend_config;
1990
1991        let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
1992            matches!(
1993                s,
1994                McpServerState::Connecting | McpServerState::Connected { .. }
1995            )
1996        });
1997
1998        if !already_connected {
1999            effects.push(Effect::ConnectMcpServer {
2000                session_id,
2001                config: McpServerConfig {
2002                    server_name: server_name.clone(),
2003                    transport: transport.clone(),
2004                    tool_filter: tool_filter.clone(),
2005                },
2006            });
2007        }
2008    }
2009
2010    effects
2011}
2012
2013#[cfg(test)]
2014mod tests {
2015    use super::*;
2016    use crate::app::domain::state::{OperationState, PendingApproval};
2017    use crate::app::domain::types::{
2018        MessageId, NonEmptyString, OpId, RequestId, SessionId, ToolCallId,
2019    };
2020    use crate::config::model::builtin;
2021    use crate::primary_agents::resolve_effective_config;
2022    use crate::session::state::{
2023        ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2024        ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2025    };
2026    use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2027    use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2028    use schemars::schema_for;
2029    use serde_json::json;
2030    use std::collections::HashSet;
2031    use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2032
2033    fn test_state() -> AppState {
2034        AppState::new(SessionId::new())
2035    }
2036
2037    fn test_schema(name: &str) -> ToolSchema {
2038        ToolSchema {
2039            name: name.to_string(),
2040            display_name: name.to_string(),
2041            description: String::new(),
2042            input_schema: InputSchema::empty_object(),
2043        }
2044    }
2045
2046    fn base_session_config() -> SessionConfig {
2047        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2048        config.primary_agent_id = Some("normal".to_string());
2049        config.policy_overrides = SessionPolicyOverrides::empty();
2050        resolve_effective_config(&config)
2051    }
2052
2053    fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2054        super::reduce(state, action).expect("reduce failed")
2055    }
2056
2057    #[test]
2058    fn test_user_input_starts_operation() {
2059        let mut state = test_state();
2060        let session_id = state.session_id;
2061        let op_id = OpId::new();
2062        let message_id = MessageId::new();
2063        let model = builtin::claude_sonnet_4_5();
2064
2065        let effects = reduce(
2066            &mut state,
2067            Action::UserInput {
2068                session_id,
2069                text: NonEmptyString::new("Hello").unwrap(),
2070                op_id,
2071                message_id,
2072                model,
2073                timestamp: 1_234_567_890,
2074            },
2075        );
2076
2077        assert_eq!(state.message_graph.messages.len(), 1);
2078        assert!(state.current_operation.is_some());
2079        assert!(
2080            effects
2081                .iter()
2082                .any(|e| matches!(e, Effect::CallModel { .. }))
2083        );
2084    }
2085
2086    #[test]
2087    fn test_switch_primary_agent_updates_visibility() {
2088        let mut state = test_state();
2089        let session_id = state.session_id;
2090        let config = base_session_config();
2091        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2092
2093        let effects = reduce(
2094            &mut state,
2095            Action::SwitchPrimaryAgent {
2096                session_id,
2097                agent_id: "plan".to_string(),
2098            },
2099        );
2100
2101        let updated = state.session_config.as_ref().expect("config");
2102        match &updated.tool_config.visibility {
2103            ToolVisibility::Whitelist(allowed) => {
2104                assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2105                for name in READ_ONLY_TOOL_NAMES {
2106                    assert!(allowed.contains(*name));
2107                }
2108                assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2109            }
2110            other => panic!("Unexpected tool visibility: {other:?}"),
2111        }
2112        assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2113        assert!(effects.iter().any(|e| matches!(
2114            e,
2115            Effect::EmitEvent {
2116                event: SessionEvent::SessionConfigUpdated { .. },
2117                ..
2118            }
2119        )));
2120        assert!(
2121            effects
2122                .iter()
2123                .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2124        );
2125    }
2126
2127    #[test]
2128    fn test_switch_primary_agent_yolo_auto_approves() {
2129        let mut state = test_state();
2130        let session_id = state.session_id;
2131        let config = base_session_config();
2132        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2133
2134        let _ = reduce(
2135            &mut state,
2136            Action::SwitchPrimaryAgent {
2137                session_id,
2138                agent_id: "yolo".to_string(),
2139            },
2140        );
2141
2142        let updated = state.session_config.as_ref().expect("config");
2143        assert_eq!(
2144            updated.tool_config.approval_policy.default_behavior,
2145            UnapprovedBehavior::Allow
2146        );
2147    }
2148
2149    #[test]
2150    fn test_switch_primary_agent_preserves_policy_overrides() {
2151        let mut state = test_state();
2152        let session_id = state.session_id;
2153
2154        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2155        config.primary_agent_id = Some("normal".to_string());
2156        config.policy_overrides = SessionPolicyOverrides {
2157            default_model: None,
2158            tool_visibility: None,
2159            approval_policy: ToolApprovalPolicyOverrides {
2160                default_behavior: Some(UnapprovedBehavior::Deny),
2161                preapproved: ApprovalRulesOverrides::empty(),
2162            },
2163        };
2164        let config = resolve_effective_config(&config);
2165        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2166
2167        let _ = reduce(
2168            &mut state,
2169            Action::SwitchPrimaryAgent {
2170                session_id,
2171                agent_id: "yolo".to_string(),
2172            },
2173        );
2174
2175        let updated = state.session_config.as_ref().expect("config");
2176        assert_eq!(
2177            updated.tool_config.approval_policy.default_behavior,
2178            UnapprovedBehavior::Deny
2179        );
2180        assert_eq!(
2181            updated.policy_overrides.approval_policy.default_behavior,
2182            Some(UnapprovedBehavior::Deny)
2183        );
2184    }
2185
2186    #[test]
2187    fn dispatch_agent_resume_is_auto_approved() {
2188        let mut state = test_state();
2189        let session_id = state.session_id;
2190        let config = base_session_config();
2191        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2192
2193        let tool_call = ToolCall {
2194            id: "tc_dispatch_resume".to_string(),
2195            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2196            parameters: json!({
2197                "prompt": "resume work",
2198                "target": {
2199                    "session": "resume",
2200                    "session_id": SessionId::new().to_string()
2201                }
2202            }),
2203        };
2204
2205        let decision = get_tool_decision(&state, &tool_call);
2206        assert_eq!(decision, ToolDecision::Allow);
2207        assert_eq!(state.session_id, session_id);
2208    }
2209
2210    #[test]
2211    fn test_switch_primary_agent_restores_base_prompt() {
2212        let mut state = test_state();
2213        let session_id = state.session_id;
2214        let mut config = base_session_config();
2215        config.system_prompt = Some("base prompt".to_string());
2216        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2217
2218        let _ = reduce(
2219            &mut state,
2220            Action::SwitchPrimaryAgent {
2221                session_id,
2222                agent_id: "plan".to_string(),
2223            },
2224        );
2225
2226        let _ = reduce(
2227            &mut state,
2228            Action::SwitchPrimaryAgent {
2229                session_id,
2230                agent_id: "normal".to_string(),
2231            },
2232        );
2233
2234        let updated = state.session_config.as_ref().expect("config");
2235        assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2236    }
2237
2238    #[test]
2239    fn test_switch_primary_agent_blocked_during_operation() {
2240        let mut state = test_state();
2241        let session_id = state.session_id;
2242        let config = base_session_config();
2243        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2244
2245        state.current_operation = Some(OperationState {
2246            op_id: OpId::new(),
2247            kind: OperationKind::AgentLoop,
2248            pending_tool_calls: HashSet::new(),
2249        });
2250
2251        let result = super::reduce(
2252            &mut state,
2253            Action::SwitchPrimaryAgent {
2254                session_id,
2255                agent_id: "plan".to_string(),
2256            },
2257        );
2258
2259        assert!(matches!(
2260            result,
2261            Err(ReduceError::InvalidAction {
2262                kind: InvalidActionKind::OperationInFlight,
2263                ..
2264            })
2265        ));
2266        assert!(state.primary_agent_id.as_deref() == Some("normal"));
2267    }
2268
2269    #[test]
2270    fn test_late_result_ignored_after_cancel() {
2271        let mut state = test_state();
2272        let session_id = state.session_id;
2273        let op_id = OpId::new();
2274        let tool_call_id = ToolCallId::from_string("tc_1");
2275
2276        state.current_operation = Some(OperationState {
2277            op_id,
2278            kind: OperationKind::AgentLoop,
2279            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2280        });
2281
2282        let _ = reduce(
2283            &mut state,
2284            Action::Cancel {
2285                session_id,
2286                op_id: None,
2287            },
2288        );
2289
2290        state.current_operation = Some(OperationState {
2291            op_id,
2292            kind: OperationKind::AgentLoop,
2293            pending_tool_calls: HashSet::new(),
2294        });
2295        state
2296            .operation_models
2297            .insert(op_id, builtin::claude_sonnet_4_5());
2298        state
2299            .operation_models
2300            .insert(op_id, builtin::claude_sonnet_4_5());
2301        state
2302            .operation_models
2303            .insert(op_id, builtin::claude_sonnet_4_5());
2304
2305        let effects = reduce(
2306            &mut state,
2307            Action::ToolResult {
2308                session_id,
2309                tool_call_id,
2310                tool_name: "test".to_string(),
2311                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2312                    tool_name: "test".to_string(),
2313                    payload: "done".to_string(),
2314                })),
2315            },
2316        );
2317
2318        assert!(effects.is_empty());
2319    }
2320
2321    #[test]
2322    fn test_pre_approved_tool_executes_immediately() {
2323        let mut state = test_state();
2324        let session_id = state.session_id;
2325        let op_id = OpId::new();
2326
2327        state.approved_tools.insert("test_tool".to_string());
2328        state.current_operation = Some(OperationState {
2329            op_id,
2330            kind: OperationKind::AgentLoop,
2331            pending_tool_calls: HashSet::new(),
2332        });
2333        state
2334            .operation_models
2335            .insert(op_id, builtin::claude_sonnet_4_5());
2336        state
2337            .operation_models
2338            .insert(op_id, builtin::claude_sonnet_4_5());
2339        state
2340            .operation_models
2341            .insert(op_id, builtin::claude_sonnet_4_5());
2342
2343        let tool_call = steer_tools::ToolCall {
2344            id: "tc_1".to_string(),
2345            name: "test_tool".to_string(),
2346            parameters: serde_json::json!({}),
2347        };
2348
2349        let effects = reduce(
2350            &mut state,
2351            Action::ToolApprovalRequested {
2352                session_id,
2353                request_id: RequestId::new(),
2354                tool_call,
2355            },
2356        );
2357
2358        assert!(
2359            effects
2360                .iter()
2361                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2362        );
2363        assert!(state.pending_approval.is_none());
2364    }
2365
2366    #[test]
2367    fn test_denied_tool_request_emits_failure_message() {
2368        let mut state = test_state();
2369        let session_id = state.session_id;
2370        let op_id = OpId::new();
2371
2372        state.current_operation = Some(OperationState {
2373            op_id,
2374            kind: OperationKind::AgentLoop,
2375            pending_tool_calls: HashSet::new(),
2376        });
2377
2378        state
2379            .operation_models
2380            .insert(op_id, builtin::claude_sonnet_4_5());
2381
2382        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2383        config.tool_config.approval_policy = ToolApprovalPolicy {
2384            default_behavior: UnapprovedBehavior::Deny,
2385            preapproved: ApprovalRules::default(),
2386        };
2387        state.session_config = Some(config);
2388
2389        let tool_call = steer_tools::ToolCall {
2390            id: "tc_1".to_string(),
2391            name: "test_tool".to_string(),
2392            parameters: serde_json::json!({}),
2393        };
2394
2395        let effects = reduce(
2396            &mut state,
2397            Action::ToolApprovalRequested {
2398                session_id,
2399                request_id: RequestId::new(),
2400                tool_call,
2401            },
2402        );
2403
2404        assert!(effects.iter().any(|e| matches!(
2405            e,
2406            Effect::EmitEvent {
2407                event: SessionEvent::ToolCallFailed { .. },
2408                ..
2409            }
2410        )));
2411        assert!(effects.iter().any(|e| matches!(
2412            e,
2413            Effect::EmitEvent {
2414                event: SessionEvent::ToolMessageAdded { .. },
2415                ..
2416            }
2417        )));
2418        assert!(
2419            !effects
2420                .iter()
2421                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2422        );
2423        assert!(
2424            !effects
2425                .iter()
2426                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2427        );
2428        assert!(state.pending_approval.is_none());
2429        assert!(state.approval_queue.is_empty());
2430        assert_eq!(state.message_graph.messages.len(), 1);
2431
2432        match &state.message_graph.messages[0].data {
2433            MessageData::Tool { result, .. } => match result {
2434                ToolResult::Error(error) => {
2435                    assert!(
2436                        matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2437                    );
2438                }
2439                _ => panic!("expected denied tool error"),
2440            },
2441            _ => panic!("expected tool message"),
2442        }
2443    }
2444
2445    #[test]
2446    fn test_user_denied_tool_request_emits_failure_message() {
2447        let mut state = test_state();
2448        let session_id = state.session_id;
2449        let op_id = OpId::new();
2450
2451        state.current_operation = Some(OperationState {
2452            op_id,
2453            kind: OperationKind::AgentLoop,
2454            pending_tool_calls: HashSet::new(),
2455        });
2456        state
2457            .operation_models
2458            .insert(op_id, builtin::claude_sonnet_4_5());
2459
2460        let tool_call = steer_tools::ToolCall {
2461            id: "tc_1".to_string(),
2462            name: "test_tool".to_string(),
2463            parameters: serde_json::json!({}),
2464        };
2465        let request_id = RequestId::new();
2466        state.pending_approval = Some(PendingApproval {
2467            request_id,
2468            tool_call: tool_call.clone(),
2469        });
2470
2471        let effects = reduce(
2472            &mut state,
2473            Action::ToolApprovalDecided {
2474                session_id,
2475                request_id,
2476                decision: ApprovalDecision::Denied,
2477                remember: None,
2478            },
2479        );
2480
2481        assert!(effects.iter().any(|e| matches!(
2482            e,
2483            Effect::EmitEvent {
2484                event: SessionEvent::ToolCallFailed { .. },
2485                ..
2486            }
2487        )));
2488        assert!(effects.iter().any(|e| matches!(
2489            e,
2490            Effect::EmitEvent {
2491                event: SessionEvent::ToolMessageAdded { .. },
2492                ..
2493            }
2494        )));
2495        assert!(
2496            !effects
2497                .iter()
2498                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2499        );
2500        assert!(state.pending_approval.is_none());
2501        assert!(state.approval_queue.is_empty());
2502        assert_eq!(state.message_graph.messages.len(), 1);
2503
2504        match &state.message_graph.messages[0].data {
2505            MessageData::Tool { result, .. } => match result {
2506                ToolResult::Error(error) => {
2507                    assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2508                }
2509                _ => panic!("expected denied tool error"),
2510            },
2511            _ => panic!("expected tool message"),
2512        }
2513    }
2514
2515    #[test]
2516    fn test_cancel_pops_queued_item_without_auto_start() {
2517        let mut state = test_state();
2518        let session_id = state.session_id;
2519        let op_id = OpId::new();
2520
2521        state.current_operation = Some(OperationState {
2522            op_id,
2523            kind: OperationKind::AgentLoop,
2524            pending_tool_calls: HashSet::new(),
2525        });
2526        state
2527            .operation_models
2528            .insert(op_id, builtin::claude_sonnet_4_5());
2529
2530        let queued_op = OpId::new();
2531        let queued_message_id = MessageId::from_string("queued_msg");
2532        let _ = reduce(
2533            &mut state,
2534            Action::UserInput {
2535                session_id,
2536                text: NonEmptyString::new("Queued message").expect("non-empty"),
2537                op_id: queued_op,
2538                message_id: queued_message_id.clone(),
2539                model: builtin::claude_sonnet_4_5(),
2540                timestamp: 1,
2541            },
2542        );
2543
2544        let effects = reduce(
2545            &mut state,
2546            Action::Cancel {
2547                session_id,
2548                op_id: None,
2549            },
2550        );
2551
2552        assert!(state.current_operation.is_none());
2553        assert!(state.queued_work.is_empty());
2554
2555        let cancellation_info = effects.iter().find_map(|effect| match effect {
2556            Effect::EmitEvent {
2557                event: SessionEvent::OperationCancelled { info, .. },
2558                ..
2559            } => Some(info),
2560            _ => None,
2561        });
2562        let info = cancellation_info.expect("expected OperationCancelled event");
2563        let popped = info
2564            .popped_queued_item
2565            .as_ref()
2566            .expect("expected popped queued item");
2567        assert_eq!(popped.content, "Queued message");
2568        assert_eq!(popped.op_id, queued_op);
2569        assert_eq!(popped.message_id, queued_message_id);
2570
2571        assert!(
2572            !effects.iter().any(|effect| matches!(
2573                effect,
2574                Effect::EmitEvent {
2575                    event: SessionEvent::OperationStarted { .. },
2576                    ..
2577                }
2578            )),
2579            "queued work should not auto-start on cancel"
2580        );
2581    }
2582
2583    #[test]
2584    fn test_cancel_injects_tool_results_for_pending_calls() {
2585        let mut state = test_state();
2586        let session_id = state.session_id;
2587        let op_id = OpId::new();
2588
2589        let tool_call = ToolCall {
2590            id: "tc_1".to_string(),
2591            name: "test_tool".to_string(),
2592            parameters: serde_json::json!({}),
2593        };
2594
2595        state.message_graph.add_message(Message {
2596            data: MessageData::Assistant {
2597                content: vec![AssistantContent::ToolCall {
2598                    tool_call: tool_call.clone(),
2599                    thought_signature: None,
2600                }],
2601            },
2602            timestamp: 0,
2603            id: "msg_1".to_string(),
2604            parent_message_id: None,
2605        });
2606
2607        state.current_operation = Some(OperationState {
2608            op_id,
2609            kind: OperationKind::AgentLoop,
2610            pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2611        });
2612        state
2613            .operation_models
2614            .insert(op_id, builtin::claude_sonnet_4_5());
2615
2616        let effects = reduce(
2617            &mut state,
2618            Action::Cancel {
2619                session_id,
2620                op_id: None,
2621            },
2622        );
2623
2624        assert!(effects.iter().any(|e| matches!(
2625            e,
2626            Effect::EmitEvent {
2627                event: SessionEvent::ToolMessageAdded { .. },
2628                ..
2629            }
2630        )));
2631
2632        let tool_message = state
2633            .message_graph
2634            .messages
2635            .iter()
2636            .find(|message| matches!(message.data, MessageData::Tool { .. }))
2637            .expect("tool result should be injected on cancel");
2638
2639        match &tool_message.data {
2640            MessageData::Tool { result, .. } => match result {
2641                ToolResult::Error(error) => {
2642                    assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2643                }
2644                _ => panic!("expected cancelled tool error"),
2645            },
2646            _ => panic!("expected tool message"),
2647        }
2648    }
2649
2650    #[test]
2651    fn test_malformed_tool_call_auto_denies() {
2652        let mut state = test_state();
2653        let session_id = state.session_id;
2654        let op_id = OpId::new();
2655
2656        state.current_operation = Some(OperationState {
2657            op_id,
2658            kind: OperationKind::AgentLoop,
2659            pending_tool_calls: HashSet::new(),
2660        });
2661
2662        state
2663            .operation_models
2664            .insert(op_id, builtin::claude_sonnet_4_5());
2665
2666        let mut properties = serde_json::Map::new();
2667        properties.insert("command".to_string(), json!({ "type": "string" }));
2668
2669        state.tools.push(ToolSchema {
2670            name: "test_tool".to_string(),
2671            display_name: "test_tool".to_string(),
2672            description: String::new(),
2673            input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2674        });
2675
2676        let tool_call = ToolCall {
2677            id: "tc_1".to_string(),
2678            name: "test_tool".to_string(),
2679            parameters: json!({}),
2680        };
2681
2682        let effects = reduce(
2683            &mut state,
2684            Action::ToolApprovalRequested {
2685                session_id,
2686                request_id: RequestId::new(),
2687                tool_call,
2688            },
2689        );
2690
2691        assert!(effects.iter().any(|e| matches!(
2692            e,
2693            Effect::EmitEvent {
2694                event: SessionEvent::ToolCallFailed { .. },
2695                ..
2696            }
2697        )));
2698        assert!(effects.iter().any(|e| matches!(
2699            e,
2700            Effect::EmitEvent {
2701                event: SessionEvent::ToolMessageAdded { .. },
2702                ..
2703            }
2704        )));
2705        assert!(
2706            !effects
2707                .iter()
2708                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2709        );
2710        assert!(
2711            !effects
2712                .iter()
2713                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2714        );
2715        assert!(state.pending_approval.is_none());
2716        assert!(state.approval_queue.is_empty());
2717        assert_eq!(state.message_graph.messages.len(), 1);
2718
2719        match &state.message_graph.messages[0].data {
2720            MessageData::Tool { result, .. } => match result {
2721                ToolResult::Error(error) => {
2722                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2723                }
2724                _ => panic!("expected invalid params tool error"),
2725            },
2726            _ => panic!("expected tool message"),
2727        }
2728    }
2729
2730    #[test]
2731    fn test_approval_queuing() {
2732        let mut state = test_state();
2733        let session_id = state.session_id;
2734        let op_id = OpId::new();
2735
2736        state.current_operation = Some(OperationState {
2737            op_id,
2738            kind: OperationKind::AgentLoop,
2739            pending_tool_calls: HashSet::new(),
2740        });
2741
2742        let tool_call_1 = steer_tools::ToolCall {
2743            id: "tc_1".to_string(),
2744            name: "tool_1".to_string(),
2745            parameters: serde_json::json!({}),
2746        };
2747        let tool_call_2 = steer_tools::ToolCall {
2748            id: "tc_2".to_string(),
2749            name: "tool_2".to_string(),
2750            parameters: serde_json::json!({}),
2751        };
2752
2753        let _ = reduce(
2754            &mut state,
2755            Action::ToolApprovalRequested {
2756                session_id,
2757                request_id: RequestId::new(),
2758                tool_call: tool_call_1,
2759            },
2760        );
2761
2762        assert!(state.pending_approval.is_some());
2763
2764        let _ = reduce(
2765            &mut state,
2766            Action::ToolApprovalRequested {
2767                session_id,
2768                request_id: RequestId::new(),
2769                tool_call: tool_call_2,
2770            },
2771        );
2772
2773        assert_eq!(state.approval_queue.len(), 1);
2774    }
2775
2776    #[test]
2777    fn test_dispatch_agent_missing_target_auto_denies() {
2778        let mut state = test_state();
2779        let session_id = state.session_id;
2780        let op_id = OpId::new();
2781
2782        state.current_operation = Some(OperationState {
2783            op_id,
2784            kind: OperationKind::AgentLoop,
2785            pending_tool_calls: HashSet::new(),
2786        });
2787
2788        state
2789            .operation_models
2790            .insert(op_id, builtin::claude_sonnet_4_5());
2791
2792        let input_schema: InputSchema =
2793            schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
2794        state.tools.push(ToolSchema {
2795            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2796            display_name: "Dispatch Agent".to_string(),
2797            description: String::new(),
2798            input_schema,
2799        });
2800
2801        let tool_call = ToolCall {
2802            id: "tc_dispatch".to_string(),
2803            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2804            parameters: json!({ "prompt": "hello world" }),
2805        };
2806
2807        let effects = reduce(
2808            &mut state,
2809            Action::ToolApprovalRequested {
2810                session_id,
2811                request_id: RequestId::new(),
2812                tool_call,
2813            },
2814        );
2815
2816        assert!(effects.iter().any(|e| matches!(
2817            e,
2818            Effect::EmitEvent {
2819                event: SessionEvent::ToolCallFailed { .. },
2820                ..
2821            }
2822        )));
2823        assert!(effects.iter().any(|e| matches!(
2824            e,
2825            Effect::EmitEvent {
2826                event: SessionEvent::ToolMessageAdded { .. },
2827                ..
2828            }
2829        )));
2830        assert!(
2831            !effects
2832                .iter()
2833                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2834        );
2835        assert!(state.pending_approval.is_none());
2836        assert!(state.approval_queue.is_empty());
2837
2838        match &state.message_graph.messages[0].data {
2839            MessageData::Tool { result, .. } => match result {
2840                ToolResult::Error(error) => {
2841                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2842                }
2843                _ => panic!("expected invalid params tool error"),
2844            },
2845            _ => panic!("expected tool message"),
2846        }
2847    }
2848
2849    #[test]
2850    fn test_model_response_with_tool_calls_requests_approval() {
2851        let mut state = test_state();
2852        let session_id = state.session_id;
2853        let op_id = OpId::new();
2854        let message_id = MessageId::new();
2855
2856        state.current_operation = Some(OperationState {
2857            op_id,
2858            kind: OperationKind::AgentLoop,
2859            pending_tool_calls: HashSet::new(),
2860        });
2861        state
2862            .operation_models
2863            .insert(op_id, builtin::claude_sonnet_4_5());
2864
2865        let tool_call = steer_tools::ToolCall {
2866            id: "tc_1".to_string(),
2867            name: "bash".to_string(),
2868            parameters: serde_json::json!({"command": "ls"}),
2869        };
2870
2871        let content = vec![
2872            AssistantContent::Text {
2873                text: "Let me list the files.".to_string(),
2874            },
2875            AssistantContent::ToolCall {
2876                tool_call: tool_call.clone(),
2877                thought_signature: None,
2878            },
2879        ];
2880
2881        let effects = reduce(
2882            &mut state,
2883            Action::ModelResponseComplete {
2884                session_id,
2885                op_id,
2886                message_id,
2887                content,
2888                timestamp: 12345,
2889            },
2890        );
2891
2892        assert!(state.pending_approval.is_some());
2893        assert!(
2894            effects
2895                .iter()
2896                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2897        );
2898        assert!(state.current_operation.is_some());
2899    }
2900
2901    #[test]
2902    fn test_model_response_no_tools_completes_operation() {
2903        let mut state = test_state();
2904        let session_id = state.session_id;
2905        let op_id = OpId::new();
2906        let message_id = MessageId::new();
2907
2908        state.current_operation = Some(OperationState {
2909            op_id,
2910            kind: OperationKind::AgentLoop,
2911            pending_tool_calls: HashSet::new(),
2912        });
2913        state
2914            .operation_models
2915            .insert(op_id, builtin::claude_sonnet_4_5());
2916
2917        let content = vec![AssistantContent::Text {
2918            text: "Hello! How can I help?".to_string(),
2919        }];
2920
2921        let effects = reduce(
2922            &mut state,
2923            Action::ModelResponseComplete {
2924                session_id,
2925                op_id,
2926                message_id,
2927                content,
2928                timestamp: 12345,
2929            },
2930        );
2931
2932        assert!(state.current_operation.is_none());
2933        assert!(effects.iter().any(|e| matches!(
2934            e,
2935            Effect::EmitEvent {
2936                event: SessionEvent::OperationCompleted { .. },
2937                ..
2938            }
2939        )));
2940    }
2941
2942    #[test]
2943    fn test_out_of_order_completion_preserves_newer_operation() {
2944        let mut state = test_state();
2945        let session_id = state.session_id;
2946        let model = builtin::claude_sonnet_4_5();
2947
2948        let op_a = OpId::new();
2949        let op_b = OpId::new();
2950
2951        let _ = reduce(
2952            &mut state,
2953            Action::UserInput {
2954                session_id,
2955                text: NonEmptyString::new("first").unwrap(),
2956                op_id: op_a,
2957                message_id: MessageId::new(),
2958                model: model.clone(),
2959                timestamp: 1,
2960            },
2961        );
2962
2963        let _ = reduce(
2964            &mut state,
2965            Action::UserInput {
2966                session_id,
2967                text: NonEmptyString::new("second").unwrap(),
2968                op_id: op_b,
2969                message_id: MessageId::new(),
2970                model: model.clone(),
2971                timestamp: 2,
2972            },
2973        );
2974
2975        let _ = reduce(
2976            &mut state,
2977            Action::ModelResponseComplete {
2978                session_id,
2979                op_id: op_a,
2980                message_id: MessageId::new(),
2981                content: vec![AssistantContent::Text {
2982                    text: "done A".to_string(),
2983                }],
2984                timestamp: 3,
2985            },
2986        );
2987
2988        assert!(
2989            state
2990                .current_operation
2991                .as_ref()
2992                .is_some_and(|op| op.op_id == op_b)
2993        );
2994        assert!(state.operation_models.contains_key(&op_b));
2995        assert!(!state.operation_models.contains_key(&op_a));
2996
2997        let effects = reduce(
2998            &mut state,
2999            Action::ModelResponseComplete {
3000                session_id,
3001                op_id: op_b,
3002                message_id: MessageId::new(),
3003                content: vec![AssistantContent::Text {
3004                    text: "done B".to_string(),
3005                }],
3006                timestamp: 4,
3007            },
3008        );
3009
3010        assert!(effects.iter().any(|e| matches!(
3011            e,
3012            Effect::EmitEvent {
3013                event: SessionEvent::OperationCompleted { op_id },
3014                ..
3015            } if *op_id == op_b
3016        )));
3017        assert!(!effects.iter().any(|e| matches!(
3018            e,
3019            Effect::EmitEvent {
3020                event: SessionEvent::Error { message },
3021                ..
3022            } if message.contains("Missing model for operation")
3023        )));
3024    }
3025
3026    #[test]
3027    fn test_tool_approval_does_not_call_model_before_result() {
3028        let mut state = test_state();
3029        let session_id = state.session_id;
3030        let op_id = OpId::new();
3031
3032        state.current_operation = Some(OperationState {
3033            op_id,
3034            kind: OperationKind::AgentLoop,
3035            pending_tool_calls: HashSet::new(),
3036        });
3037        state
3038            .operation_models
3039            .insert(op_id, builtin::claude_sonnet_4_5());
3040
3041        let tool_call = steer_tools::ToolCall {
3042            id: "tc_1".to_string(),
3043            name: "bash".to_string(),
3044            parameters: serde_json::json!({"command": "ls"}),
3045        };
3046        let request_id = RequestId::new();
3047        state.pending_approval = Some(PendingApproval {
3048            request_id,
3049            tool_call: tool_call.clone(),
3050        });
3051
3052        let effects = reduce(
3053            &mut state,
3054            Action::ToolApprovalDecided {
3055                session_id,
3056                request_id,
3057                decision: ApprovalDecision::Approved,
3058                remember: None,
3059            },
3060        );
3061
3062        assert!(
3063            effects
3064                .iter()
3065                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3066        );
3067        assert!(
3068            !effects
3069                .iter()
3070                .any(|e| matches!(e, Effect::CallModel { .. }))
3071        );
3072        assert!(state.current_operation.as_ref().is_some_and(|op| {
3073            op.pending_tool_calls
3074                .contains(&ToolCallId::from_string("tc_1"))
3075        }));
3076    }
3077
3078    #[test]
3079    fn test_mcp_tool_visibility_and_disconnect_removal() {
3080        let mut state = test_state();
3081        let session_id = state.session_id;
3082
3083        let mut allowed = HashSet::new();
3084        allowed.insert("mcp__alpha__allowed".to_string());
3085
3086        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3087        config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3088        state.session_config = Some(config);
3089
3090        state.tools.push(test_schema("bash"));
3091
3092        let _ = reduce(
3093            &mut state,
3094            Action::McpServerStateChanged {
3095                session_id,
3096                server_name: "alpha".to_string(),
3097                state: McpServerState::Connected {
3098                    tools: vec![
3099                        test_schema("mcp__alpha__allowed"),
3100                        test_schema("mcp__alpha__blocked"),
3101                    ],
3102                },
3103            },
3104        );
3105
3106        assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3107        assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3108
3109        let _ = reduce(
3110            &mut state,
3111            Action::McpServerStateChanged {
3112                session_id,
3113                server_name: "alpha".to_string(),
3114                state: McpServerState::Disconnected { error: None },
3115            },
3116        );
3117
3118        assert!(
3119            !state
3120                .tools
3121                .iter()
3122                .any(|t| t.name.starts_with("mcp__alpha__"))
3123        );
3124        assert!(state.tools.iter().any(|t| t.name == "bash"));
3125    }
3126
3127    #[test]
3128    fn test_tool_result_continues_agent_loop() {
3129        let mut state = test_state();
3130        let session_id = state.session_id;
3131        let op_id = OpId::new();
3132        let tool_call_id = ToolCallId::from_string("tc_1");
3133
3134        state.current_operation = Some(OperationState {
3135            op_id,
3136            kind: OperationKind::AgentLoop,
3137            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3138        });
3139        state
3140            .operation_models
3141            .insert(op_id, builtin::claude_sonnet_4_5());
3142
3143        let effects = reduce(
3144            &mut state,
3145            Action::ToolResult {
3146                session_id,
3147                tool_call_id,
3148                tool_name: "bash".to_string(),
3149                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3150                    tool_name: "bash".to_string(),
3151                    payload: "file1.txt\nfile2.txt".to_string(),
3152                })),
3153            },
3154        );
3155
3156        assert!(
3157            effects
3158                .iter()
3159                .any(|e| matches!(e, Effect::CallModel { .. }))
3160        );
3161    }
3162
3163    #[test]
3164    fn test_tool_result_waits_for_pending_tools() {
3165        let mut state = test_state();
3166        let session_id = state.session_id;
3167        let op_id = OpId::new();
3168        let tool_call_id_1 = ToolCallId::from_string("tc_1");
3169        let tool_call_id_2 = ToolCallId::from_string("tc_2");
3170
3171        state.current_operation = Some(OperationState {
3172            op_id,
3173            kind: OperationKind::AgentLoop,
3174            pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3175                .into_iter()
3176                .collect(),
3177        });
3178        state
3179            .operation_models
3180            .insert(op_id, builtin::claude_sonnet_4_5());
3181
3182        let effects = reduce(
3183            &mut state,
3184            Action::ToolResult {
3185                session_id,
3186                tool_call_id: tool_call_id_1,
3187                tool_name: "bash".to_string(),
3188                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3189                    tool_name: "bash".to_string(),
3190                    payload: "done".to_string(),
3191                })),
3192            },
3193        );
3194
3195        assert!(
3196            !effects
3197                .iter()
3198                .any(|e| matches!(e, Effect::CallModel { .. }))
3199        );
3200
3201        let effects = reduce(
3202            &mut state,
3203            Action::ToolResult {
3204                session_id,
3205                tool_call_id: tool_call_id_2,
3206                tool_name: "bash".to_string(),
3207                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3208                    tool_name: "bash".to_string(),
3209                    payload: "done".to_string(),
3210                })),
3211            },
3212        );
3213
3214        assert!(
3215            effects
3216                .iter()
3217                .any(|e| matches!(e, Effect::CallModel { .. }))
3218        );
3219    }
3220}