Skip to main content

steer_core/app/domain/
reduce.rs

1use crate::agents::default_agent_spec_id;
2use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
3use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
4use crate::app::domain::effect::{Effect, McpServerConfig};
5use crate::app::domain::event::{
6    CancellationInfo, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
7};
8use crate::app::domain::state::{
9    AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
10};
11use crate::app::domain::types::NonEmptyString;
12use crate::primary_agents::{
13    default_primary_agent_id, primary_agent_spec, resolve_effective_config,
14};
15use crate::session::state::{BackendConfig, ToolDecision};
16use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
17use serde_json::Value;
18use steer_tools::ToolError;
19use steer_tools::result::ToolResult;
20use steer_tools::tools::BASH_TOOL_NAME;
21use thiserror::Error;
22
/// Machine-readable category attached to [`ReduceError::InvalidAction`].
///
/// Lets callers branch on the reason an action was rejected without parsing the
/// human-readable message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvalidActionKind {
    /// The action cannot be applied while another operation is still active.
    OperationInFlight,
    /// NOTE(review): presumably raised when the state has no session config — confirm at use site.
    MissingSessionConfig,
    /// NOTE(review): presumably raised when a primary agent id cannot be resolved — confirm.
    UnknownPrimaryAgent,
    /// NOTE(review): presumably raised when dequeuing from an empty work queue — confirm.
    QueueEmpty,
}
30
31#[derive(Debug, Error)]
32pub enum ReduceError {
33    #[error("{message}")]
34    InvalidAction {
35        message: String,
36        kind: InvalidActionKind,
37    },
38    #[error("Invariant violated: {message}")]
39    Invariant { message: String },
40}
41
42fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
43    ReduceError::InvalidAction {
44        message: message.into(),
45        kind,
46    }
47}
48
49pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
50    match action {
51        Action::UserInput {
52            session_id,
53            text,
54            op_id,
55            message_id,
56            model,
57            timestamp,
58        } => Ok(handle_user_input(
59            state, session_id, text, op_id, message_id, model, timestamp,
60        )),
61
62        Action::UserEditedMessage {
63            session_id,
64            message_id,
65            new_content,
66            op_id,
67            new_message_id,
68            model,
69            timestamp,
70        } => handle_user_edited_message(
71            state,
72            session_id,
73            UserEditedMessageParams {
74                original_message_id: message_id,
75                new_content,
76                op_id,
77                new_message_id,
78                model,
79                timestamp,
80            },
81        ),
82
83        Action::ToolApprovalRequested {
84            session_id,
85            request_id,
86            tool_call,
87        } => Ok(handle_tool_approval_requested(
88            state, session_id, request_id, tool_call,
89        )),
90
91        Action::ToolApprovalDecided {
92            session_id,
93            request_id,
94            decision,
95            remember,
96        } => Ok(handle_tool_approval_decided(
97            state, session_id, request_id, decision, remember,
98        )),
99
100        Action::ToolExecutionStarted {
101            session_id,
102            tool_call_id,
103            tool_name,
104            tool_parameters,
105        } => Ok(handle_tool_execution_started(
106            state,
107            session_id,
108            tool_call_id,
109            tool_name,
110            tool_parameters,
111        )),
112
113        Action::ToolResult {
114            session_id,
115            tool_call_id,
116            tool_name,
117            result,
118        } => Ok(handle_tool_result(
119            state,
120            session_id,
121            tool_call_id,
122            tool_name,
123            result,
124        )),
125
126        Action::ModelResponseComplete {
127            session_id,
128            op_id,
129            message_id,
130            content,
131            timestamp,
132        } => Ok(handle_model_response_complete(
133            state, session_id, op_id, message_id, content, timestamp,
134        )),
135
136        Action::ModelResponseError {
137            session_id,
138            op_id,
139            error,
140        } => Ok(handle_model_response_error(
141            state, session_id, op_id, &error,
142        )),
143
144        Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
145
146        Action::DirectBashCommand {
147            session_id,
148            op_id,
149            message_id,
150            command,
151            timestamp,
152        } => Ok(handle_direct_bash(
153            state, session_id, op_id, message_id, command, timestamp,
154        )),
155
156        Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
157
158        Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
159
160        Action::RequestCompaction {
161            session_id,
162            op_id,
163            model,
164        } => handle_request_compaction(state, session_id, op_id, model),
165
166        Action::Hydrate {
167            session_id,
168            events,
169            starting_sequence,
170        } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
171
172        Action::WorkspaceFilesListed { files, .. } => {
173            state.workspace_files = files;
174            Ok(vec![])
175        }
176
177        Action::ToolSchemasAvailable { tools, .. } => {
178            state.tools = tools;
179            Ok(vec![])
180        }
181
182        Action::ToolSchemasUpdated { schemas, .. } => {
183            state.tools = schemas;
184            Ok(vec![])
185        }
186
187        Action::SwitchPrimaryAgent {
188            session_id,
189            agent_id,
190        } => handle_switch_primary_agent(state, session_id, agent_id),
191
192        Action::McpServerStateChanged {
193            session_id,
194            server_name,
195            state: new_state,
196        } => {
197            // When connected, merge MCP tools into state.tools
198            if let McpServerState::Connected { tools } = &new_state {
199                let tools = state.session_config.as_ref().map_or_else(
200                    || tools.clone(),
201                    |config| config.filter_tools_by_visibility(tools.clone()),
202                );
203
204                // Add MCP tools that aren't already present (by name)
205                for tool in tools {
206                    if !state.tools.iter().any(|t| t.name == tool.name) {
207                        state.tools.push(tool.clone());
208                    }
209                }
210            }
211
212            // When disconnected or failed, remove tools from that server
213            if matches!(
214                &new_state,
215                McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
216            ) {
217                let prefix = format!("mcp__{server_name}__");
218                state.tools.retain(|t| !t.name.starts_with(&prefix));
219            }
220
221            state
222                .mcp_servers
223                .insert(server_name.clone(), new_state.clone());
224            Ok(vec![Effect::EmitEvent {
225                session_id,
226                event: SessionEvent::McpServerStateChanged {
227                    server_name,
228                    state: new_state,
229                },
230            }])
231        }
232
233        Action::CompactionComplete {
234            session_id,
235            op_id,
236            compaction_id,
237            summary_message_id,
238            summary,
239            compacted_head_message_id,
240            previous_active_message_id,
241            model,
242            timestamp,
243        } => Ok(handle_compaction_complete(
244            state,
245            session_id,
246            CompactionCompleteParams {
247                op_id,
248                compaction_id,
249                summary_message_id,
250                summary,
251                compacted_head_message_id,
252                previous_active_message_id,
253                model_name: model,
254                timestamp,
255            },
256        )),
257
258        Action::CompactionFailed {
259            session_id,
260            op_id,
261            error,
262        } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
263
264        Action::Shutdown => Ok(vec![]),
265    }
266}
267
268fn handle_user_input(
269    state: &mut AppState,
270    session_id: crate::app::domain::types::SessionId,
271    text: crate::app::domain::types::NonEmptyString,
272    op_id: crate::app::domain::types::OpId,
273    message_id: crate::app::domain::types::MessageId,
274    model: crate::config::model::ModelId,
275    timestamp: u64,
276) -> Vec<Effect> {
277    let mut effects = Vec::new();
278
279    if state.has_active_operation() {
280        state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
281            text,
282            op_id,
283            message_id,
284            model,
285            queued_at: timestamp,
286        });
287        effects.push(Effect::EmitEvent {
288            session_id,
289            event: SessionEvent::QueueUpdated {
290                queue: snapshot_queue(state),
291            },
292        });
293        return effects;
294    }
295
296    let parent_id = state.message_graph.active_message_id.clone();
297
298    let message = Message {
299        data: MessageData::User {
300            content: vec![UserContent::Text {
301                text: text.as_str().to_string(),
302            }],
303        },
304        timestamp,
305        id: message_id.0.clone(),
306        parent_message_id: parent_id,
307    };
308
309    state.message_graph.add_message(message.clone());
310    state.message_graph.active_message_id = Some(message_id.0.clone());
311
312    state.start_operation(op_id, OperationKind::AgentLoop);
313    state.operation_models.insert(op_id, model.clone());
314
315    effects.push(Effect::EmitEvent {
316        session_id,
317        event: SessionEvent::UserMessageAdded {
318            message: message.clone(),
319        },
320    });
321
322    effects.push(Effect::EmitEvent {
323        session_id,
324        event: SessionEvent::OperationStarted {
325            op_id,
326            kind: OperationKind::AgentLoop,
327        },
328    });
329
330    effects.push(Effect::CallModel {
331        session_id,
332        op_id,
333        model,
334        messages: state
335            .message_graph
336            .get_thread_messages()
337            .into_iter()
338            .cloned()
339            .collect(),
340        system_context: state.cached_system_context.clone(),
341        tools: state.tools.clone(),
342    });
343
344    effects
345}
346
347struct UserEditedMessageParams {
348    original_message_id: crate::app::domain::types::MessageId,
349    new_content: String,
350    op_id: crate::app::domain::types::OpId,
351    new_message_id: crate::app::domain::types::MessageId,
352    model: crate::config::model::ModelId,
353    timestamp: u64,
354}
355
356fn handle_user_edited_message(
357    state: &mut AppState,
358    session_id: crate::app::domain::types::SessionId,
359    params: UserEditedMessageParams,
360) -> Result<Vec<Effect>, ReduceError> {
361    let UserEditedMessageParams {
362        original_message_id,
363        new_content,
364        op_id,
365        new_message_id,
366        model,
367        timestamp,
368    } = params;
369    let mut effects = Vec::new();
370
371    if state.has_active_operation() {
372        return Err(invalid_action(
373            InvalidActionKind::OperationInFlight,
374            "Cannot edit message while an operation is active.",
375        ));
376    }
377
378    let parent_id = state
379        .message_graph
380        .messages
381        .iter()
382        .find(|m| m.id() == original_message_id.0)
383        .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
384
385    let message = Message {
386        data: MessageData::User {
387            content: vec![UserContent::Text { text: new_content }],
388        },
389        timestamp,
390        id: new_message_id.0.clone(),
391        parent_message_id: parent_id,
392    };
393
394    state.message_graph.add_message(message.clone());
395    state.message_graph.active_message_id = Some(new_message_id.0.clone());
396
397    state.start_operation(op_id, OperationKind::AgentLoop);
398    state.operation_models.insert(op_id, model.clone());
399
400    effects.push(Effect::EmitEvent {
401        session_id,
402        event: SessionEvent::UserMessageAdded {
403            message: message.clone(),
404        },
405    });
406
407    effects.push(Effect::EmitEvent {
408        session_id,
409        event: SessionEvent::OperationStarted {
410            op_id,
411            kind: OperationKind::AgentLoop,
412        },
413    });
414
415    effects.push(Effect::CallModel {
416        session_id,
417        op_id,
418        model,
419        messages: state
420            .message_graph
421            .get_thread_messages()
422            .into_iter()
423            .cloned()
424            .collect(),
425        system_context: state.cached_system_context.clone(),
426        tools: state.tools.clone(),
427    });
428
429    Ok(effects)
430}
431
432fn handle_tool_approval_requested(
433    state: &mut AppState,
434    session_id: crate::app::domain::types::SessionId,
435    request_id: crate::app::domain::types::RequestId,
436    tool_call: steer_tools::ToolCall,
437) -> Vec<Effect> {
438    let mut effects = Vec::new();
439
440    if let Err(error) = validate_tool_call(state, &tool_call) {
441        let error_message = error.to_string();
442        return fail_tool_call_without_execution(
443            state,
444            session_id,
445            tool_call,
446            error,
447            error_message,
448            "invalid",
449            true,
450        );
451    }
452
453    let decision = get_tool_decision(state, &tool_call);
454
455    match decision {
456        ToolDecision::Allow => {
457            let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
458                return vec![Effect::EmitEvent {
459                    session_id,
460                    event: SessionEvent::Error {
461                        message: "Tool approval requested without active operation".to_string(),
462                    },
463                }];
464            };
465            state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
466                &tool_call.id,
467            ));
468
469            effects.push(Effect::ExecuteTool {
470                session_id,
471                op_id,
472                tool_call,
473            });
474        }
475        ToolDecision::Deny => {
476            let error = ToolError::DeniedByPolicy(tool_call.name.clone());
477            let tool_name = tool_call.name.clone();
478            effects.extend(fail_tool_call_without_execution(
479                state,
480                session_id,
481                tool_call,
482                error,
483                format!("Tool '{tool_name}' denied by policy"),
484                "denied",
485                true,
486            ));
487        }
488        ToolDecision::Ask => {
489            if state.pending_approval.is_some() {
490                state.approval_queue.push_back(QueuedApproval { tool_call });
491                return effects;
492            }
493
494            state.pending_approval = Some(PendingApproval {
495                request_id,
496                tool_call: tool_call.clone(),
497            });
498
499            effects.push(Effect::EmitEvent {
500                session_id,
501                event: SessionEvent::ApprovalRequested {
502                    request_id,
503                    tool_call: tool_call.clone(),
504                },
505            });
506
507            effects.push(Effect::RequestUserApproval {
508                session_id,
509                request_id,
510                tool_call,
511            });
512        }
513    }
514
515    effects
516}
517
518fn validate_tool_call(
519    state: &AppState,
520    tool_call: &steer_tools::ToolCall,
521) -> Result<(), ToolError> {
522    if tool_call.name.trim().is_empty() {
523        return Err(ToolError::invalid_params(
524            "unknown",
525            "Malformed tool call: missing tool name",
526        ));
527    }
528
529    if tool_call.id.trim().is_empty() {
530        return Err(ToolError::invalid_params(
531            tool_call.name.clone(),
532            "Malformed tool call: missing tool call id",
533        ));
534    }
535
536    if state.tools.is_empty() {
537        return Ok(());
538    }
539
540    let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
541        return Ok(());
542    };
543
544    validate_against_json_schema(
545        &tool_call.name,
546        schema.input_schema.as_value(),
547        &tool_call.parameters,
548    )
549}
550
551fn validate_against_json_schema(
552    tool_name: &str,
553    schema: &Value,
554    params: &Value,
555) -> Result<(), ToolError> {
556    let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
557        ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
558    })?;
559
560    if let Err(errors) = validator.validate(params) {
561        let message = errors
562            .into_iter()
563            .map(|error| error.to_string())
564            .next()
565            .unwrap_or_else(|| "Parameters do not match schema".to_string());
566        return Err(ToolError::invalid_params(tool_name.to_string(), message));
567    }
568
569    Ok(())
570}
571
572fn emit_tool_failure_message(
573    state: &mut AppState,
574    session_id: crate::app::domain::types::SessionId,
575    tool_call_id: &str,
576    tool_name: &str,
577    tool_error: ToolError,
578    event_error: String,
579    message_id_prefix: &str,
580) -> Vec<Effect> {
581    let mut effects = Vec::new();
582
583    let tool_result = ToolResult::Error(tool_error);
584    let parent_id = state.message_graph.active_message_id.clone();
585    let tool_message = Message {
586        data: MessageData::Tool {
587            tool_use_id: tool_call_id.to_string(),
588            result: tool_result,
589        },
590        timestamp: 0,
591        id: format!("{message_id_prefix}_{tool_call_id}"),
592        parent_message_id: parent_id,
593    };
594    state.message_graph.add_message(tool_message.clone());
595
596    if let Some(model) = state
597        .current_operation
598        .as_ref()
599        .and_then(|op| state.operation_models.get(&op.op_id).cloned())
600    {
601        effects.push(Effect::EmitEvent {
602            session_id,
603            event: SessionEvent::ToolCallFailed {
604                id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
605                name: tool_name.to_string(),
606                error: event_error,
607                model,
608            },
609        });
610    }
611
612    effects.push(Effect::EmitEvent {
613        session_id,
614        event: SessionEvent::ToolMessageAdded {
615            message: tool_message,
616        },
617    });
618
619    effects
620}
621
622fn fail_tool_call_without_execution(
623    state: &mut AppState,
624    session_id: crate::app::domain::types::SessionId,
625    tool_call: steer_tools::ToolCall,
626    tool_error: ToolError,
627    event_error: String,
628    message_id_prefix: &str,
629    call_model_if_ready: bool,
630) -> Vec<Effect> {
631    let mut effects = emit_tool_failure_message(
632        state,
633        session_id,
634        &tool_call.id,
635        &tool_call.name,
636        tool_error,
637        event_error,
638        message_id_prefix,
639    );
640
641    if !call_model_if_ready {
642        return effects;
643    }
644
645    let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
646        effects.push(Effect::EmitEvent {
647            session_id,
648            event: SessionEvent::Error {
649                message: "Tool failure recorded without active operation".to_string(),
650            },
651        });
652        return effects;
653    };
654    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
655        model
656    } else {
657        effects.push(Effect::EmitEvent {
658            session_id,
659            event: SessionEvent::Error {
660                message: format!("Missing model for operation {op_id}"),
661            },
662        });
663        return effects;
664    };
665
666    let all_tools_complete = state
667        .current_operation
668        .as_ref()
669        .is_none_or(|op| op.pending_tool_calls.is_empty());
670    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
671
672    if all_tools_complete && no_pending_approvals {
673        effects.push(Effect::CallModel {
674            session_id,
675            op_id,
676            model,
677            messages: state
678                .message_graph
679                .get_thread_messages()
680                .into_iter()
681                .cloned()
682                .collect(),
683            system_context: state.cached_system_context.clone(),
684            tools: state.tools.clone(),
685        });
686    }
687
688    effects
689}
690
691fn handle_tool_approval_decided(
692    state: &mut AppState,
693    session_id: crate::app::domain::types::SessionId,
694    request_id: crate::app::domain::types::RequestId,
695    decision: ApprovalDecision,
696    remember: Option<ApprovalMemory>,
697) -> Vec<Effect> {
698    let mut effects = Vec::new();
699
700    let pending = match state.pending_approval.take() {
701        Some(p) if p.request_id == request_id => p,
702        other => {
703            state.pending_approval = other;
704            return effects;
705        }
706    };
707
708    let resolved_memory = if decision == ApprovalDecision::Approved {
709        match remember {
710            Some(ApprovalMemory::PendingTool) => {
711                Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
712            }
713            Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
714            Some(ApprovalMemory::BashPattern(pattern)) => {
715                Some(ApprovalMemory::BashPattern(pattern))
716            }
717            None => None,
718        }
719    } else {
720        None
721    };
722
723    effects.push(Effect::EmitEvent {
724        session_id,
725        event: SessionEvent::ApprovalDecided {
726            request_id,
727            decision,
728            remember: resolved_memory.clone(),
729        },
730    });
731
732    if decision == ApprovalDecision::Approved {
733        if let Some(ref memory) = resolved_memory {
734            match memory {
735                ApprovalMemory::Tool(name) => {
736                    state.approved_tools.insert(name.clone());
737                }
738                ApprovalMemory::BashPattern(pattern) => {
739                    state.approved_bash_patterns.insert(pattern.clone());
740                }
741                ApprovalMemory::PendingTool => {}
742            }
743        }
744
745        let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
746            effects.push(Effect::EmitEvent {
747                session_id,
748                event: SessionEvent::Error {
749                    message: "Tool approval decided without active operation".to_string(),
750                },
751            });
752            return effects;
753        };
754        state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
755            &pending.tool_call.id,
756        ));
757
758        effects.push(Effect::ExecuteTool {
759            session_id,
760            op_id,
761            tool_call: pending.tool_call,
762        });
763    } else {
764        let tool_name = pending.tool_call.name.clone();
765        let error = ToolError::DeniedByUser(tool_name.clone());
766        effects.extend(fail_tool_call_without_execution(
767            state,
768            session_id,
769            pending.tool_call,
770            error,
771            format!("Tool '{tool_name}' denied by user"),
772            "denied",
773            false,
774        ));
775    }
776
777    effects.extend(process_next_queued_approval(state, session_id));
778
779    effects
780}
781
782fn process_next_queued_approval(
783    state: &mut AppState,
784    session_id: crate::app::domain::types::SessionId,
785) -> Vec<Effect> {
786    let mut effects = Vec::new();
787
788    while let Some(queued) = state.approval_queue.pop_front() {
789        let decision = get_tool_decision(state, &queued.tool_call);
790
791        match decision {
792            ToolDecision::Allow => {
793                let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
794                    effects.push(Effect::EmitEvent {
795                        session_id,
796                        event: SessionEvent::Error {
797                            message: "Queued tool approval processed without active operation"
798                                .to_string(),
799                        },
800                    });
801                    state.approval_queue.push_front(queued);
802                    break;
803                };
804                state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
805                    &queued.tool_call.id,
806                ));
807
808                effects.push(Effect::ExecuteTool {
809                    session_id,
810                    op_id,
811                    tool_call: queued.tool_call,
812                });
813            }
814            ToolDecision::Deny => {
815                let tool_name = queued.tool_call.name.clone();
816                let error = ToolError::DeniedByPolicy(tool_name.clone());
817                effects.extend(fail_tool_call_without_execution(
818                    state,
819                    session_id,
820                    queued.tool_call,
821                    error,
822                    format!("Tool '{tool_name}' denied by policy"),
823                    "denied",
824                    false,
825                ));
826            }
827            ToolDecision::Ask => {
828                let request_id = crate::app::domain::types::RequestId::new();
829                state.pending_approval = Some(PendingApproval {
830                    request_id,
831                    tool_call: queued.tool_call.clone(),
832                });
833
834                effects.push(Effect::EmitEvent {
835                    session_id,
836                    event: SessionEvent::ApprovalRequested {
837                        request_id,
838                        tool_call: queued.tool_call.clone(),
839                    },
840                });
841
842                effects.push(Effect::RequestUserApproval {
843                    session_id,
844                    request_id,
845                    tool_call: queued.tool_call,
846                });
847
848                break;
849            }
850        }
851    }
852
853    let all_tools_complete = state
854        .current_operation
855        .as_ref()
856        .is_none_or(|op| op.pending_tool_calls.is_empty());
857    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
858
859    if all_tools_complete
860        && no_pending_approvals
861        && let Some(op) = &state.current_operation
862    {
863        let op_id = op.op_id;
864        if let Some(model) = state.operation_models.get(&op_id).cloned() {
865            effects.push(Effect::CallModel {
866                session_id,
867                op_id,
868                model,
869                messages: state
870                    .message_graph
871                    .get_thread_messages()
872                    .into_iter()
873                    .cloned()
874                    .collect(),
875                system_context: state.cached_system_context.clone(),
876                tools: state.tools.clone(),
877            });
878        }
879    }
880
881    effects
882}
883
884fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
885    if state.approved_tools.contains(&tool_call.name) {
886        return ToolDecision::Allow;
887    }
888
889    if tool_call.name == DISPATCH_AGENT_TOOL_NAME
890        && let Ok(params) =
891            serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
892        && let Some(config) = state.session_config.as_ref()
893    {
894        let policy = &config.tool_config.approval_policy;
895        match params.target {
896            DispatchAgentTarget::Resume { .. } => {
897                return ToolDecision::Allow;
898            }
899            DispatchAgentTarget::New { agent, .. } => {
900                let agent_id = agent
901                    .as_deref()
902                    .filter(|value| !value.trim().is_empty())
903                    .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
904                if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
905                    return ToolDecision::Allow;
906                }
907            }
908        }
909    }
910
911    if tool_call.name == BASH_TOOL_NAME
912        && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
913            tool_call.parameters.clone(),
914        )
915        && state.is_bash_pattern_approved(&params.command)
916    {
917        return ToolDecision::Allow;
918    }
919
920    state
921        .session_config
922        .as_ref()
923        .map_or(ToolDecision::Ask, |config| {
924            config
925                .tool_config
926                .approval_policy
927                .tool_decision(&tool_call.name)
928        })
929}
930
931fn handle_tool_execution_started(
932    state: &mut AppState,
933    session_id: crate::app::domain::types::SessionId,
934    tool_call_id: crate::app::domain::types::ToolCallId,
935    tool_name: String,
936    tool_parameters: serde_json::Value,
937) -> Vec<Effect> {
938    state.add_pending_tool_call(tool_call_id.clone());
939
940    let op_id = match state.current_operation.as_ref() {
941        Some(op) => op.op_id,
942        None => {
943            return vec![Effect::EmitEvent {
944                session_id,
945                event: SessionEvent::Error {
946                    message: "Tool call started without active operation".to_string(),
947                },
948            }];
949        }
950    };
951
952    let is_direct_bash = matches!(
953        state.current_operation.as_ref().map(|op| &op.kind),
954        Some(OperationKind::DirectBash { .. })
955    );
956
957    if is_direct_bash {
958        return vec![];
959    }
960
961    let model = match state.operation_models.get(&op_id).cloned() {
962        Some(model) => model,
963        None => {
964            return vec![Effect::EmitEvent {
965                session_id,
966                event: SessionEvent::Error {
967                    message: format!("Missing model for tool call on operation {op_id}"),
968                },
969            }];
970        }
971    };
972
973    vec![Effect::EmitEvent {
974        session_id,
975        event: SessionEvent::ToolCallStarted {
976            id: tool_call_id,
977            name: tool_name,
978            parameters: tool_parameters,
979            model,
980        },
981    }]
982}
983
984fn handle_tool_result(
985    state: &mut AppState,
986    session_id: crate::app::domain::types::SessionId,
987    tool_call_id: crate::app::domain::types::ToolCallId,
988    tool_name: String,
989    result: Result<ToolResult, ToolError>,
990) -> Vec<Effect> {
991    let mut effects = Vec::new();
992
993    let op = match &state.current_operation {
994        Some(op) => {
995            if state.cancelled_ops.contains(&op.op_id) {
996                tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
997                return effects;
998            }
999            op.clone()
1000        }
1001        None => return effects,
1002    };
1003    let op_id = op.op_id;
1004
1005    state.remove_pending_tool_call(&tool_call_id);
1006
1007    let tool_result = match result {
1008        Ok(r) => r,
1009        Err(e) => ToolResult::Error(e),
1010    };
1011
1012    let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1013
1014    if is_direct_bash {
1015        let command = match &op.kind {
1016            OperationKind::DirectBash { command } => command.clone(),
1017            _ => tool_name,
1018        };
1019
1020        let (stdout, stderr, exit_code) = match &tool_result {
1021            ToolResult::Bash(result) => (
1022                result.stdout.clone(),
1023                result.stderr.clone(),
1024                result.exit_code,
1025            ),
1026            ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1027            other => (format!("{other:?}"), String::new(), 0),
1028        };
1029
1030        let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1031            state
1032                .message_graph
1033                .update_command_execution(
1034                    id.as_str(),
1035                    command.clone(),
1036                    stdout.clone(),
1037                    stderr.clone(),
1038                    exit_code,
1039                )
1040                .or_else(|| {
1041                    let parent_id = state.message_graph.active_message_id.clone();
1042                    let timestamp = Message::current_timestamp();
1043                    Some(Message {
1044                        data: MessageData::User {
1045                            content: vec![UserContent::CommandExecution {
1046                                command: command.clone(),
1047                                stdout: stdout.clone(),
1048                                stderr: stderr.clone(),
1049                                exit_code,
1050                            }],
1051                        },
1052                        timestamp,
1053                        id: id.to_string(),
1054                        parent_message_id: parent_id,
1055                    })
1056                })
1057        });
1058
1059        state.complete_operation(op_id);
1060
1061        if let Some(message) = updated {
1062            effects.push(Effect::EmitEvent {
1063                session_id,
1064                event: SessionEvent::MessageUpdated { message },
1065            });
1066        }
1067
1068        effects.push(Effect::EmitEvent {
1069            session_id,
1070            event: SessionEvent::OperationCompleted { op_id },
1071        });
1072
1073        effects.extend(maybe_start_queued_work(state, session_id));
1074
1075        return effects;
1076    }
1077
1078    let model = match state.operation_models.get(&op_id).cloned() {
1079        Some(model) => model,
1080        None => {
1081            return vec![Effect::EmitEvent {
1082                session_id,
1083                event: SessionEvent::Error {
1084                    message: format!("Missing model for tool result on operation {op_id}"),
1085                },
1086            }];
1087        }
1088    };
1089
1090    let event = match &tool_result {
1091        ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1092            id: tool_call_id.clone(),
1093            name: tool_name.clone(),
1094            error: e.to_string(),
1095            model: model.clone(),
1096        },
1097        _ => SessionEvent::ToolCallCompleted {
1098            id: tool_call_id.clone(),
1099            name: tool_name,
1100            result: tool_result.clone(),
1101            model: model.clone(),
1102        },
1103    };
1104
1105    effects.push(Effect::EmitEvent { session_id, event });
1106
1107    let parent_id = state.message_graph.active_message_id.clone();
1108    let tool_message = Message {
1109        data: MessageData::Tool {
1110            tool_use_id: tool_call_id.0.clone(),
1111            result: tool_result,
1112        },
1113        timestamp: 0,
1114        id: format!("tool_result_{}", tool_call_id.0),
1115        parent_message_id: parent_id,
1116    };
1117    state.message_graph.add_message(tool_message.clone());
1118
1119    effects.push(Effect::EmitEvent {
1120        session_id,
1121        event: SessionEvent::ToolMessageAdded {
1122            message: tool_message,
1123        },
1124    });
1125
1126    let all_tools_complete = state
1127        .current_operation
1128        .as_ref()
1129        .is_none_or(|op| op.pending_tool_calls.is_empty());
1130    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1131
1132    if all_tools_complete && no_pending_approvals {
1133        effects.push(Effect::CallModel {
1134            session_id,
1135            op_id,
1136            model,
1137            messages: state
1138                .message_graph
1139                .get_thread_messages()
1140                .into_iter()
1141                .cloned()
1142                .collect(),
1143            system_context: state.cached_system_context.clone(),
1144            tools: state.tools.clone(),
1145        });
1146    }
1147
1148    effects
1149}
1150
1151fn handle_model_response_complete(
1152    state: &mut AppState,
1153    session_id: crate::app::domain::types::SessionId,
1154    op_id: crate::app::domain::types::OpId,
1155    message_id: crate::app::domain::types::MessageId,
1156    content: Vec<AssistantContent>,
1157    timestamp: u64,
1158) -> Vec<Effect> {
1159    let mut effects = Vec::new();
1160
1161    if state.cancelled_ops.contains(&op_id) {
1162        tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1163        return effects;
1164    }
1165
1166    let tool_calls: Vec<_> = content
1167        .iter()
1168        .filter_map(|c| {
1169            if let AssistantContent::ToolCall { tool_call, .. } = c {
1170                Some(tool_call.clone())
1171            } else {
1172                None
1173            }
1174        })
1175        .collect();
1176
1177    let parent_id = state.message_graph.active_message_id.clone();
1178
1179    let message = Message {
1180        data: MessageData::Assistant {
1181            content: content.clone(),
1182        },
1183        timestamp,
1184        id: message_id.0.clone(),
1185        parent_message_id: parent_id,
1186    };
1187
1188    state.message_graph.add_message(message.clone());
1189    state.message_graph.active_message_id = Some(message_id.0.clone());
1190
1191    let model = match state.operation_models.get(&op_id).cloned() {
1192        Some(model) => model,
1193        None => {
1194            return vec![Effect::EmitEvent {
1195                session_id,
1196                event: SessionEvent::Error {
1197                    message: format!("Missing model for operation {op_id}"),
1198                },
1199            }];
1200        }
1201    };
1202
1203    effects.push(Effect::EmitEvent {
1204        session_id,
1205        event: SessionEvent::AssistantMessageAdded { message, model },
1206    });
1207
1208    if tool_calls.is_empty() {
1209        state.complete_operation(op_id);
1210        effects.push(Effect::EmitEvent {
1211            session_id,
1212            event: SessionEvent::OperationCompleted { op_id },
1213        });
1214        effects.extend(maybe_start_queued_work(state, session_id));
1215    } else {
1216        for tool_call in tool_calls {
1217            let request_id = crate::app::domain::types::RequestId::new();
1218            effects.extend(handle_tool_approval_requested(
1219                state, session_id, request_id, tool_call,
1220            ));
1221        }
1222    }
1223
1224    effects
1225}
1226
1227fn handle_model_response_error(
1228    state: &mut AppState,
1229    session_id: crate::app::domain::types::SessionId,
1230    op_id: crate::app::domain::types::OpId,
1231    error: &str,
1232) -> Vec<Effect> {
1233    let mut effects = Vec::new();
1234
1235    if state.cancelled_ops.contains(&op_id) {
1236        return effects;
1237    }
1238
1239    state.complete_operation(op_id);
1240
1241    effects.push(Effect::EmitEvent {
1242        session_id,
1243        event: SessionEvent::Error {
1244            message: error.to_string(),
1245        },
1246    });
1247
1248    effects.push(Effect::EmitEvent {
1249        session_id,
1250        event: SessionEvent::OperationCompleted { op_id },
1251    });
1252
1253    effects.extend(maybe_start_queued_work(state, session_id));
1254
1255    effects
1256}
1257
1258fn handle_direct_bash(
1259    state: &mut AppState,
1260    session_id: crate::app::domain::types::SessionId,
1261    op_id: crate::app::domain::types::OpId,
1262    message_id: crate::app::domain::types::MessageId,
1263    command: String,
1264    timestamp: u64,
1265) -> Vec<Effect> {
1266    let mut effects = Vec::new();
1267
1268    if state.has_active_operation() {
1269        state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1270            command,
1271            op_id,
1272            message_id,
1273            queued_at: timestamp,
1274        });
1275        effects.push(Effect::EmitEvent {
1276            session_id,
1277            event: SessionEvent::QueueUpdated {
1278                queue: snapshot_queue(state),
1279            },
1280        });
1281        return effects;
1282    }
1283
1284    let parent_id = state.message_graph.active_message_id.clone();
1285    let message = Message {
1286        data: MessageData::User {
1287            content: vec![UserContent::CommandExecution {
1288                command: command.clone(),
1289                stdout: String::new(),
1290                stderr: String::new(),
1291                exit_code: 0,
1292            }],
1293        },
1294        timestamp,
1295        id: message_id.0.clone(),
1296        parent_message_id: parent_id,
1297    };
1298
1299    state.message_graph.add_message(message.clone());
1300    state.message_graph.active_message_id = Some(message_id.0.clone());
1301
1302    state.start_operation(
1303        op_id,
1304        OperationKind::DirectBash {
1305            command: command.clone(),
1306        },
1307    );
1308    state.operation_messages.insert(op_id, message_id);
1309
1310    effects.push(Effect::EmitEvent {
1311        session_id,
1312        event: SessionEvent::UserMessageAdded { message },
1313    });
1314
1315    effects.push(Effect::EmitEvent {
1316        session_id,
1317        event: SessionEvent::OperationStarted {
1318            op_id,
1319            kind: OperationKind::DirectBash {
1320                command: command.clone(),
1321            },
1322        },
1323    });
1324
1325    let tool_call = steer_tools::ToolCall {
1326        id: format!("direct_bash_{op_id}"),
1327        name: BASH_TOOL_NAME.to_string(),
1328        parameters: serde_json::json!({ "command": command }),
1329    };
1330
1331    effects.push(Effect::ExecuteTool {
1332        session_id,
1333        op_id,
1334        tool_call,
1335    });
1336
1337    effects
1338}
1339
1340fn handle_dequeue_queued_item(
1341    state: &mut AppState,
1342    session_id: crate::app::domain::types::SessionId,
1343) -> Result<Vec<Effect>, ReduceError> {
1344    if state.pop_next_queued_work().is_some() {
1345        Ok(vec![Effect::EmitEvent {
1346            session_id,
1347            event: SessionEvent::QueueUpdated {
1348                queue: snapshot_queue(state),
1349            },
1350        }])
1351    } else {
1352        Err(invalid_action(
1353            InvalidActionKind::QueueEmpty,
1354            "No queued item to remove.",
1355        ))
1356    }
1357}
1358
1359fn maybe_start_queued_work(
1360    state: &mut AppState,
1361    session_id: crate::app::domain::types::SessionId,
1362) -> Vec<Effect> {
1363    if state.has_active_operation() {
1364        return vec![];
1365    }
1366
1367    let Some(next) = state.pop_next_queued_work() else {
1368        return vec![];
1369    };
1370
1371    let mut effects = vec![Effect::EmitEvent {
1372        session_id,
1373        event: SessionEvent::QueueUpdated {
1374            queue: snapshot_queue(state),
1375        },
1376    }];
1377
1378    match next {
1379        QueuedWorkItem::UserMessage(item) => {
1380            effects.extend(handle_user_input(
1381                state,
1382                session_id,
1383                item.text,
1384                item.op_id,
1385                item.message_id,
1386                item.model,
1387                item.queued_at,
1388            ));
1389        }
1390        QueuedWorkItem::DirectBash(item) => {
1391            effects.extend(handle_direct_bash(
1392                state,
1393                session_id,
1394                item.op_id,
1395                item.message_id,
1396                item.command,
1397                item.queued_at,
1398            ));
1399        }
1400    }
1401
1402    effects
1403}
1404
1405fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1406    state
1407        .queued_work
1408        .iter()
1409        .map(snapshot_queued_work_item)
1410        .collect()
1411}
1412
1413fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1414    match item {
1415        QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1416            kind: Some(QueuedWorkKind::UserMessage),
1417            content: message.text.as_str().to_string(),
1418            queued_at: message.queued_at,
1419            model: Some(message.model.clone()),
1420            op_id: message.op_id,
1421            message_id: message.message_id.clone(),
1422        },
1423        QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1424            kind: Some(QueuedWorkKind::DirectBash),
1425            content: command.command.clone(),
1426            queued_at: command.queued_at,
1427            model: None,
1428            op_id: command.op_id,
1429            message_id: command.message_id.clone(),
1430        },
1431    }
1432}
1433
1434fn handle_request_compaction(
1435    state: &mut AppState,
1436    session_id: crate::app::domain::types::SessionId,
1437    op_id: crate::app::domain::types::OpId,
1438    model: crate::config::model::ModelId,
1439) -> Result<Vec<Effect>, ReduceError> {
1440    const MIN_MESSAGES_FOR_COMPACT: usize = 3;
1441    let message_count = state.message_graph.get_thread_messages().len();
1442
1443    if state.has_active_operation() {
1444        return Err(invalid_action(
1445            InvalidActionKind::OperationInFlight,
1446            "Cannot compact while an operation is active.",
1447        ));
1448    }
1449
1450    if message_count < MIN_MESSAGES_FOR_COMPACT {
1451        return Ok(vec![Effect::EmitEvent {
1452            session_id,
1453            event: SessionEvent::CompactResult {
1454                result: crate::app::domain::event::CompactResult::InsufficientMessages,
1455            },
1456        }]);
1457    }
1458
1459    state.start_operation(op_id, OperationKind::Compact);
1460    state.operation_models.insert(op_id, model.clone());
1461
1462    Ok(vec![
1463        Effect::EmitEvent {
1464            session_id,
1465            event: SessionEvent::OperationStarted {
1466                op_id,
1467                kind: OperationKind::Compact,
1468            },
1469        },
1470        Effect::RequestCompaction {
1471            session_id,
1472            op_id,
1473            model,
1474        },
1475    ])
1476}
1477
1478fn handle_cancel(
1479    state: &mut AppState,
1480    session_id: crate::app::domain::types::SessionId,
1481    target_op: Option<crate::app::domain::types::OpId>,
1482) -> Vec<Effect> {
1483    let mut effects = Vec::new();
1484
1485    let op = match &state.current_operation {
1486        Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
1487        _ => return effects,
1488    };
1489
1490    state.record_cancelled_op(op.op_id);
1491
1492    if matches!(op.kind, OperationKind::Compact) {
1493        effects.push(Effect::EmitEvent {
1494            session_id,
1495            event: SessionEvent::CompactResult {
1496                result: crate::app::domain::event::CompactResult::Cancelled,
1497            },
1498        });
1499    }
1500
1501    let pending_approval = state.pending_approval.take();
1502    let queued_approvals = std::mem::take(&mut state.approval_queue);
1503
1504    if matches!(op.kind, OperationKind::AgentLoop) {
1505        if let Some(pending) = pending_approval {
1506            let tool_name = pending.tool_call.name.clone();
1507            effects.extend(emit_tool_failure_message(
1508                state,
1509                session_id,
1510                &pending.tool_call.id,
1511                &tool_name,
1512                ToolError::Cancelled(tool_name.clone()),
1513                format!("Tool '{tool_name}' cancelled"),
1514                "cancelled",
1515            ));
1516        }
1517
1518        for queued in queued_approvals {
1519            let tool_name = queued.tool_call.name.clone();
1520            effects.extend(emit_tool_failure_message(
1521                state,
1522                session_id,
1523                &queued.tool_call.id,
1524                &tool_name,
1525                ToolError::Cancelled(tool_name.clone()),
1526                format!("Tool '{tool_name}' cancelled"),
1527                "cancelled",
1528            ));
1529        }
1530
1531        for tool_call_id in &op.pending_tool_calls {
1532            let tool_name = state
1533                .message_graph
1534                .find_tool_name_by_id(tool_call_id.as_str())
1535                .unwrap_or_else(|| tool_call_id.as_str().to_string());
1536            let event_error = if tool_name == tool_call_id.as_str() {
1537                format!("Tool call '{tool_call_id}' cancelled")
1538            } else {
1539                format!("Tool '{tool_name}' cancelled")
1540            };
1541            effects.extend(emit_tool_failure_message(
1542                state,
1543                session_id,
1544                tool_call_id.as_str(),
1545                &tool_name,
1546                ToolError::Cancelled(tool_name.clone()),
1547                event_error,
1548                "cancelled",
1549            ));
1550        }
1551    }
1552    state.active_streams.remove(&op.op_id);
1553
1554    let dequeued_item = state.pop_next_queued_work();
1555    let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);
1556
1557    effects.push(Effect::EmitEvent {
1558        session_id,
1559        event: SessionEvent::OperationCancelled {
1560            op_id: op.op_id,
1561            info: CancellationInfo {
1562                pending_tool_calls: op.pending_tool_calls.len(),
1563                popped_queued_item,
1564            },
1565        },
1566    });
1567
1568    effects.push(Effect::CancelOperation {
1569        session_id,
1570        op_id: op.op_id,
1571    });
1572
1573    state.complete_operation(op.op_id);
1574
1575    if dequeued_item.is_some() {
1576        effects.push(Effect::EmitEvent {
1577            session_id,
1578            event: SessionEvent::QueueUpdated {
1579                queue: snapshot_queue(state),
1580            },
1581        });
1582    }
1583
1584    effects
1585}
1586
1587fn handle_hydrate(
1588    state: &mut AppState,
1589    session_id: crate::app::domain::types::SessionId,
1590    events: Vec<SessionEvent>,
1591    starting_sequence: u64,
1592) -> Vec<Effect> {
1593    for event in events {
1594        apply_event_to_state(state, &event);
1595    }
1596
1597    state.event_sequence = starting_sequence;
1598
1599    emit_mcp_connect_effects(state, session_id)
1600}
1601
1602fn handle_switch_primary_agent(
1603    state: &mut AppState,
1604    session_id: crate::app::domain::types::SessionId,
1605    agent_id: String,
1606) -> Result<Vec<Effect>, ReduceError> {
1607    if state.current_operation.is_some() {
1608        return Err(invalid_action(
1609            InvalidActionKind::OperationInFlight,
1610            "Cannot switch primary agent while an operation is active.",
1611        ));
1612    }
1613
1614    let Some(base_config) = state
1615        .base_session_config
1616        .as_ref()
1617        .or(state.session_config.as_ref())
1618    else {
1619        return Err(invalid_action(
1620            InvalidActionKind::MissingSessionConfig,
1621            "Cannot switch primary agent without session config.",
1622        ));
1623    };
1624
1625    let Some(_spec) = primary_agent_spec(&agent_id) else {
1626        return Err(invalid_action(
1627            InvalidActionKind::UnknownPrimaryAgent,
1628            format!("Unknown primary agent '{agent_id}'."),
1629        ));
1630    };
1631
1632    let mut updated_config = base_config.clone();
1633    updated_config.primary_agent_id = Some(agent_id.clone());
1634    let new_config = resolve_effective_config(&updated_config);
1635    let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1636
1637    apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1638
1639    let mut effects = Vec::new();
1640    effects.push(Effect::EmitEvent {
1641        session_id,
1642        event: SessionEvent::SessionConfigUpdated {
1643            config: Box::new(new_config),
1644            primary_agent_id: agent_id,
1645        },
1646    });
1647    effects.extend(backend_effects);
1648    effects.push(Effect::ReloadToolSchemas { session_id });
1649
1650    Ok(effects)
1651}
1652
1653fn apply_session_config_state(
1654    state: &mut AppState,
1655    config: &crate::session::state::SessionConfig,
1656    primary_agent_id: Option<String>,
1657    update_base: bool,
1658) {
1659    state.apply_session_config(config, primary_agent_id, update_base);
1660}
1661
1662fn mcp_backend_diff_effects(
1663    session_id: crate::app::domain::types::SessionId,
1664    old_config: &crate::session::state::SessionConfig,
1665    new_config: &crate::session::state::SessionConfig,
1666) -> Vec<Effect> {
1667    let old_map = collect_mcp_backends(old_config);
1668    let new_map = collect_mcp_backends(new_config);
1669
1670    let mut effects = Vec::new();
1671
1672    for (server_name, (old_transport, old_filter)) in &old_map {
1673        match new_map.get(server_name) {
1674            None => {
1675                effects.push(Effect::DisconnectMcpServer {
1676                    session_id,
1677                    server_name: server_name.clone(),
1678                });
1679            }
1680            Some((new_transport, new_filter)) => {
1681                if new_transport != old_transport || new_filter != old_filter {
1682                    effects.push(Effect::DisconnectMcpServer {
1683                        session_id,
1684                        server_name: server_name.clone(),
1685                    });
1686                    effects.push(Effect::ConnectMcpServer {
1687                        session_id,
1688                        config: McpServerConfig {
1689                            server_name: server_name.clone(),
1690                            transport: new_transport.clone(),
1691                            tool_filter: new_filter.clone(),
1692                        },
1693                    });
1694                }
1695            }
1696        }
1697    }
1698
1699    for (server_name, (new_transport, new_filter)) in &new_map {
1700        if !old_map.contains_key(server_name) {
1701            effects.push(Effect::ConnectMcpServer {
1702                session_id,
1703                config: McpServerConfig {
1704                    server_name: server_name.clone(),
1705                    transport: new_transport.clone(),
1706                    tool_filter: new_filter.clone(),
1707                },
1708            });
1709        }
1710    }
1711
1712    effects
1713}
1714
1715fn collect_mcp_backends(
1716    config: &crate::session::state::SessionConfig,
1717) -> std::collections::HashMap<
1718    String,
1719    (
1720        crate::tools::McpTransport,
1721        crate::session::state::ToolFilter,
1722    ),
1723> {
1724    let mut map = std::collections::HashMap::new();
1725
1726    for backend_config in &config.tool_config.backends {
1727        let BackendConfig::Mcp {
1728            server_name,
1729            transport,
1730            tool_filter,
1731        } = backend_config;
1732
1733        map.insert(
1734            server_name.clone(),
1735            (transport.clone(), tool_filter.clone()),
1736        );
1737    }
1738
1739    map
1740}
1741
1742pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1743    match event {
1744        SessionEvent::SessionCreated { config, .. } => {
1745            let primary_agent_id = config
1746                .primary_agent_id
1747                .clone()
1748                .unwrap_or_else(|| default_primary_agent_id().to_string());
1749            apply_session_config_state(state, config, Some(primary_agent_id), true);
1750        }
1751        SessionEvent::SessionConfigUpdated {
1752            config,
1753            primary_agent_id,
1754        } => {
1755            apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1756        }
1757        SessionEvent::AssistantMessageAdded { message, .. }
1758        | SessionEvent::UserMessageAdded { message }
1759        | SessionEvent::ToolMessageAdded { message } => {
1760            state.message_graph.add_message(message.clone());
1761            state.message_graph.active_message_id = Some(message.id().to_string());
1762        }
1763        SessionEvent::MessageUpdated { message } => {
1764            state.message_graph.replace_message(message.clone());
1765        }
1766        SessionEvent::ApprovalDecided {
1767            decision, remember, ..
1768        } => {
1769            if *decision == ApprovalDecision::Approved
1770                && let Some(memory) = remember
1771            {
1772                match memory {
1773                    ApprovalMemory::Tool(name) => {
1774                        state.approved_tools.insert(name.clone());
1775                    }
1776                    ApprovalMemory::BashPattern(pattern) => {
1777                        state.approved_bash_patterns.insert(pattern.clone());
1778                    }
1779                    ApprovalMemory::PendingTool => {}
1780                }
1781            }
1782            state.pending_approval = None;
1783        }
1784        SessionEvent::OperationCompleted { op_id } => {
1785            state.complete_operation(*op_id);
1786        }
1787        SessionEvent::OperationCancelled { op_id, .. } => {
1788            state.record_cancelled_op(*op_id);
1789            state.complete_operation(*op_id);
1790        }
1791        SessionEvent::McpServerStateChanged {
1792            server_name,
1793            state: mcp_state,
1794        } => {
1795            state
1796                .mcp_servers
1797                .insert(server_name.clone(), mcp_state.clone());
1798        }
1799        SessionEvent::QueueUpdated { queue } => {
1800            let normalize_text = |content: &str| {
1801                NonEmptyString::new(content.to_string())
1802                    .or_else(|| NonEmptyString::new("(empty)".to_string()))
1803            };
1804
1805            state.queued_work = queue
1806                .iter()
1807                .filter_map(|item| match item.kind {
1808                    Some(QueuedWorkKind::UserMessage) => {
1809                        let text = normalize_text(item.content.as_str())?;
1810                        Some(QueuedWorkItem::UserMessage(
1811                            crate::app::domain::state::QueuedUserMessage {
1812                                text,
1813                                op_id: item.op_id,
1814                                message_id: item.message_id.clone(),
1815                                model: item.model.clone().unwrap_or_else(
1816                                    crate::config::model::builtin::claude_sonnet_4_5,
1817                                ),
1818                                queued_at: item.queued_at,
1819                            },
1820                        ))
1821                    }
1822                    Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1823                        crate::app::domain::state::QueuedBashCommand {
1824                            command: item.content.clone(),
1825                            op_id: item.op_id,
1826                            message_id: item.message_id.clone(),
1827                            queued_at: item.queued_at,
1828                        },
1829                    )),
1830                    None => {
1831                        let text = normalize_text(item.content.as_str())?;
1832                        Some(QueuedWorkItem::UserMessage(
1833                            crate::app::domain::state::QueuedUserMessage {
1834                                text,
1835                                op_id: item.op_id,
1836                                message_id: item.message_id.clone(),
1837                                model: item.model.clone().unwrap_or_else(
1838                                    crate::config::model::builtin::claude_sonnet_4_5,
1839                                ),
1840                                queued_at: item.queued_at,
1841                            },
1842                        ))
1843                    }
1844                })
1845                .collect();
1846        }
1847        _ => {}
1848    }
1849
1850    state.event_sequence += 1;
1851}
1852
1853struct CompactionCompleteParams {
1854    op_id: crate::app::domain::types::OpId,
1855    compaction_id: crate::app::domain::types::CompactionId,
1856    summary_message_id: crate::app::domain::types::MessageId,
1857    summary: String,
1858    compacted_head_message_id: crate::app::domain::types::MessageId,
1859    previous_active_message_id: Option<crate::app::domain::types::MessageId>,
1860    model_name: String,
1861    timestamp: u64,
1862}
1863
1864fn handle_compaction_complete(
1865    state: &mut AppState,
1866    session_id: crate::app::domain::types::SessionId,
1867    params: CompactionCompleteParams,
1868) -> Vec<Effect> {
1869    use crate::app::conversation::{AssistantContent, Message, MessageData};
1870    use crate::app::domain::types::CompactionRecord;
1871
1872    let CompactionCompleteParams {
1873        op_id,
1874        compaction_id,
1875        summary_message_id,
1876        summary,
1877        compacted_head_message_id,
1878        previous_active_message_id,
1879        model_name,
1880        timestamp,
1881    } = params;
1882
1883    let summary_message = Message {
1884        data: MessageData::Assistant {
1885            content: vec![AssistantContent::Text {
1886                text: summary.clone(),
1887            }],
1888        },
1889        id: summary_message_id.to_string(),
1890        parent_message_id: None,
1891        timestamp,
1892    };
1893
1894    state.message_graph.add_message(summary_message.clone());
1895
1896    let record = CompactionRecord::with_timestamp(
1897        compaction_id,
1898        summary_message_id,
1899        compacted_head_message_id,
1900        previous_active_message_id,
1901        model_name,
1902        timestamp,
1903    );
1904
1905    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
1906        model
1907    } else {
1908        state.complete_operation(op_id);
1909        return vec![Effect::EmitEvent {
1910            session_id,
1911            event: SessionEvent::Error {
1912                message: format!("Missing model for compaction operation {op_id}"),
1913            },
1914        }];
1915    };
1916
1917    state.complete_operation(op_id);
1918
1919    let mut effects = vec![
1920        Effect::EmitEvent {
1921            session_id,
1922            event: SessionEvent::AssistantMessageAdded {
1923                message: summary_message,
1924                model,
1925            },
1926        },
1927        Effect::EmitEvent {
1928            session_id,
1929            event: SessionEvent::CompactResult {
1930                result: crate::app::domain::event::CompactResult::Success(summary),
1931            },
1932        },
1933        Effect::EmitEvent {
1934            session_id,
1935            event: SessionEvent::ConversationCompacted { record },
1936        },
1937        Effect::EmitEvent {
1938            session_id,
1939            event: SessionEvent::OperationCompleted { op_id },
1940        },
1941    ];
1942
1943    effects.extend(maybe_start_queued_work(state, session_id));
1944
1945    effects
1946}
1947
1948fn handle_compaction_failed(
1949    state: &mut AppState,
1950    session_id: crate::app::domain::types::SessionId,
1951    op_id: crate::app::domain::types::OpId,
1952    error: String,
1953) -> Vec<Effect> {
1954    state.complete_operation(op_id);
1955
1956    let mut effects = vec![
1957        Effect::EmitEvent {
1958            session_id,
1959            event: SessionEvent::Error { message: error },
1960        },
1961        Effect::EmitEvent {
1962            session_id,
1963            event: SessionEvent::OperationCompleted { op_id },
1964        },
1965    ];
1966
1967    effects.extend(maybe_start_queued_work(state, session_id));
1968
1969    effects
1970}
1971
1972fn emit_mcp_connect_effects(
1973    state: &AppState,
1974    session_id: crate::app::domain::types::SessionId,
1975) -> Vec<Effect> {
1976    let mut effects = Vec::new();
1977
1978    let Some(ref config) = state.session_config else {
1979        return effects;
1980    };
1981
1982    for backend_config in &config.tool_config.backends {
1983        let BackendConfig::Mcp {
1984            server_name,
1985            transport,
1986            tool_filter,
1987        } = backend_config;
1988
1989        let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
1990            matches!(
1991                s,
1992                McpServerState::Connecting | McpServerState::Connected { .. }
1993            )
1994        });
1995
1996        if !already_connected {
1997            effects.push(Effect::ConnectMcpServer {
1998                session_id,
1999                config: McpServerConfig {
2000                    server_name: server_name.clone(),
2001                    transport: transport.clone(),
2002                    tool_filter: tool_filter.clone(),
2003                },
2004            });
2005        }
2006    }
2007
2008    effects
2009}
2010
2011#[cfg(test)]
2012mod tests {
2013    use super::*;
2014    use crate::app::domain::state::{OperationState, PendingApproval};
2015    use crate::app::domain::types::{
2016        MessageId, NonEmptyString, OpId, RequestId, SessionId, ToolCallId,
2017    };
2018    use crate::config::model::builtin;
2019    use crate::primary_agents::resolve_effective_config;
2020    use crate::session::state::{
2021        ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2022        ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2023    };
2024    use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2025    use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2026    use schemars::schema_for;
2027    use serde_json::json;
2028    use std::collections::HashSet;
2029    use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2030
2031    fn test_state() -> AppState {
2032        AppState::new(SessionId::new())
2033    }
2034
2035    fn test_schema(name: &str) -> ToolSchema {
2036        ToolSchema {
2037            name: name.to_string(),
2038            display_name: name.to_string(),
2039            description: String::new(),
2040            input_schema: InputSchema::empty_object(),
2041        }
2042    }
2043
2044    fn base_session_config() -> SessionConfig {
2045        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2046        config.primary_agent_id = Some("normal".to_string());
2047        config.policy_overrides = SessionPolicyOverrides::empty();
2048        resolve_effective_config(&config)
2049    }
2050
2051    fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2052        super::reduce(state, action).expect("reduce failed")
2053    }
2054
2055    #[test]
2056    fn test_user_input_starts_operation() {
2057        let mut state = test_state();
2058        let session_id = state.session_id;
2059        let op_id = OpId::new();
2060        let message_id = MessageId::new();
2061        let model = builtin::claude_sonnet_4_5();
2062
2063        let effects = reduce(
2064            &mut state,
2065            Action::UserInput {
2066                session_id,
2067                text: NonEmptyString::new("Hello").unwrap(),
2068                op_id,
2069                message_id,
2070                model,
2071                timestamp: 1_234_567_890,
2072            },
2073        );
2074
2075        assert_eq!(state.message_graph.messages.len(), 1);
2076        assert!(state.current_operation.is_some());
2077        assert!(
2078            effects
2079                .iter()
2080                .any(|e| matches!(e, Effect::CallModel { .. }))
2081        );
2082    }
2083
2084    #[test]
2085    fn test_switch_primary_agent_updates_visibility() {
2086        let mut state = test_state();
2087        let session_id = state.session_id;
2088        let config = base_session_config();
2089        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2090
2091        let effects = reduce(
2092            &mut state,
2093            Action::SwitchPrimaryAgent {
2094                session_id,
2095                agent_id: "plan".to_string(),
2096            },
2097        );
2098
2099        let updated = state.session_config.as_ref().expect("config");
2100        match &updated.tool_config.visibility {
2101            ToolVisibility::Whitelist(allowed) => {
2102                assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2103                for name in READ_ONLY_TOOL_NAMES {
2104                    assert!(allowed.contains(*name));
2105                }
2106                assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2107            }
2108            other => panic!("Unexpected tool visibility: {other:?}"),
2109        }
2110        assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2111        assert!(effects.iter().any(|e| matches!(
2112            e,
2113            Effect::EmitEvent {
2114                event: SessionEvent::SessionConfigUpdated { .. },
2115                ..
2116            }
2117        )));
2118        assert!(
2119            effects
2120                .iter()
2121                .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2122        );
2123    }
2124
2125    #[test]
2126    fn test_switch_primary_agent_yolo_auto_approves() {
2127        let mut state = test_state();
2128        let session_id = state.session_id;
2129        let config = base_session_config();
2130        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2131
2132        let _ = reduce(
2133            &mut state,
2134            Action::SwitchPrimaryAgent {
2135                session_id,
2136                agent_id: "yolo".to_string(),
2137            },
2138        );
2139
2140        let updated = state.session_config.as_ref().expect("config");
2141        assert_eq!(
2142            updated.tool_config.approval_policy.default_behavior,
2143            UnapprovedBehavior::Allow
2144        );
2145    }
2146
2147    #[test]
2148    fn test_switch_primary_agent_preserves_policy_overrides() {
2149        let mut state = test_state();
2150        let session_id = state.session_id;
2151
2152        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2153        config.primary_agent_id = Some("normal".to_string());
2154        config.policy_overrides = SessionPolicyOverrides {
2155            default_model: None,
2156            tool_visibility: None,
2157            approval_policy: ToolApprovalPolicyOverrides {
2158                default_behavior: Some(UnapprovedBehavior::Deny),
2159                preapproved: ApprovalRulesOverrides::empty(),
2160            },
2161        };
2162        let config = resolve_effective_config(&config);
2163        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2164
2165        let _ = reduce(
2166            &mut state,
2167            Action::SwitchPrimaryAgent {
2168                session_id,
2169                agent_id: "yolo".to_string(),
2170            },
2171        );
2172
2173        let updated = state.session_config.as_ref().expect("config");
2174        assert_eq!(
2175            updated.tool_config.approval_policy.default_behavior,
2176            UnapprovedBehavior::Deny
2177        );
2178        assert_eq!(
2179            updated.policy_overrides.approval_policy.default_behavior,
2180            Some(UnapprovedBehavior::Deny)
2181        );
2182    }
2183
2184    #[test]
2185    fn dispatch_agent_resume_is_auto_approved() {
2186        let mut state = test_state();
2187        let session_id = state.session_id;
2188        let config = base_session_config();
2189        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2190
2191        let tool_call = ToolCall {
2192            id: "tc_dispatch_resume".to_string(),
2193            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2194            parameters: json!({
2195                "prompt": "resume work",
2196                "target": {
2197                    "session": "resume",
2198                    "session_id": SessionId::new().to_string()
2199                }
2200            }),
2201        };
2202
2203        let decision = get_tool_decision(&state, &tool_call);
2204        assert_eq!(decision, ToolDecision::Allow);
2205        assert_eq!(state.session_id, session_id);
2206    }
2207
2208    #[test]
2209    fn test_switch_primary_agent_restores_base_prompt() {
2210        let mut state = test_state();
2211        let session_id = state.session_id;
2212        let mut config = base_session_config();
2213        config.system_prompt = Some("base prompt".to_string());
2214        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2215
2216        let _ = reduce(
2217            &mut state,
2218            Action::SwitchPrimaryAgent {
2219                session_id,
2220                agent_id: "plan".to_string(),
2221            },
2222        );
2223
2224        let _ = reduce(
2225            &mut state,
2226            Action::SwitchPrimaryAgent {
2227                session_id,
2228                agent_id: "normal".to_string(),
2229            },
2230        );
2231
2232        let updated = state.session_config.as_ref().expect("config");
2233        assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2234    }
2235
2236    #[test]
2237    fn test_switch_primary_agent_blocked_during_operation() {
2238        let mut state = test_state();
2239        let session_id = state.session_id;
2240        let config = base_session_config();
2241        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2242
2243        state.current_operation = Some(OperationState {
2244            op_id: OpId::new(),
2245            kind: OperationKind::AgentLoop,
2246            pending_tool_calls: HashSet::new(),
2247        });
2248
2249        let result = super::reduce(
2250            &mut state,
2251            Action::SwitchPrimaryAgent {
2252                session_id,
2253                agent_id: "plan".to_string(),
2254            },
2255        );
2256
2257        assert!(matches!(
2258            result,
2259            Err(ReduceError::InvalidAction {
2260                kind: InvalidActionKind::OperationInFlight,
2261                ..
2262            })
2263        ));
2264        assert!(state.primary_agent_id.as_deref() == Some("normal"));
2265    }
2266
2267    #[test]
2268    fn test_late_result_ignored_after_cancel() {
2269        let mut state = test_state();
2270        let session_id = state.session_id;
2271        let op_id = OpId::new();
2272        let tool_call_id = ToolCallId::from_string("tc_1");
2273
2274        state.current_operation = Some(OperationState {
2275            op_id,
2276            kind: OperationKind::AgentLoop,
2277            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2278        });
2279
2280        let _ = reduce(
2281            &mut state,
2282            Action::Cancel {
2283                session_id,
2284                op_id: None,
2285            },
2286        );
2287
2288        state.current_operation = Some(OperationState {
2289            op_id,
2290            kind: OperationKind::AgentLoop,
2291            pending_tool_calls: HashSet::new(),
2292        });
2293        state
2294            .operation_models
2295            .insert(op_id, builtin::claude_sonnet_4_5());
2296        state
2297            .operation_models
2298            .insert(op_id, builtin::claude_sonnet_4_5());
2299        state
2300            .operation_models
2301            .insert(op_id, builtin::claude_sonnet_4_5());
2302
2303        let effects = reduce(
2304            &mut state,
2305            Action::ToolResult {
2306                session_id,
2307                tool_call_id,
2308                tool_name: "test".to_string(),
2309                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2310                    tool_name: "test".to_string(),
2311                    payload: "done".to_string(),
2312                })),
2313            },
2314        );
2315
2316        assert!(effects.is_empty());
2317    }
2318
2319    #[test]
2320    fn test_pre_approved_tool_executes_immediately() {
2321        let mut state = test_state();
2322        let session_id = state.session_id;
2323        let op_id = OpId::new();
2324
2325        state.approved_tools.insert("test_tool".to_string());
2326        state.current_operation = Some(OperationState {
2327            op_id,
2328            kind: OperationKind::AgentLoop,
2329            pending_tool_calls: HashSet::new(),
2330        });
2331        state
2332            .operation_models
2333            .insert(op_id, builtin::claude_sonnet_4_5());
2334        state
2335            .operation_models
2336            .insert(op_id, builtin::claude_sonnet_4_5());
2337        state
2338            .operation_models
2339            .insert(op_id, builtin::claude_sonnet_4_5());
2340
2341        let tool_call = steer_tools::ToolCall {
2342            id: "tc_1".to_string(),
2343            name: "test_tool".to_string(),
2344            parameters: serde_json::json!({}),
2345        };
2346
2347        let effects = reduce(
2348            &mut state,
2349            Action::ToolApprovalRequested {
2350                session_id,
2351                request_id: RequestId::new(),
2352                tool_call,
2353            },
2354        );
2355
2356        assert!(
2357            effects
2358                .iter()
2359                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2360        );
2361        assert!(state.pending_approval.is_none());
2362    }
2363
2364    #[test]
2365    fn test_denied_tool_request_emits_failure_message() {
2366        let mut state = test_state();
2367        let session_id = state.session_id;
2368        let op_id = OpId::new();
2369
2370        state.current_operation = Some(OperationState {
2371            op_id,
2372            kind: OperationKind::AgentLoop,
2373            pending_tool_calls: HashSet::new(),
2374        });
2375
2376        state
2377            .operation_models
2378            .insert(op_id, builtin::claude_sonnet_4_5());
2379
2380        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2381        config.tool_config.approval_policy = ToolApprovalPolicy {
2382            default_behavior: UnapprovedBehavior::Deny,
2383            preapproved: ApprovalRules::default(),
2384        };
2385        state.session_config = Some(config);
2386
2387        let tool_call = steer_tools::ToolCall {
2388            id: "tc_1".to_string(),
2389            name: "test_tool".to_string(),
2390            parameters: serde_json::json!({}),
2391        };
2392
2393        let effects = reduce(
2394            &mut state,
2395            Action::ToolApprovalRequested {
2396                session_id,
2397                request_id: RequestId::new(),
2398                tool_call,
2399            },
2400        );
2401
2402        assert!(effects.iter().any(|e| matches!(
2403            e,
2404            Effect::EmitEvent {
2405                event: SessionEvent::ToolCallFailed { .. },
2406                ..
2407            }
2408        )));
2409        assert!(effects.iter().any(|e| matches!(
2410            e,
2411            Effect::EmitEvent {
2412                event: SessionEvent::ToolMessageAdded { .. },
2413                ..
2414            }
2415        )));
2416        assert!(
2417            !effects
2418                .iter()
2419                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2420        );
2421        assert!(
2422            !effects
2423                .iter()
2424                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2425        );
2426        assert!(state.pending_approval.is_none());
2427        assert!(state.approval_queue.is_empty());
2428        assert_eq!(state.message_graph.messages.len(), 1);
2429
2430        match &state.message_graph.messages[0].data {
2431            MessageData::Tool { result, .. } => match result {
2432                ToolResult::Error(error) => {
2433                    assert!(
2434                        matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2435                    );
2436                }
2437                _ => panic!("expected denied tool error"),
2438            },
2439            _ => panic!("expected tool message"),
2440        }
2441    }
2442
2443    #[test]
2444    fn test_user_denied_tool_request_emits_failure_message() {
2445        let mut state = test_state();
2446        let session_id = state.session_id;
2447        let op_id = OpId::new();
2448
2449        state.current_operation = Some(OperationState {
2450            op_id,
2451            kind: OperationKind::AgentLoop,
2452            pending_tool_calls: HashSet::new(),
2453        });
2454        state
2455            .operation_models
2456            .insert(op_id, builtin::claude_sonnet_4_5());
2457
2458        let tool_call = steer_tools::ToolCall {
2459            id: "tc_1".to_string(),
2460            name: "test_tool".to_string(),
2461            parameters: serde_json::json!({}),
2462        };
2463        let request_id = RequestId::new();
2464        state.pending_approval = Some(PendingApproval {
2465            request_id,
2466            tool_call: tool_call.clone(),
2467        });
2468
2469        let effects = reduce(
2470            &mut state,
2471            Action::ToolApprovalDecided {
2472                session_id,
2473                request_id,
2474                decision: ApprovalDecision::Denied,
2475                remember: None,
2476            },
2477        );
2478
2479        assert!(effects.iter().any(|e| matches!(
2480            e,
2481            Effect::EmitEvent {
2482                event: SessionEvent::ToolCallFailed { .. },
2483                ..
2484            }
2485        )));
2486        assert!(effects.iter().any(|e| matches!(
2487            e,
2488            Effect::EmitEvent {
2489                event: SessionEvent::ToolMessageAdded { .. },
2490                ..
2491            }
2492        )));
2493        assert!(
2494            !effects
2495                .iter()
2496                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2497        );
2498        assert!(state.pending_approval.is_none());
2499        assert!(state.approval_queue.is_empty());
2500        assert_eq!(state.message_graph.messages.len(), 1);
2501
2502        match &state.message_graph.messages[0].data {
2503            MessageData::Tool { result, .. } => match result {
2504                ToolResult::Error(error) => {
2505                    assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2506                }
2507                _ => panic!("expected denied tool error"),
2508            },
2509            _ => panic!("expected tool message"),
2510        }
2511    }
2512
2513    #[test]
2514    fn test_cancel_pops_queued_item_without_auto_start() {
2515        let mut state = test_state();
2516        let session_id = state.session_id;
2517        let op_id = OpId::new();
2518
2519        state.current_operation = Some(OperationState {
2520            op_id,
2521            kind: OperationKind::AgentLoop,
2522            pending_tool_calls: HashSet::new(),
2523        });
2524        state
2525            .operation_models
2526            .insert(op_id, builtin::claude_sonnet_4_5());
2527
2528        let queued_op = OpId::new();
2529        let queued_message_id = MessageId::from_string("queued_msg");
2530        let _ = reduce(
2531            &mut state,
2532            Action::UserInput {
2533                session_id,
2534                text: NonEmptyString::new("Queued message").expect("non-empty"),
2535                op_id: queued_op,
2536                message_id: queued_message_id.clone(),
2537                model: builtin::claude_sonnet_4_5(),
2538                timestamp: 1,
2539            },
2540        );
2541
2542        let effects = reduce(
2543            &mut state,
2544            Action::Cancel {
2545                session_id,
2546                op_id: None,
2547            },
2548        );
2549
2550        assert!(state.current_operation.is_none());
2551        assert!(state.queued_work.is_empty());
2552
2553        let cancellation_info = effects.iter().find_map(|effect| match effect {
2554            Effect::EmitEvent {
2555                event: SessionEvent::OperationCancelled { info, .. },
2556                ..
2557            } => Some(info),
2558            _ => None,
2559        });
2560        let info = cancellation_info.expect("expected OperationCancelled event");
2561        let popped = info
2562            .popped_queued_item
2563            .as_ref()
2564            .expect("expected popped queued item");
2565        assert_eq!(popped.content, "Queued message");
2566        assert_eq!(popped.op_id, queued_op);
2567        assert_eq!(popped.message_id, queued_message_id);
2568
2569        assert!(
2570            !effects.iter().any(|effect| matches!(
2571                effect,
2572                Effect::EmitEvent {
2573                    event: SessionEvent::OperationStarted { .. },
2574                    ..
2575                }
2576            )),
2577            "queued work should not auto-start on cancel"
2578        );
2579    }
2580
2581    #[test]
2582    fn test_cancel_injects_tool_results_for_pending_calls() {
2583        let mut state = test_state();
2584        let session_id = state.session_id;
2585        let op_id = OpId::new();
2586
2587        let tool_call = ToolCall {
2588            id: "tc_1".to_string(),
2589            name: "test_tool".to_string(),
2590            parameters: serde_json::json!({}),
2591        };
2592
2593        state.message_graph.add_message(Message {
2594            data: MessageData::Assistant {
2595                content: vec![AssistantContent::ToolCall {
2596                    tool_call: tool_call.clone(),
2597                    thought_signature: None,
2598                }],
2599            },
2600            timestamp: 0,
2601            id: "msg_1".to_string(),
2602            parent_message_id: None,
2603        });
2604
2605        state.current_operation = Some(OperationState {
2606            op_id,
2607            kind: OperationKind::AgentLoop,
2608            pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2609        });
2610        state
2611            .operation_models
2612            .insert(op_id, builtin::claude_sonnet_4_5());
2613
2614        let effects = reduce(
2615            &mut state,
2616            Action::Cancel {
2617                session_id,
2618                op_id: None,
2619            },
2620        );
2621
2622        assert!(effects.iter().any(|e| matches!(
2623            e,
2624            Effect::EmitEvent {
2625                event: SessionEvent::ToolMessageAdded { .. },
2626                ..
2627            }
2628        )));
2629
2630        let tool_message = state
2631            .message_graph
2632            .messages
2633            .iter()
2634            .find(|message| matches!(message.data, MessageData::Tool { .. }))
2635            .expect("tool result should be injected on cancel");
2636
2637        match &tool_message.data {
2638            MessageData::Tool { result, .. } => match result {
2639                ToolResult::Error(error) => {
2640                    assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2641                }
2642                _ => panic!("expected cancelled tool error"),
2643            },
2644            _ => panic!("expected tool message"),
2645        }
2646    }
2647
2648    #[test]
2649    fn test_malformed_tool_call_auto_denies() {
2650        let mut state = test_state();
2651        let session_id = state.session_id;
2652        let op_id = OpId::new();
2653
2654        state.current_operation = Some(OperationState {
2655            op_id,
2656            kind: OperationKind::AgentLoop,
2657            pending_tool_calls: HashSet::new(),
2658        });
2659
2660        state
2661            .operation_models
2662            .insert(op_id, builtin::claude_sonnet_4_5());
2663
2664        let mut properties = serde_json::Map::new();
2665        properties.insert("command".to_string(), json!({ "type": "string" }));
2666
2667        state.tools.push(ToolSchema {
2668            name: "test_tool".to_string(),
2669            display_name: "test_tool".to_string(),
2670            description: String::new(),
2671            input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2672        });
2673
2674        let tool_call = ToolCall {
2675            id: "tc_1".to_string(),
2676            name: "test_tool".to_string(),
2677            parameters: json!({}),
2678        };
2679
2680        let effects = reduce(
2681            &mut state,
2682            Action::ToolApprovalRequested {
2683                session_id,
2684                request_id: RequestId::new(),
2685                tool_call,
2686            },
2687        );
2688
2689        assert!(effects.iter().any(|e| matches!(
2690            e,
2691            Effect::EmitEvent {
2692                event: SessionEvent::ToolCallFailed { .. },
2693                ..
2694            }
2695        )));
2696        assert!(effects.iter().any(|e| matches!(
2697            e,
2698            Effect::EmitEvent {
2699                event: SessionEvent::ToolMessageAdded { .. },
2700                ..
2701            }
2702        )));
2703        assert!(
2704            !effects
2705                .iter()
2706                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2707        );
2708        assert!(
2709            !effects
2710                .iter()
2711                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2712        );
2713        assert!(state.pending_approval.is_none());
2714        assert!(state.approval_queue.is_empty());
2715        assert_eq!(state.message_graph.messages.len(), 1);
2716
2717        match &state.message_graph.messages[0].data {
2718            MessageData::Tool { result, .. } => match result {
2719                ToolResult::Error(error) => {
2720                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2721                }
2722                _ => panic!("expected invalid params tool error"),
2723            },
2724            _ => panic!("expected tool message"),
2725        }
2726    }
2727
2728    #[test]
2729    fn test_approval_queuing() {
2730        let mut state = test_state();
2731        let session_id = state.session_id;
2732        let op_id = OpId::new();
2733
2734        state.current_operation = Some(OperationState {
2735            op_id,
2736            kind: OperationKind::AgentLoop,
2737            pending_tool_calls: HashSet::new(),
2738        });
2739
2740        let tool_call_1 = steer_tools::ToolCall {
2741            id: "tc_1".to_string(),
2742            name: "tool_1".to_string(),
2743            parameters: serde_json::json!({}),
2744        };
2745        let tool_call_2 = steer_tools::ToolCall {
2746            id: "tc_2".to_string(),
2747            name: "tool_2".to_string(),
2748            parameters: serde_json::json!({}),
2749        };
2750
2751        let _ = reduce(
2752            &mut state,
2753            Action::ToolApprovalRequested {
2754                session_id,
2755                request_id: RequestId::new(),
2756                tool_call: tool_call_1,
2757            },
2758        );
2759
2760        assert!(state.pending_approval.is_some());
2761
2762        let _ = reduce(
2763            &mut state,
2764            Action::ToolApprovalRequested {
2765                session_id,
2766                request_id: RequestId::new(),
2767                tool_call: tool_call_2,
2768            },
2769        );
2770
2771        assert_eq!(state.approval_queue.len(), 1);
2772    }
2773
2774    #[test]
2775    fn test_dispatch_agent_missing_target_auto_denies() {
2776        let mut state = test_state();
2777        let session_id = state.session_id;
2778        let op_id = OpId::new();
2779
2780        state.current_operation = Some(OperationState {
2781            op_id,
2782            kind: OperationKind::AgentLoop,
2783            pending_tool_calls: HashSet::new(),
2784        });
2785
2786        state
2787            .operation_models
2788            .insert(op_id, builtin::claude_sonnet_4_5());
2789
2790        let input_schema: InputSchema =
2791            schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
2792        state.tools.push(ToolSchema {
2793            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2794            display_name: "Dispatch Agent".to_string(),
2795            description: String::new(),
2796            input_schema,
2797        });
2798
2799        let tool_call = ToolCall {
2800            id: "tc_dispatch".to_string(),
2801            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2802            parameters: json!({ "prompt": "hello world" }),
2803        };
2804
2805        let effects = reduce(
2806            &mut state,
2807            Action::ToolApprovalRequested {
2808                session_id,
2809                request_id: RequestId::new(),
2810                tool_call,
2811            },
2812        );
2813
2814        assert!(effects.iter().any(|e| matches!(
2815            e,
2816            Effect::EmitEvent {
2817                event: SessionEvent::ToolCallFailed { .. },
2818                ..
2819            }
2820        )));
2821        assert!(effects.iter().any(|e| matches!(
2822            e,
2823            Effect::EmitEvent {
2824                event: SessionEvent::ToolMessageAdded { .. },
2825                ..
2826            }
2827        )));
2828        assert!(
2829            !effects
2830                .iter()
2831                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2832        );
2833        assert!(state.pending_approval.is_none());
2834        assert!(state.approval_queue.is_empty());
2835
2836        match &state.message_graph.messages[0].data {
2837            MessageData::Tool { result, .. } => match result {
2838                ToolResult::Error(error) => {
2839                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2840                }
2841                _ => panic!("expected invalid params tool error"),
2842            },
2843            _ => panic!("expected tool message"),
2844        }
2845    }
2846
2847    #[test]
2848    fn test_model_response_with_tool_calls_requests_approval() {
2849        let mut state = test_state();
2850        let session_id = state.session_id;
2851        let op_id = OpId::new();
2852        let message_id = MessageId::new();
2853
2854        state.current_operation = Some(OperationState {
2855            op_id,
2856            kind: OperationKind::AgentLoop,
2857            pending_tool_calls: HashSet::new(),
2858        });
2859        state
2860            .operation_models
2861            .insert(op_id, builtin::claude_sonnet_4_5());
2862
2863        let tool_call = steer_tools::ToolCall {
2864            id: "tc_1".to_string(),
2865            name: "bash".to_string(),
2866            parameters: serde_json::json!({"command": "ls"}),
2867        };
2868
2869        let content = vec![
2870            AssistantContent::Text {
2871                text: "Let me list the files.".to_string(),
2872            },
2873            AssistantContent::ToolCall {
2874                tool_call: tool_call.clone(),
2875                thought_signature: None,
2876            },
2877        ];
2878
2879        let effects = reduce(
2880            &mut state,
2881            Action::ModelResponseComplete {
2882                session_id,
2883                op_id,
2884                message_id,
2885                content,
2886                timestamp: 12345,
2887            },
2888        );
2889
2890        assert!(state.pending_approval.is_some());
2891        assert!(
2892            effects
2893                .iter()
2894                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2895        );
2896        assert!(state.current_operation.is_some());
2897    }
2898
2899    #[test]
2900    fn test_model_response_no_tools_completes_operation() {
2901        let mut state = test_state();
2902        let session_id = state.session_id;
2903        let op_id = OpId::new();
2904        let message_id = MessageId::new();
2905
2906        state.current_operation = Some(OperationState {
2907            op_id,
2908            kind: OperationKind::AgentLoop,
2909            pending_tool_calls: HashSet::new(),
2910        });
2911        state
2912            .operation_models
2913            .insert(op_id, builtin::claude_sonnet_4_5());
2914
2915        let content = vec![AssistantContent::Text {
2916            text: "Hello! How can I help?".to_string(),
2917        }];
2918
2919        let effects = reduce(
2920            &mut state,
2921            Action::ModelResponseComplete {
2922                session_id,
2923                op_id,
2924                message_id,
2925                content,
2926                timestamp: 12345,
2927            },
2928        );
2929
2930        assert!(state.current_operation.is_none());
2931        assert!(effects.iter().any(|e| matches!(
2932            e,
2933            Effect::EmitEvent {
2934                event: SessionEvent::OperationCompleted { .. },
2935                ..
2936            }
2937        )));
2938    }
2939
2940    #[test]
2941    fn test_out_of_order_completion_preserves_newer_operation() {
2942        let mut state = test_state();
2943        let session_id = state.session_id;
2944        let model = builtin::claude_sonnet_4_5();
2945
2946        let op_a = OpId::new();
2947        let op_b = OpId::new();
2948
2949        let _ = reduce(
2950            &mut state,
2951            Action::UserInput {
2952                session_id,
2953                text: NonEmptyString::new("first").unwrap(),
2954                op_id: op_a,
2955                message_id: MessageId::new(),
2956                model: model.clone(),
2957                timestamp: 1,
2958            },
2959        );
2960
2961        let _ = reduce(
2962            &mut state,
2963            Action::UserInput {
2964                session_id,
2965                text: NonEmptyString::new("second").unwrap(),
2966                op_id: op_b,
2967                message_id: MessageId::new(),
2968                model: model.clone(),
2969                timestamp: 2,
2970            },
2971        );
2972
2973        let _ = reduce(
2974            &mut state,
2975            Action::ModelResponseComplete {
2976                session_id,
2977                op_id: op_a,
2978                message_id: MessageId::new(),
2979                content: vec![AssistantContent::Text {
2980                    text: "done A".to_string(),
2981                }],
2982                timestamp: 3,
2983            },
2984        );
2985
2986        assert!(
2987            state
2988                .current_operation
2989                .as_ref()
2990                .is_some_and(|op| op.op_id == op_b)
2991        );
2992        assert!(state.operation_models.contains_key(&op_b));
2993        assert!(!state.operation_models.contains_key(&op_a));
2994
2995        let effects = reduce(
2996            &mut state,
2997            Action::ModelResponseComplete {
2998                session_id,
2999                op_id: op_b,
3000                message_id: MessageId::new(),
3001                content: vec![AssistantContent::Text {
3002                    text: "done B".to_string(),
3003                }],
3004                timestamp: 4,
3005            },
3006        );
3007
3008        assert!(effects.iter().any(|e| matches!(
3009            e,
3010            Effect::EmitEvent {
3011                event: SessionEvent::OperationCompleted { op_id },
3012                ..
3013            } if *op_id == op_b
3014        )));
3015        assert!(!effects.iter().any(|e| matches!(
3016            e,
3017            Effect::EmitEvent {
3018                event: SessionEvent::Error { message },
3019                ..
3020            } if message.contains("Missing model for operation")
3021        )));
3022    }
3023
3024    #[test]
3025    fn test_tool_approval_does_not_call_model_before_result() {
3026        let mut state = test_state();
3027        let session_id = state.session_id;
3028        let op_id = OpId::new();
3029
3030        state.current_operation = Some(OperationState {
3031            op_id,
3032            kind: OperationKind::AgentLoop,
3033            pending_tool_calls: HashSet::new(),
3034        });
3035        state
3036            .operation_models
3037            .insert(op_id, builtin::claude_sonnet_4_5());
3038
3039        let tool_call = steer_tools::ToolCall {
3040            id: "tc_1".to_string(),
3041            name: "bash".to_string(),
3042            parameters: serde_json::json!({"command": "ls"}),
3043        };
3044        let request_id = RequestId::new();
3045        state.pending_approval = Some(PendingApproval {
3046            request_id,
3047            tool_call: tool_call.clone(),
3048        });
3049
3050        let effects = reduce(
3051            &mut state,
3052            Action::ToolApprovalDecided {
3053                session_id,
3054                request_id,
3055                decision: ApprovalDecision::Approved,
3056                remember: None,
3057            },
3058        );
3059
3060        assert!(
3061            effects
3062                .iter()
3063                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3064        );
3065        assert!(
3066            !effects
3067                .iter()
3068                .any(|e| matches!(e, Effect::CallModel { .. }))
3069        );
3070        assert!(state.current_operation.as_ref().is_some_and(|op| {
3071            op.pending_tool_calls
3072                .contains(&ToolCallId::from_string("tc_1"))
3073        }));
3074    }
3075
3076    #[test]
3077    fn test_mcp_tool_visibility_and_disconnect_removal() {
3078        let mut state = test_state();
3079        let session_id = state.session_id;
3080
3081        let mut allowed = HashSet::new();
3082        allowed.insert("mcp__alpha__allowed".to_string());
3083
3084        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3085        config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3086        state.session_config = Some(config);
3087
3088        state.tools.push(test_schema("bash"));
3089
3090        let _ = reduce(
3091            &mut state,
3092            Action::McpServerStateChanged {
3093                session_id,
3094                server_name: "alpha".to_string(),
3095                state: McpServerState::Connected {
3096                    tools: vec![
3097                        test_schema("mcp__alpha__allowed"),
3098                        test_schema("mcp__alpha__blocked"),
3099                    ],
3100                },
3101            },
3102        );
3103
3104        assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3105        assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3106
3107        let _ = reduce(
3108            &mut state,
3109            Action::McpServerStateChanged {
3110                session_id,
3111                server_name: "alpha".to_string(),
3112                state: McpServerState::Disconnected { error: None },
3113            },
3114        );
3115
3116        assert!(
3117            !state
3118                .tools
3119                .iter()
3120                .any(|t| t.name.starts_with("mcp__alpha__"))
3121        );
3122        assert!(state.tools.iter().any(|t| t.name == "bash"));
3123    }
3124
3125    #[test]
3126    fn test_tool_result_continues_agent_loop() {
3127        let mut state = test_state();
3128        let session_id = state.session_id;
3129        let op_id = OpId::new();
3130        let tool_call_id = ToolCallId::from_string("tc_1");
3131
3132        state.current_operation = Some(OperationState {
3133            op_id,
3134            kind: OperationKind::AgentLoop,
3135            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3136        });
3137        state
3138            .operation_models
3139            .insert(op_id, builtin::claude_sonnet_4_5());
3140
3141        let effects = reduce(
3142            &mut state,
3143            Action::ToolResult {
3144                session_id,
3145                tool_call_id,
3146                tool_name: "bash".to_string(),
3147                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3148                    tool_name: "bash".to_string(),
3149                    payload: "file1.txt\nfile2.txt".to_string(),
3150                })),
3151            },
3152        );
3153
3154        assert!(
3155            effects
3156                .iter()
3157                .any(|e| matches!(e, Effect::CallModel { .. }))
3158        );
3159    }
3160
3161    #[test]
3162    fn test_tool_result_waits_for_pending_tools() {
3163        let mut state = test_state();
3164        let session_id = state.session_id;
3165        let op_id = OpId::new();
3166        let tool_call_id_1 = ToolCallId::from_string("tc_1");
3167        let tool_call_id_2 = ToolCallId::from_string("tc_2");
3168
3169        state.current_operation = Some(OperationState {
3170            op_id,
3171            kind: OperationKind::AgentLoop,
3172            pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3173                .into_iter()
3174                .collect(),
3175        });
3176        state
3177            .operation_models
3178            .insert(op_id, builtin::claude_sonnet_4_5());
3179
3180        let effects = reduce(
3181            &mut state,
3182            Action::ToolResult {
3183                session_id,
3184                tool_call_id: tool_call_id_1,
3185                tool_name: "bash".to_string(),
3186                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3187                    tool_name: "bash".to_string(),
3188                    payload: "done".to_string(),
3189                })),
3190            },
3191        );
3192
3193        assert!(
3194            !effects
3195                .iter()
3196                .any(|e| matches!(e, Effect::CallModel { .. }))
3197        );
3198
3199        let effects = reduce(
3200            &mut state,
3201            Action::ToolResult {
3202                session_id,
3203                tool_call_id: tool_call_id_2,
3204                tool_name: "bash".to_string(),
3205                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3206                    tool_name: "bash".to_string(),
3207                    payload: "done".to_string(),
3208                })),
3209            },
3210        );
3211
3212        assert!(
3213            effects
3214                .iter()
3215                .any(|e| matches!(e, Effect::CallModel { .. }))
3216        );
3217    }
3218}