Skip to main content

steer_core/app/domain/
reduce.rs

1use crate::agents::default_agent_spec_id;
2use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
3
4use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
5
6use crate::app::domain::effect::{Effect, McpServerConfig};
7use crate::app::domain::event::{
8    CancellationInfo, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
9};
10use crate::app::domain::state::{
11    AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
12};
13use crate::primary_agents::{
14    default_primary_agent_id, primary_agent_spec, resolve_effective_config,
15};
16use crate::session::state::{BackendConfig, ToolDecision};
17
18use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
19use serde_json::Value;
20use steer_tools::ToolError;
21use steer_tools::result::ToolResult;
22use steer_tools::tools::BASH_TOOL_NAME;
23use thiserror::Error;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum InvalidActionKind {
27    OperationInFlight,
28    MissingSessionConfig,
29    UnknownPrimaryAgent,
30    QueueEmpty,
31}
32
33#[derive(Debug, Error)]
34pub enum ReduceError {
35    #[error("{message}")]
36    InvalidAction {
37        message: String,
38        kind: InvalidActionKind,
39    },
40    #[error("Invariant violated: {message}")]
41    Invariant { message: String },
42}
43
44fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
45    ReduceError::InvalidAction {
46        message: message.into(),
47        kind,
48    }
49}
50
51pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
52    match action {
53        Action::UserInput {
54            session_id,
55            content,
56            op_id,
57            message_id,
58            model,
59            timestamp,
60        } => Ok(handle_user_input(
61            state, session_id, content, op_id, message_id, model, timestamp,
62        )),
63
64        Action::UserEditedMessage {
65            session_id,
66            message_id,
67            new_content,
68            op_id,
69            new_message_id,
70            model,
71            timestamp,
72        } => handle_user_edited_message(
73            state,
74            session_id,
75            UserEditedMessageParams {
76                original_message_id: message_id,
77                new_content,
78                op_id,
79                new_message_id,
80                model,
81                timestamp,
82            },
83        ),
84
85        Action::ToolApprovalRequested {
86            session_id,
87            request_id,
88            tool_call,
89        } => Ok(handle_tool_approval_requested(
90            state, session_id, request_id, tool_call,
91        )),
92
93        Action::ToolApprovalDecided {
94            session_id,
95            request_id,
96            decision,
97            remember,
98        } => Ok(handle_tool_approval_decided(
99            state, session_id, request_id, decision, remember,
100        )),
101
102        Action::ToolExecutionStarted {
103            session_id,
104            tool_call_id,
105            tool_name,
106            tool_parameters,
107        } => Ok(handle_tool_execution_started(
108            state,
109            session_id,
110            tool_call_id,
111            tool_name,
112            tool_parameters,
113        )),
114
115        Action::ToolResult {
116            session_id,
117            tool_call_id,
118            tool_name,
119            result,
120        } => Ok(handle_tool_result(
121            state,
122            session_id,
123            tool_call_id,
124            tool_name,
125            result,
126        )),
127
128        Action::ModelResponseComplete {
129            session_id,
130            op_id,
131            message_id,
132            content,
133            timestamp,
134        } => Ok(handle_model_response_complete(
135            state, session_id, op_id, message_id, content, timestamp,
136        )),
137
138        Action::ModelResponseError {
139            session_id,
140            op_id,
141            error,
142        } => Ok(handle_model_response_error(
143            state, session_id, op_id, &error,
144        )),
145
146        Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
147
148        Action::DirectBashCommand {
149            session_id,
150            op_id,
151            message_id,
152            command,
153            timestamp,
154        } => Ok(handle_direct_bash(
155            state, session_id, op_id, message_id, command, timestamp,
156        )),
157
158        Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
159
160        Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
161
162        Action::RequestCompaction {
163            session_id,
164            op_id,
165            model,
166        } => handle_request_compaction(state, session_id, op_id, model),
167
168        Action::Hydrate {
169            session_id,
170            events,
171            starting_sequence,
172        } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
173
174        Action::WorkspaceFilesListed { files, .. } => {
175            state.workspace_files = files;
176            Ok(vec![])
177        }
178
179        Action::ToolSchemasAvailable { tools, .. } => {
180            state.tools = tools;
181            Ok(vec![])
182        }
183
184        Action::ToolSchemasUpdated { schemas, .. } => {
185            state.tools = schemas;
186            Ok(vec![])
187        }
188
189        Action::SwitchPrimaryAgent {
190            session_id,
191            agent_id,
192        } => handle_switch_primary_agent(state, session_id, agent_id),
193
194        Action::McpServerStateChanged {
195            session_id,
196            server_name,
197            state: new_state,
198        } => {
199            // When connected, merge MCP tools into state.tools
200            if let McpServerState::Connected { tools } = &new_state {
201                let tools = state.session_config.as_ref().map_or_else(
202                    || tools.clone(),
203                    |config| config.filter_tools_by_visibility(tools.clone()),
204                );
205
206                // Add MCP tools that aren't already present (by name)
207                for tool in tools {
208                    if !state.tools.iter().any(|t| t.name == tool.name) {
209                        state.tools.push(tool.clone());
210                    }
211                }
212            }
213
214            // When disconnected or failed, remove tools from that server
215            if matches!(
216                &new_state,
217                McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
218            ) {
219                let prefix = format!("mcp__{server_name}__");
220                state.tools.retain(|t| !t.name.starts_with(&prefix));
221            }
222
223            state
224                .mcp_servers
225                .insert(server_name.clone(), new_state.clone());
226            Ok(vec![Effect::EmitEvent {
227                session_id,
228                event: SessionEvent::McpServerStateChanged {
229                    server_name,
230                    state: new_state,
231                },
232            }])
233        }
234
235        Action::CompactionComplete {
236            session_id,
237            op_id,
238            compaction_id,
239            summary_message_id,
240            summary,
241            compacted_head_message_id,
242            previous_active_message_id,
243            model,
244            timestamp,
245        } => Ok(handle_compaction_complete(
246            state,
247            session_id,
248            CompactionCompleteParams {
249                op_id,
250                compaction_id,
251                summary_message_id,
252                summary,
253                compacted_head_message_id,
254                previous_active_message_id,
255                model_name: model,
256                timestamp,
257            },
258        )),
259
260        Action::CompactionFailed {
261            session_id,
262            op_id,
263            error,
264        } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
265
266        Action::Shutdown => Ok(vec![]),
267    }
268}
269
270fn handle_user_input(
271    state: &mut AppState,
272    session_id: crate::app::domain::types::SessionId,
273    content: Vec<UserContent>,
274    op_id: crate::app::domain::types::OpId,
275    message_id: crate::app::domain::types::MessageId,
276    model: crate::config::model::ModelId,
277    timestamp: u64,
278) -> Vec<Effect> {
279    let mut effects = Vec::new();
280
281    if state.has_active_operation() {
282        state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
283            content,
284            op_id,
285            message_id,
286            model,
287            queued_at: timestamp,
288        });
289        effects.push(Effect::EmitEvent {
290            session_id,
291            event: SessionEvent::QueueUpdated {
292                queue: snapshot_queue(state),
293            },
294        });
295        return effects;
296    }
297
298    let parent_id = state.message_graph.active_message_id.clone();
299
300    let message = Message {
301        data: MessageData::User { content },
302        timestamp,
303        id: message_id.0.clone(),
304        parent_message_id: parent_id,
305    };
306
307    state.message_graph.add_message(message.clone());
308    state.message_graph.active_message_id = Some(message_id.0.clone());
309
310    state.start_operation(op_id, OperationKind::AgentLoop);
311    state.operation_models.insert(op_id, model.clone());
312
313    effects.push(Effect::EmitEvent {
314        session_id,
315        event: SessionEvent::UserMessageAdded {
316            message: message.clone(),
317        },
318    });
319
320    effects.push(Effect::EmitEvent {
321        session_id,
322        event: SessionEvent::OperationStarted {
323            op_id,
324            kind: OperationKind::AgentLoop,
325        },
326    });
327
328    effects.push(Effect::CallModel {
329        session_id,
330        op_id,
331        model,
332        messages: state
333            .message_graph
334            .get_thread_messages()
335            .into_iter()
336            .cloned()
337            .collect(),
338        system_context: state.cached_system_context.clone(),
339        tools: state.tools.clone(),
340    });
341
342    effects
343}
344
345struct UserEditedMessageParams {
346    original_message_id: crate::app::domain::types::MessageId,
347    new_content: Vec<UserContent>,
348    op_id: crate::app::domain::types::OpId,
349    new_message_id: crate::app::domain::types::MessageId,
350    model: crate::config::model::ModelId,
351    timestamp: u64,
352}
353
354fn handle_user_edited_message(
355    state: &mut AppState,
356    session_id: crate::app::domain::types::SessionId,
357    params: UserEditedMessageParams,
358) -> Result<Vec<Effect>, ReduceError> {
359    let UserEditedMessageParams {
360        original_message_id,
361        new_content,
362        op_id,
363        new_message_id,
364        model,
365        timestamp,
366    } = params;
367    let mut effects = Vec::new();
368
369    if state.has_active_operation() {
370        return Err(invalid_action(
371            InvalidActionKind::OperationInFlight,
372            "Cannot edit message while an operation is active.",
373        ));
374    }
375
376    let parent_id = state
377        .message_graph
378        .messages
379        .iter()
380        .find(|m| m.id() == original_message_id.0)
381        .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
382
383    let message = Message {
384        data: MessageData::User {
385            content: new_content,
386        },
387        timestamp,
388        id: new_message_id.0.clone(),
389        parent_message_id: parent_id,
390    };
391
392    state.message_graph.add_message(message.clone());
393    state.message_graph.active_message_id = Some(new_message_id.0.clone());
394
395    state.start_operation(op_id, OperationKind::AgentLoop);
396    state.operation_models.insert(op_id, model.clone());
397
398    effects.push(Effect::EmitEvent {
399        session_id,
400        event: SessionEvent::UserMessageAdded {
401            message: message.clone(),
402        },
403    });
404
405    effects.push(Effect::EmitEvent {
406        session_id,
407        event: SessionEvent::OperationStarted {
408            op_id,
409            kind: OperationKind::AgentLoop,
410        },
411    });
412
413    effects.push(Effect::CallModel {
414        session_id,
415        op_id,
416        model,
417        messages: state
418            .message_graph
419            .get_thread_messages()
420            .into_iter()
421            .cloned()
422            .collect(),
423        system_context: state.cached_system_context.clone(),
424        tools: state.tools.clone(),
425    });
426
427    Ok(effects)
428}
429
430fn handle_tool_approval_requested(
431    state: &mut AppState,
432    session_id: crate::app::domain::types::SessionId,
433    request_id: crate::app::domain::types::RequestId,
434    tool_call: steer_tools::ToolCall,
435) -> Vec<Effect> {
436    let mut effects = Vec::new();
437
438    if let Err(error) = validate_tool_call(state, &tool_call) {
439        let error_message = error.to_string();
440        return fail_tool_call_without_execution(
441            state,
442            session_id,
443            tool_call,
444            error,
445            error_message,
446            "invalid",
447            true,
448        );
449    }
450
451    let decision = get_tool_decision(state, &tool_call);
452
453    match decision {
454        ToolDecision::Allow => {
455            let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
456                return vec![Effect::EmitEvent {
457                    session_id,
458                    event: SessionEvent::Error {
459                        message: "Tool approval requested without active operation".to_string(),
460                    },
461                }];
462            };
463            state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
464                &tool_call.id,
465            ));
466
467            effects.push(Effect::ExecuteTool {
468                session_id,
469                op_id,
470                tool_call,
471            });
472        }
473        ToolDecision::Deny => {
474            let error = ToolError::DeniedByPolicy(tool_call.name.clone());
475            let tool_name = tool_call.name.clone();
476            effects.extend(fail_tool_call_without_execution(
477                state,
478                session_id,
479                tool_call,
480                error,
481                format!("Tool '{tool_name}' denied by policy"),
482                "denied",
483                true,
484            ));
485        }
486        ToolDecision::Ask => {
487            if state.pending_approval.is_some() {
488                state.approval_queue.push_back(QueuedApproval { tool_call });
489                return effects;
490            }
491
492            state.pending_approval = Some(PendingApproval {
493                request_id,
494                tool_call: tool_call.clone(),
495            });
496
497            effects.push(Effect::EmitEvent {
498                session_id,
499                event: SessionEvent::ApprovalRequested {
500                    request_id,
501                    tool_call: tool_call.clone(),
502                },
503            });
504
505            effects.push(Effect::RequestUserApproval {
506                session_id,
507                request_id,
508                tool_call,
509            });
510        }
511    }
512
513    effects
514}
515
516fn validate_tool_call(
517    state: &AppState,
518    tool_call: &steer_tools::ToolCall,
519) -> Result<(), ToolError> {
520    if tool_call.name.trim().is_empty() {
521        return Err(ToolError::invalid_params(
522            "unknown",
523            "Malformed tool call: missing tool name",
524        ));
525    }
526
527    if tool_call.id.trim().is_empty() {
528        return Err(ToolError::invalid_params(
529            tool_call.name.clone(),
530            "Malformed tool call: missing tool call id",
531        ));
532    }
533
534    if state.tools.is_empty() {
535        return Ok(());
536    }
537
538    let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
539        return Ok(());
540    };
541
542    validate_against_json_schema(
543        &tool_call.name,
544        schema.input_schema.as_value(),
545        &tool_call.parameters,
546    )
547}
548
549fn validate_against_json_schema(
550    tool_name: &str,
551    schema: &Value,
552    params: &Value,
553) -> Result<(), ToolError> {
554    let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
555        ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
556    })?;
557
558    if let Err(errors) = validator.validate(params) {
559        let message = errors
560            .into_iter()
561            .map(|error| error.to_string())
562            .next()
563            .unwrap_or_else(|| "Parameters do not match schema".to_string());
564        return Err(ToolError::invalid_params(tool_name.to_string(), message));
565    }
566
567    Ok(())
568}
569
570fn emit_tool_failure_message(
571    state: &mut AppState,
572    session_id: crate::app::domain::types::SessionId,
573    tool_call_id: &str,
574    tool_name: &str,
575    tool_error: ToolError,
576    event_error: String,
577    message_id_prefix: &str,
578) -> Vec<Effect> {
579    let mut effects = Vec::new();
580
581    let tool_result = ToolResult::Error(tool_error);
582    let parent_id = state.message_graph.active_message_id.clone();
583    let tool_message = Message {
584        data: MessageData::Tool {
585            tool_use_id: tool_call_id.to_string(),
586            result: tool_result,
587        },
588        timestamp: 0,
589        id: format!("{message_id_prefix}_{tool_call_id}"),
590        parent_message_id: parent_id,
591    };
592    state.message_graph.add_message(tool_message.clone());
593
594    if let Some(model) = state
595        .current_operation
596        .as_ref()
597        .and_then(|op| state.operation_models.get(&op.op_id).cloned())
598    {
599        effects.push(Effect::EmitEvent {
600            session_id,
601            event: SessionEvent::ToolCallFailed {
602                id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
603                name: tool_name.to_string(),
604                error: event_error,
605                model,
606            },
607        });
608    }
609
610    effects.push(Effect::EmitEvent {
611        session_id,
612        event: SessionEvent::ToolMessageAdded {
613            message: tool_message,
614        },
615    });
616
617    effects
618}
619
620fn fail_tool_call_without_execution(
621    state: &mut AppState,
622    session_id: crate::app::domain::types::SessionId,
623    tool_call: steer_tools::ToolCall,
624    tool_error: ToolError,
625    event_error: String,
626    message_id_prefix: &str,
627    call_model_if_ready: bool,
628) -> Vec<Effect> {
629    let mut effects = emit_tool_failure_message(
630        state,
631        session_id,
632        &tool_call.id,
633        &tool_call.name,
634        tool_error,
635        event_error,
636        message_id_prefix,
637    );
638
639    if !call_model_if_ready {
640        return effects;
641    }
642
643    let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
644        effects.push(Effect::EmitEvent {
645            session_id,
646            event: SessionEvent::Error {
647                message: "Tool failure recorded without active operation".to_string(),
648            },
649        });
650        return effects;
651    };
652    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
653        model
654    } else {
655        effects.push(Effect::EmitEvent {
656            session_id,
657            event: SessionEvent::Error {
658                message: format!("Missing model for operation {op_id}"),
659            },
660        });
661        return effects;
662    };
663
664    let all_tools_complete = state
665        .current_operation
666        .as_ref()
667        .is_none_or(|op| op.pending_tool_calls.is_empty());
668    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
669
670    if all_tools_complete && no_pending_approvals {
671        effects.push(Effect::CallModel {
672            session_id,
673            op_id,
674            model,
675            messages: state
676                .message_graph
677                .get_thread_messages()
678                .into_iter()
679                .cloned()
680                .collect(),
681            system_context: state.cached_system_context.clone(),
682            tools: state.tools.clone(),
683        });
684    }
685
686    effects
687}
688
689fn handle_tool_approval_decided(
690    state: &mut AppState,
691    session_id: crate::app::domain::types::SessionId,
692    request_id: crate::app::domain::types::RequestId,
693    decision: ApprovalDecision,
694    remember: Option<ApprovalMemory>,
695) -> Vec<Effect> {
696    let mut effects = Vec::new();
697
698    let pending = match state.pending_approval.take() {
699        Some(p) if p.request_id == request_id => p,
700        other => {
701            state.pending_approval = other;
702            return effects;
703        }
704    };
705
706    let resolved_memory = if decision == ApprovalDecision::Approved {
707        match remember {
708            Some(ApprovalMemory::PendingTool) => {
709                Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
710            }
711            Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
712            Some(ApprovalMemory::BashPattern(pattern)) => {
713                Some(ApprovalMemory::BashPattern(pattern))
714            }
715            None => None,
716        }
717    } else {
718        None
719    };
720
721    effects.push(Effect::EmitEvent {
722        session_id,
723        event: SessionEvent::ApprovalDecided {
724            request_id,
725            decision,
726            remember: resolved_memory.clone(),
727        },
728    });
729
730    if decision == ApprovalDecision::Approved {
731        if let Some(ref memory) = resolved_memory {
732            match memory {
733                ApprovalMemory::Tool(name) => {
734                    state.approved_tools.insert(name.clone());
735                }
736                ApprovalMemory::BashPattern(pattern) => {
737                    state.approved_bash_patterns.insert(pattern.clone());
738                }
739                ApprovalMemory::PendingTool => {}
740            }
741        }
742
743        let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
744            effects.push(Effect::EmitEvent {
745                session_id,
746                event: SessionEvent::Error {
747                    message: "Tool approval decided without active operation".to_string(),
748                },
749            });
750            return effects;
751        };
752        state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
753            &pending.tool_call.id,
754        ));
755
756        effects.push(Effect::ExecuteTool {
757            session_id,
758            op_id,
759            tool_call: pending.tool_call,
760        });
761    } else {
762        let tool_name = pending.tool_call.name.clone();
763        let error = ToolError::DeniedByUser(tool_name.clone());
764        effects.extend(fail_tool_call_without_execution(
765            state,
766            session_id,
767            pending.tool_call,
768            error,
769            format!("Tool '{tool_name}' denied by user"),
770            "denied",
771            false,
772        ));
773    }
774
775    effects.extend(process_next_queued_approval(state, session_id));
776
777    effects
778}
779
780fn process_next_queued_approval(
781    state: &mut AppState,
782    session_id: crate::app::domain::types::SessionId,
783) -> Vec<Effect> {
784    let mut effects = Vec::new();
785
786    while let Some(queued) = state.approval_queue.pop_front() {
787        let decision = get_tool_decision(state, &queued.tool_call);
788
789        match decision {
790            ToolDecision::Allow => {
791                let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
792                    effects.push(Effect::EmitEvent {
793                        session_id,
794                        event: SessionEvent::Error {
795                            message: "Queued tool approval processed without active operation"
796                                .to_string(),
797                        },
798                    });
799                    state.approval_queue.push_front(queued);
800                    break;
801                };
802                state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
803                    &queued.tool_call.id,
804                ));
805
806                effects.push(Effect::ExecuteTool {
807                    session_id,
808                    op_id,
809                    tool_call: queued.tool_call,
810                });
811            }
812            ToolDecision::Deny => {
813                let tool_name = queued.tool_call.name.clone();
814                let error = ToolError::DeniedByPolicy(tool_name.clone());
815                effects.extend(fail_tool_call_without_execution(
816                    state,
817                    session_id,
818                    queued.tool_call,
819                    error,
820                    format!("Tool '{tool_name}' denied by policy"),
821                    "denied",
822                    false,
823                ));
824            }
825            ToolDecision::Ask => {
826                let request_id = crate::app::domain::types::RequestId::new();
827                state.pending_approval = Some(PendingApproval {
828                    request_id,
829                    tool_call: queued.tool_call.clone(),
830                });
831
832                effects.push(Effect::EmitEvent {
833                    session_id,
834                    event: SessionEvent::ApprovalRequested {
835                        request_id,
836                        tool_call: queued.tool_call.clone(),
837                    },
838                });
839
840                effects.push(Effect::RequestUserApproval {
841                    session_id,
842                    request_id,
843                    tool_call: queued.tool_call,
844                });
845
846                break;
847            }
848        }
849    }
850
851    let all_tools_complete = state
852        .current_operation
853        .as_ref()
854        .is_none_or(|op| op.pending_tool_calls.is_empty());
855    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
856
857    if all_tools_complete
858        && no_pending_approvals
859        && let Some(op) = &state.current_operation
860    {
861        let op_id = op.op_id;
862        if let Some(model) = state.operation_models.get(&op_id).cloned() {
863            effects.push(Effect::CallModel {
864                session_id,
865                op_id,
866                model,
867                messages: state
868                    .message_graph
869                    .get_thread_messages()
870                    .into_iter()
871                    .cloned()
872                    .collect(),
873                system_context: state.cached_system_context.clone(),
874                tools: state.tools.clone(),
875            });
876        }
877    }
878
879    effects
880}
881
882fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
883    if state.approved_tools.contains(&tool_call.name) {
884        return ToolDecision::Allow;
885    }
886
887    if tool_call.name == DISPATCH_AGENT_TOOL_NAME
888        && let Ok(params) =
889            serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
890        && let Some(config) = state.session_config.as_ref()
891    {
892        let policy = &config.tool_config.approval_policy;
893        match params.target {
894            DispatchAgentTarget::Resume { .. } => {
895                return ToolDecision::Allow;
896            }
897            DispatchAgentTarget::New { agent, .. } => {
898                let agent_id = agent
899                    .as_deref()
900                    .filter(|value| !value.trim().is_empty())
901                    .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
902                if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
903                    return ToolDecision::Allow;
904                }
905            }
906        }
907    }
908
909    if tool_call.name == BASH_TOOL_NAME
910        && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
911            tool_call.parameters.clone(),
912        )
913        && state.is_bash_pattern_approved(&params.command)
914    {
915        return ToolDecision::Allow;
916    }
917
918    state
919        .session_config
920        .as_ref()
921        .map_or(ToolDecision::Ask, |config| {
922            config
923                .tool_config
924                .approval_policy
925                .tool_decision(&tool_call.name)
926        })
927}
928
929fn handle_tool_execution_started(
930    state: &mut AppState,
931    session_id: crate::app::domain::types::SessionId,
932    tool_call_id: crate::app::domain::types::ToolCallId,
933    tool_name: String,
934    tool_parameters: serde_json::Value,
935) -> Vec<Effect> {
936    state.add_pending_tool_call(tool_call_id.clone());
937
938    let op_id = match state.current_operation.as_ref() {
939        Some(op) => op.op_id,
940        None => {
941            return vec![Effect::EmitEvent {
942                session_id,
943                event: SessionEvent::Error {
944                    message: "Tool call started without active operation".to_string(),
945                },
946            }];
947        }
948    };
949
950    let is_direct_bash = matches!(
951        state.current_operation.as_ref().map(|op| &op.kind),
952        Some(OperationKind::DirectBash { .. })
953    );
954
955    if is_direct_bash {
956        return vec![];
957    }
958
959    let model = match state.operation_models.get(&op_id).cloned() {
960        Some(model) => model,
961        None => {
962            return vec![Effect::EmitEvent {
963                session_id,
964                event: SessionEvent::Error {
965                    message: format!("Missing model for tool call on operation {op_id}"),
966                },
967            }];
968        }
969    };
970
971    vec![Effect::EmitEvent {
972        session_id,
973        event: SessionEvent::ToolCallStarted {
974            id: tool_call_id,
975            name: tool_name,
976            parameters: tool_parameters,
977            model,
978        },
979    }]
980}
981
982fn handle_tool_result(
983    state: &mut AppState,
984    session_id: crate::app::domain::types::SessionId,
985    tool_call_id: crate::app::domain::types::ToolCallId,
986    tool_name: String,
987    result: Result<ToolResult, ToolError>,
988) -> Vec<Effect> {
989    let mut effects = Vec::new();
990
991    let op = match &state.current_operation {
992        Some(op) => {
993            if state.cancelled_ops.contains(&op.op_id) {
994                tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
995                return effects;
996            }
997            op.clone()
998        }
999        None => return effects,
1000    };
1001    let op_id = op.op_id;
1002
1003    state.remove_pending_tool_call(&tool_call_id);
1004
1005    let tool_result = match result {
1006        Ok(r) => r,
1007        Err(e) => ToolResult::Error(e),
1008    };
1009
1010    let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1011
1012    if is_direct_bash {
1013        let command = match &op.kind {
1014            OperationKind::DirectBash { command } => command.clone(),
1015            _ => tool_name,
1016        };
1017
1018        let (stdout, stderr, exit_code) = match &tool_result {
1019            ToolResult::Bash(result) => (
1020                result.stdout.clone(),
1021                result.stderr.clone(),
1022                result.exit_code,
1023            ),
1024            ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1025            other => (format!("{other:?}"), String::new(), 0),
1026        };
1027
1028        let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1029            state
1030                .message_graph
1031                .update_command_execution(
1032                    id.as_str(),
1033                    command.clone(),
1034                    stdout.clone(),
1035                    stderr.clone(),
1036                    exit_code,
1037                )
1038                .or_else(|| {
1039                    let parent_id = state.message_graph.active_message_id.clone();
1040                    let timestamp = Message::current_timestamp();
1041                    Some(Message {
1042                        data: MessageData::User {
1043                            content: vec![UserContent::CommandExecution {
1044                                command: command.clone(),
1045                                stdout: stdout.clone(),
1046                                stderr: stderr.clone(),
1047                                exit_code,
1048                            }],
1049                        },
1050                        timestamp,
1051                        id: id.to_string(),
1052                        parent_message_id: parent_id,
1053                    })
1054                })
1055        });
1056
1057        state.complete_operation(op_id);
1058
1059        if let Some(message) = updated {
1060            effects.push(Effect::EmitEvent {
1061                session_id,
1062                event: SessionEvent::MessageUpdated { message },
1063            });
1064        }
1065
1066        effects.push(Effect::EmitEvent {
1067            session_id,
1068            event: SessionEvent::OperationCompleted { op_id },
1069        });
1070
1071        effects.extend(maybe_start_queued_work(state, session_id));
1072
1073        return effects;
1074    }
1075
1076    let model = match state.operation_models.get(&op_id).cloned() {
1077        Some(model) => model,
1078        None => {
1079            return vec![Effect::EmitEvent {
1080                session_id,
1081                event: SessionEvent::Error {
1082                    message: format!("Missing model for tool result on operation {op_id}"),
1083                },
1084            }];
1085        }
1086    };
1087
1088    let event = match &tool_result {
1089        ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1090            id: tool_call_id.clone(),
1091            name: tool_name.clone(),
1092            error: e.to_string(),
1093            model: model.clone(),
1094        },
1095        _ => SessionEvent::ToolCallCompleted {
1096            id: tool_call_id.clone(),
1097            name: tool_name,
1098            result: tool_result.clone(),
1099            model: model.clone(),
1100        },
1101    };
1102
1103    effects.push(Effect::EmitEvent { session_id, event });
1104
1105    let parent_id = state.message_graph.active_message_id.clone();
1106    let tool_message = Message {
1107        data: MessageData::Tool {
1108            tool_use_id: tool_call_id.0.clone(),
1109            result: tool_result,
1110        },
1111        timestamp: 0,
1112        id: format!("tool_result_{}", tool_call_id.0),
1113        parent_message_id: parent_id,
1114    };
1115    state.message_graph.add_message(tool_message.clone());
1116
1117    effects.push(Effect::EmitEvent {
1118        session_id,
1119        event: SessionEvent::ToolMessageAdded {
1120            message: tool_message,
1121        },
1122    });
1123
1124    let all_tools_complete = state
1125        .current_operation
1126        .as_ref()
1127        .is_none_or(|op| op.pending_tool_calls.is_empty());
1128    let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1129
1130    if all_tools_complete && no_pending_approvals {
1131        effects.push(Effect::CallModel {
1132            session_id,
1133            op_id,
1134            model,
1135            messages: state
1136                .message_graph
1137                .get_thread_messages()
1138                .into_iter()
1139                .cloned()
1140                .collect(),
1141            system_context: state.cached_system_context.clone(),
1142            tools: state.tools.clone(),
1143        });
1144    }
1145
1146    effects
1147}
1148
1149fn handle_model_response_complete(
1150    state: &mut AppState,
1151    session_id: crate::app::domain::types::SessionId,
1152    op_id: crate::app::domain::types::OpId,
1153    message_id: crate::app::domain::types::MessageId,
1154    content: Vec<AssistantContent>,
1155    timestamp: u64,
1156) -> Vec<Effect> {
1157    let mut effects = Vec::new();
1158
1159    if state.cancelled_ops.contains(&op_id) {
1160        tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1161        return effects;
1162    }
1163
1164    let tool_calls: Vec<_> = content
1165        .iter()
1166        .filter_map(|c| {
1167            if let AssistantContent::ToolCall { tool_call, .. } = c {
1168                Some(tool_call.clone())
1169            } else {
1170                None
1171            }
1172        })
1173        .collect();
1174
1175    let parent_id = state.message_graph.active_message_id.clone();
1176
1177    let message = Message {
1178        data: MessageData::Assistant {
1179            content: content.clone(),
1180        },
1181        timestamp,
1182        id: message_id.0.clone(),
1183        parent_message_id: parent_id,
1184    };
1185
1186    state.message_graph.add_message(message.clone());
1187    state.message_graph.active_message_id = Some(message_id.0.clone());
1188
1189    let model = match state.operation_models.get(&op_id).cloned() {
1190        Some(model) => model,
1191        None => {
1192            return vec![Effect::EmitEvent {
1193                session_id,
1194                event: SessionEvent::Error {
1195                    message: format!("Missing model for operation {op_id}"),
1196                },
1197            }];
1198        }
1199    };
1200
1201    effects.push(Effect::EmitEvent {
1202        session_id,
1203        event: SessionEvent::AssistantMessageAdded { message, model },
1204    });
1205
1206    if tool_calls.is_empty() {
1207        state.complete_operation(op_id);
1208        effects.push(Effect::EmitEvent {
1209            session_id,
1210            event: SessionEvent::OperationCompleted { op_id },
1211        });
1212        effects.extend(maybe_start_queued_work(state, session_id));
1213    } else {
1214        for tool_call in tool_calls {
1215            let request_id = crate::app::domain::types::RequestId::new();
1216            effects.extend(handle_tool_approval_requested(
1217                state, session_id, request_id, tool_call,
1218            ));
1219        }
1220    }
1221
1222    effects
1223}
1224
1225fn handle_model_response_error(
1226    state: &mut AppState,
1227    session_id: crate::app::domain::types::SessionId,
1228    op_id: crate::app::domain::types::OpId,
1229    error: &str,
1230) -> Vec<Effect> {
1231    let mut effects = Vec::new();
1232
1233    if state.cancelled_ops.contains(&op_id) {
1234        return effects;
1235    }
1236
1237    state.complete_operation(op_id);
1238
1239    effects.push(Effect::EmitEvent {
1240        session_id,
1241        event: SessionEvent::Error {
1242            message: error.to_string(),
1243        },
1244    });
1245
1246    effects.push(Effect::EmitEvent {
1247        session_id,
1248        event: SessionEvent::OperationCompleted { op_id },
1249    });
1250
1251    effects.extend(maybe_start_queued_work(state, session_id));
1252
1253    effects
1254}
1255
1256fn handle_direct_bash(
1257    state: &mut AppState,
1258    session_id: crate::app::domain::types::SessionId,
1259    op_id: crate::app::domain::types::OpId,
1260    message_id: crate::app::domain::types::MessageId,
1261    command: String,
1262    timestamp: u64,
1263) -> Vec<Effect> {
1264    let mut effects = Vec::new();
1265
1266    if state.has_active_operation() {
1267        state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1268            command,
1269            op_id,
1270            message_id,
1271            queued_at: timestamp,
1272        });
1273        effects.push(Effect::EmitEvent {
1274            session_id,
1275            event: SessionEvent::QueueUpdated {
1276                queue: snapshot_queue(state),
1277            },
1278        });
1279        return effects;
1280    }
1281
1282    let parent_id = state.message_graph.active_message_id.clone();
1283    let message = Message {
1284        data: MessageData::User {
1285            content: vec![UserContent::CommandExecution {
1286                command: command.clone(),
1287                stdout: String::new(),
1288                stderr: String::new(),
1289                exit_code: 0,
1290            }],
1291        },
1292        timestamp,
1293        id: message_id.0.clone(),
1294        parent_message_id: parent_id,
1295    };
1296
1297    state.message_graph.add_message(message.clone());
1298    state.message_graph.active_message_id = Some(message_id.0.clone());
1299
1300    state.start_operation(
1301        op_id,
1302        OperationKind::DirectBash {
1303            command: command.clone(),
1304        },
1305    );
1306    state.operation_messages.insert(op_id, message_id);
1307
1308    effects.push(Effect::EmitEvent {
1309        session_id,
1310        event: SessionEvent::UserMessageAdded { message },
1311    });
1312
1313    effects.push(Effect::EmitEvent {
1314        session_id,
1315        event: SessionEvent::OperationStarted {
1316            op_id,
1317            kind: OperationKind::DirectBash {
1318                command: command.clone(),
1319            },
1320        },
1321    });
1322
1323    let tool_call = steer_tools::ToolCall {
1324        id: format!("direct_bash_{op_id}"),
1325        name: BASH_TOOL_NAME.to_string(),
1326        parameters: serde_json::json!({ "command": command }),
1327    };
1328
1329    effects.push(Effect::ExecuteTool {
1330        session_id,
1331        op_id,
1332        tool_call,
1333    });
1334
1335    effects
1336}
1337
1338fn handle_dequeue_queued_item(
1339    state: &mut AppState,
1340    session_id: crate::app::domain::types::SessionId,
1341) -> Result<Vec<Effect>, ReduceError> {
1342    if state.pop_next_queued_work().is_some() {
1343        Ok(vec![Effect::EmitEvent {
1344            session_id,
1345            event: SessionEvent::QueueUpdated {
1346                queue: snapshot_queue(state),
1347            },
1348        }])
1349    } else {
1350        Err(invalid_action(
1351            InvalidActionKind::QueueEmpty,
1352            "No queued item to remove.",
1353        ))
1354    }
1355}
1356
1357fn maybe_start_queued_work(
1358    state: &mut AppState,
1359    session_id: crate::app::domain::types::SessionId,
1360) -> Vec<Effect> {
1361    if state.has_active_operation() {
1362        return vec![];
1363    }
1364
1365    let Some(next) = state.pop_next_queued_work() else {
1366        return vec![];
1367    };
1368
1369    let mut effects = vec![Effect::EmitEvent {
1370        session_id,
1371        event: SessionEvent::QueueUpdated {
1372            queue: snapshot_queue(state),
1373        },
1374    }];
1375
1376    match next {
1377        QueuedWorkItem::UserMessage(item) => {
1378            effects.extend(handle_user_input(
1379                state,
1380                session_id,
1381                item.content,
1382                item.op_id,
1383                item.message_id,
1384                item.model,
1385                item.queued_at,
1386            ));
1387        }
1388        QueuedWorkItem::DirectBash(item) => {
1389            effects.extend(handle_direct_bash(
1390                state,
1391                session_id,
1392                item.op_id,
1393                item.message_id,
1394                item.command,
1395                item.queued_at,
1396            ));
1397        }
1398    }
1399
1400    effects
1401}
1402
1403fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1404    state
1405        .queued_work
1406        .iter()
1407        .map(snapshot_queued_work_item)
1408        .collect()
1409}
1410
1411fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1412    match item {
1413        QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1414            kind: Some(QueuedWorkKind::UserMessage),
1415            content: message
1416                .content
1417                .iter()
1418                .filter_map(|item| match item {
1419                    UserContent::Text { text } => Some(text.as_str()),
1420                    _ => None,
1421                })
1422                .collect::<String>(),
1423            queued_at: message.queued_at,
1424            model: Some(message.model.clone()),
1425            op_id: message.op_id,
1426            message_id: message.message_id.clone(),
1427            attachment_count: message
1428                .content
1429                .iter()
1430                .filter(|item| matches!(item, UserContent::Image { .. }))
1431                .count() as u32,
1432        },
1433        QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1434            kind: Some(QueuedWorkKind::DirectBash),
1435            content: command.command.clone(),
1436            queued_at: command.queued_at,
1437            model: None,
1438            op_id: command.op_id,
1439            message_id: command.message_id.clone(),
1440            attachment_count: 0,
1441        },
1442    }
1443}
1444
1445fn handle_request_compaction(
1446    state: &mut AppState,
1447    session_id: crate::app::domain::types::SessionId,
1448    op_id: crate::app::domain::types::OpId,
1449    model: crate::config::model::ModelId,
1450) -> Result<Vec<Effect>, ReduceError> {
1451    const MIN_MESSAGES_FOR_COMPACT: usize = 3;
1452    let message_count = state.message_graph.get_thread_messages().len();
1453
1454    if state.has_active_operation() {
1455        return Err(invalid_action(
1456            InvalidActionKind::OperationInFlight,
1457            "Cannot compact while an operation is active.",
1458        ));
1459    }
1460
1461    if message_count < MIN_MESSAGES_FOR_COMPACT {
1462        return Ok(vec![Effect::EmitEvent {
1463            session_id,
1464            event: SessionEvent::CompactResult {
1465                result: crate::app::domain::event::CompactResult::InsufficientMessages,
1466            },
1467        }]);
1468    }
1469
1470    state.start_operation(op_id, OperationKind::Compact);
1471    state.operation_models.insert(op_id, model.clone());
1472
1473    Ok(vec![
1474        Effect::EmitEvent {
1475            session_id,
1476            event: SessionEvent::OperationStarted {
1477                op_id,
1478                kind: OperationKind::Compact,
1479            },
1480        },
1481        Effect::RequestCompaction {
1482            session_id,
1483            op_id,
1484            model,
1485        },
1486    ])
1487}
1488
1489fn handle_cancel(
1490    state: &mut AppState,
1491    session_id: crate::app::domain::types::SessionId,
1492    target_op: Option<crate::app::domain::types::OpId>,
1493) -> Vec<Effect> {
1494    let mut effects = Vec::new();
1495
1496    let op = match &state.current_operation {
1497        Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
1498        _ => return effects,
1499    };
1500
1501    state.record_cancelled_op(op.op_id);
1502
1503    if matches!(op.kind, OperationKind::Compact) {
1504        effects.push(Effect::EmitEvent {
1505            session_id,
1506            event: SessionEvent::CompactResult {
1507                result: crate::app::domain::event::CompactResult::Cancelled,
1508            },
1509        });
1510    }
1511
1512    let pending_approval = state.pending_approval.take();
1513    let queued_approvals = std::mem::take(&mut state.approval_queue);
1514
1515    if matches!(op.kind, OperationKind::AgentLoop) {
1516        if let Some(pending) = pending_approval {
1517            let tool_name = pending.tool_call.name.clone();
1518            effects.extend(emit_tool_failure_message(
1519                state,
1520                session_id,
1521                &pending.tool_call.id,
1522                &tool_name,
1523                ToolError::Cancelled(tool_name.clone()),
1524                format!("Tool '{tool_name}' cancelled"),
1525                "cancelled",
1526            ));
1527        }
1528
1529        for queued in queued_approvals {
1530            let tool_name = queued.tool_call.name.clone();
1531            effects.extend(emit_tool_failure_message(
1532                state,
1533                session_id,
1534                &queued.tool_call.id,
1535                &tool_name,
1536                ToolError::Cancelled(tool_name.clone()),
1537                format!("Tool '{tool_name}' cancelled"),
1538                "cancelled",
1539            ));
1540        }
1541
1542        for tool_call_id in &op.pending_tool_calls {
1543            let tool_name = state
1544                .message_graph
1545                .find_tool_name_by_id(tool_call_id.as_str())
1546                .unwrap_or_else(|| tool_call_id.as_str().to_string());
1547            let event_error = if tool_name == tool_call_id.as_str() {
1548                format!("Tool call '{tool_call_id}' cancelled")
1549            } else {
1550                format!("Tool '{tool_name}' cancelled")
1551            };
1552            effects.extend(emit_tool_failure_message(
1553                state,
1554                session_id,
1555                tool_call_id.as_str(),
1556                &tool_name,
1557                ToolError::Cancelled(tool_name.clone()),
1558                event_error,
1559                "cancelled",
1560            ));
1561        }
1562    }
1563    state.active_streams.remove(&op.op_id);
1564
1565    let dequeued_item = state.pop_next_queued_work();
1566    let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);
1567
1568    effects.push(Effect::EmitEvent {
1569        session_id,
1570        event: SessionEvent::OperationCancelled {
1571            op_id: op.op_id,
1572            info: CancellationInfo {
1573                pending_tool_calls: op.pending_tool_calls.len(),
1574                popped_queued_item,
1575            },
1576        },
1577    });
1578
1579    effects.push(Effect::CancelOperation {
1580        session_id,
1581        op_id: op.op_id,
1582    });
1583
1584    state.complete_operation(op.op_id);
1585
1586    if dequeued_item.is_some() {
1587        effects.push(Effect::EmitEvent {
1588            session_id,
1589            event: SessionEvent::QueueUpdated {
1590                queue: snapshot_queue(state),
1591            },
1592        });
1593    }
1594
1595    effects
1596}
1597
1598fn handle_hydrate(
1599    state: &mut AppState,
1600    session_id: crate::app::domain::types::SessionId,
1601    events: Vec<SessionEvent>,
1602    starting_sequence: u64,
1603) -> Vec<Effect> {
1604    for event in events {
1605        apply_event_to_state(state, &event);
1606    }
1607
1608    state.event_sequence = starting_sequence;
1609
1610    emit_mcp_connect_effects(state, session_id)
1611}
1612
1613fn handle_switch_primary_agent(
1614    state: &mut AppState,
1615    session_id: crate::app::domain::types::SessionId,
1616    agent_id: String,
1617) -> Result<Vec<Effect>, ReduceError> {
1618    if state.current_operation.is_some() {
1619        return Err(invalid_action(
1620            InvalidActionKind::OperationInFlight,
1621            "Cannot switch primary agent while an operation is active.",
1622        ));
1623    }
1624
1625    let Some(base_config) = state
1626        .base_session_config
1627        .as_ref()
1628        .or(state.session_config.as_ref())
1629    else {
1630        return Err(invalid_action(
1631            InvalidActionKind::MissingSessionConfig,
1632            "Cannot switch primary agent without session config.",
1633        ));
1634    };
1635
1636    let Some(_spec) = primary_agent_spec(&agent_id) else {
1637        return Err(invalid_action(
1638            InvalidActionKind::UnknownPrimaryAgent,
1639            format!("Unknown primary agent '{agent_id}'."),
1640        ));
1641    };
1642
1643    let mut updated_config = base_config.clone();
1644    updated_config.primary_agent_id = Some(agent_id.clone());
1645    let new_config = resolve_effective_config(&updated_config);
1646    let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1647
1648    apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1649
1650    let mut effects = Vec::new();
1651    effects.push(Effect::EmitEvent {
1652        session_id,
1653        event: SessionEvent::SessionConfigUpdated {
1654            config: Box::new(new_config),
1655            primary_agent_id: agent_id,
1656        },
1657    });
1658    effects.extend(backend_effects);
1659    effects.push(Effect::ReloadToolSchemas { session_id });
1660
1661    Ok(effects)
1662}
1663
1664fn apply_session_config_state(
1665    state: &mut AppState,
1666    config: &crate::session::state::SessionConfig,
1667    primary_agent_id: Option<String>,
1668    update_base: bool,
1669) {
1670    state.apply_session_config(config, primary_agent_id, update_base);
1671}
1672
1673fn mcp_backend_diff_effects(
1674    session_id: crate::app::domain::types::SessionId,
1675    old_config: &crate::session::state::SessionConfig,
1676    new_config: &crate::session::state::SessionConfig,
1677) -> Vec<Effect> {
1678    let old_map = collect_mcp_backends(old_config);
1679    let new_map = collect_mcp_backends(new_config);
1680
1681    let mut effects = Vec::new();
1682
1683    for (server_name, (old_transport, old_filter)) in &old_map {
1684        match new_map.get(server_name) {
1685            None => {
1686                effects.push(Effect::DisconnectMcpServer {
1687                    session_id,
1688                    server_name: server_name.clone(),
1689                });
1690            }
1691            Some((new_transport, new_filter)) => {
1692                if new_transport != old_transport || new_filter != old_filter {
1693                    effects.push(Effect::DisconnectMcpServer {
1694                        session_id,
1695                        server_name: server_name.clone(),
1696                    });
1697                    effects.push(Effect::ConnectMcpServer {
1698                        session_id,
1699                        config: McpServerConfig {
1700                            server_name: server_name.clone(),
1701                            transport: new_transport.clone(),
1702                            tool_filter: new_filter.clone(),
1703                        },
1704                    });
1705                }
1706            }
1707        }
1708    }
1709
1710    for (server_name, (new_transport, new_filter)) in &new_map {
1711        if !old_map.contains_key(server_name) {
1712            effects.push(Effect::ConnectMcpServer {
1713                session_id,
1714                config: McpServerConfig {
1715                    server_name: server_name.clone(),
1716                    transport: new_transport.clone(),
1717                    tool_filter: new_filter.clone(),
1718                },
1719            });
1720        }
1721    }
1722
1723    effects
1724}
1725
1726fn collect_mcp_backends(
1727    config: &crate::session::state::SessionConfig,
1728) -> std::collections::HashMap<
1729    String,
1730    (
1731        crate::tools::McpTransport,
1732        crate::session::state::ToolFilter,
1733    ),
1734> {
1735    let mut map = std::collections::HashMap::new();
1736
1737    for backend_config in &config.tool_config.backends {
1738        let BackendConfig::Mcp {
1739            server_name,
1740            transport,
1741            tool_filter,
1742        } = backend_config;
1743
1744        map.insert(
1745            server_name.clone(),
1746            (transport.clone(), tool_filter.clone()),
1747        );
1748    }
1749
1750    map
1751}
1752
1753pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1754    match event {
1755        SessionEvent::SessionCreated { config, .. } => {
1756            let primary_agent_id = config
1757                .primary_agent_id
1758                .clone()
1759                .unwrap_or_else(|| default_primary_agent_id().to_string());
1760            apply_session_config_state(state, config, Some(primary_agent_id), true);
1761        }
1762        SessionEvent::SessionConfigUpdated {
1763            config,
1764            primary_agent_id,
1765        } => {
1766            apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1767        }
1768        SessionEvent::AssistantMessageAdded { message, .. }
1769        | SessionEvent::UserMessageAdded { message }
1770        | SessionEvent::ToolMessageAdded { message } => {
1771            state.message_graph.add_message(message.clone());
1772            state.message_graph.active_message_id = Some(message.id().to_string());
1773        }
1774        SessionEvent::MessageUpdated { message } => {
1775            state.message_graph.replace_message(message.clone());
1776        }
1777        SessionEvent::ApprovalDecided {
1778            decision, remember, ..
1779        } => {
1780            if *decision == ApprovalDecision::Approved
1781                && let Some(memory) = remember
1782            {
1783                match memory {
1784                    ApprovalMemory::Tool(name) => {
1785                        state.approved_tools.insert(name.clone());
1786                    }
1787                    ApprovalMemory::BashPattern(pattern) => {
1788                        state.approved_bash_patterns.insert(pattern.clone());
1789                    }
1790                    ApprovalMemory::PendingTool => {}
1791                }
1792            }
1793            state.pending_approval = None;
1794        }
1795        SessionEvent::OperationCompleted { op_id } => {
1796            state.complete_operation(*op_id);
1797        }
1798        SessionEvent::OperationCancelled { op_id, .. } => {
1799            state.record_cancelled_op(*op_id);
1800            state.complete_operation(*op_id);
1801        }
1802        SessionEvent::McpServerStateChanged {
1803            server_name,
1804            state: mcp_state,
1805        } => {
1806            state
1807                .mcp_servers
1808                .insert(server_name.clone(), mcp_state.clone());
1809        }
1810        SessionEvent::QueueUpdated { queue } => {
1811            let parse_content = |content: &str| {
1812                if content.trim().is_empty() {
1813                    None
1814                } else {
1815                    Some(vec![UserContent::Text {
1816                        text: content.to_string(),
1817                    }])
1818                }
1819            };
1820
1821            state.queued_work = queue
1822                .iter()
1823                .filter_map(|item| match item.kind {
1824                    Some(QueuedWorkKind::UserMessage) => {
1825                        let content = parse_content(item.content.as_str())?;
1826                        Some(QueuedWorkItem::UserMessage(
1827                            crate::app::domain::state::QueuedUserMessage {
1828                                content,
1829                                op_id: item.op_id,
1830                                message_id: item.message_id.clone(),
1831                                model: item.model.clone().unwrap_or_else(
1832                                    crate::config::model::builtin::claude_sonnet_4_5,
1833                                ),
1834                                queued_at: item.queued_at,
1835                            },
1836                        ))
1837                    }
1838                    Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1839                        crate::app::domain::state::QueuedBashCommand {
1840                            command: item.content.clone(),
1841                            op_id: item.op_id,
1842                            message_id: item.message_id.clone(),
1843                            queued_at: item.queued_at,
1844                        },
1845                    )),
1846                    None => {
1847                        let content = parse_content(item.content.as_str())?;
1848                        Some(QueuedWorkItem::UserMessage(
1849                            crate::app::domain::state::QueuedUserMessage {
1850                                content,
1851                                op_id: item.op_id,
1852                                message_id: item.message_id.clone(),
1853                                model: item.model.clone().unwrap_or_else(
1854                                    crate::config::model::builtin::claude_sonnet_4_5,
1855                                ),
1856                                queued_at: item.queued_at,
1857                            },
1858                        ))
1859                    }
1860                })
1861                .collect();
1862        }
1863        _ => {}
1864    }
1865
1866    state.event_sequence += 1;
1867}
1868
1869struct CompactionCompleteParams {
1870    op_id: crate::app::domain::types::OpId,
1871    compaction_id: crate::app::domain::types::CompactionId,
1872    summary_message_id: crate::app::domain::types::MessageId,
1873    summary: String,
1874    compacted_head_message_id: crate::app::domain::types::MessageId,
1875    previous_active_message_id: Option<crate::app::domain::types::MessageId>,
1876    model_name: String,
1877    timestamp: u64,
1878}
1879
1880fn handle_compaction_complete(
1881    state: &mut AppState,
1882    session_id: crate::app::domain::types::SessionId,
1883    params: CompactionCompleteParams,
1884) -> Vec<Effect> {
1885    use crate::app::conversation::{AssistantContent, Message, MessageData};
1886    use crate::app::domain::types::CompactionRecord;
1887
1888    let CompactionCompleteParams {
1889        op_id,
1890        compaction_id,
1891        summary_message_id,
1892        summary,
1893        compacted_head_message_id,
1894        previous_active_message_id,
1895        model_name,
1896        timestamp,
1897    } = params;
1898
1899    let summary_message = Message {
1900        data: MessageData::Assistant {
1901            content: vec![AssistantContent::Text {
1902                text: summary.clone(),
1903            }],
1904        },
1905        id: summary_message_id.to_string(),
1906        parent_message_id: None,
1907        timestamp,
1908    };
1909
1910    state.message_graph.add_message(summary_message.clone());
1911
1912    let record = CompactionRecord::with_timestamp(
1913        compaction_id,
1914        summary_message_id,
1915        compacted_head_message_id,
1916        previous_active_message_id,
1917        model_name,
1918        timestamp,
1919    );
1920
1921    let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
1922        model
1923    } else {
1924        state.complete_operation(op_id);
1925        return vec![Effect::EmitEvent {
1926            session_id,
1927            event: SessionEvent::Error {
1928                message: format!("Missing model for compaction operation {op_id}"),
1929            },
1930        }];
1931    };
1932
1933    state.complete_operation(op_id);
1934
1935    let mut effects = vec![
1936        Effect::EmitEvent {
1937            session_id,
1938            event: SessionEvent::AssistantMessageAdded {
1939                message: summary_message,
1940                model,
1941            },
1942        },
1943        Effect::EmitEvent {
1944            session_id,
1945            event: SessionEvent::CompactResult {
1946                result: crate::app::domain::event::CompactResult::Success(summary),
1947            },
1948        },
1949        Effect::EmitEvent {
1950            session_id,
1951            event: SessionEvent::ConversationCompacted { record },
1952        },
1953        Effect::EmitEvent {
1954            session_id,
1955            event: SessionEvent::OperationCompleted { op_id },
1956        },
1957    ];
1958
1959    effects.extend(maybe_start_queued_work(state, session_id));
1960
1961    effects
1962}
1963
1964fn handle_compaction_failed(
1965    state: &mut AppState,
1966    session_id: crate::app::domain::types::SessionId,
1967    op_id: crate::app::domain::types::OpId,
1968    error: String,
1969) -> Vec<Effect> {
1970    state.complete_operation(op_id);
1971
1972    let mut effects = vec![
1973        Effect::EmitEvent {
1974            session_id,
1975            event: SessionEvent::Error { message: error },
1976        },
1977        Effect::EmitEvent {
1978            session_id,
1979            event: SessionEvent::OperationCompleted { op_id },
1980        },
1981    ];
1982
1983    effects.extend(maybe_start_queued_work(state, session_id));
1984
1985    effects
1986}
1987
1988fn emit_mcp_connect_effects(
1989    state: &AppState,
1990    session_id: crate::app::domain::types::SessionId,
1991) -> Vec<Effect> {
1992    let mut effects = Vec::new();
1993
1994    let Some(ref config) = state.session_config else {
1995        return effects;
1996    };
1997
1998    for backend_config in &config.tool_config.backends {
1999        let BackendConfig::Mcp {
2000            server_name,
2001            transport,
2002            tool_filter,
2003        } = backend_config;
2004
2005        let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
2006            matches!(
2007                s,
2008                McpServerState::Connecting | McpServerState::Connected { .. }
2009            )
2010        });
2011
2012        if !already_connected {
2013            effects.push(Effect::ConnectMcpServer {
2014                session_id,
2015                config: McpServerConfig {
2016                    server_name: server_name.clone(),
2017                    transport: transport.clone(),
2018                    tool_filter: tool_filter.clone(),
2019                },
2020            });
2021        }
2022    }
2023
2024    effects
2025}
2026
2027#[cfg(test)]
2028mod tests {
2029    use super::*;
2030    use crate::app::domain::state::{OperationState, PendingApproval};
2031    use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId, ToolCallId};
2032    use crate::config::model::builtin;
2033    use crate::primary_agents::resolve_effective_config;
2034    use crate::session::state::{
2035        ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2036        ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2037    };
2038    use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2039    use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2040    use schemars::schema_for;
2041    use serde_json::json;
2042    use std::collections::HashSet;
2043    use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2044
2045    fn test_state() -> AppState {
2046        AppState::new(SessionId::new())
2047    }
2048
2049    fn test_schema(name: &str) -> ToolSchema {
2050        ToolSchema {
2051            name: name.to_string(),
2052            display_name: name.to_string(),
2053            description: String::new(),
2054            input_schema: InputSchema::empty_object(),
2055        }
2056    }
2057
2058    fn base_session_config() -> SessionConfig {
2059        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2060        config.primary_agent_id = Some("normal".to_string());
2061        config.policy_overrides = SessionPolicyOverrides::empty();
2062        resolve_effective_config(&config)
2063    }
2064
2065    fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2066        super::reduce(state, action).expect("reduce failed")
2067    }
2068
2069    #[test]
2070    fn test_user_input_starts_operation() {
2071        let mut state = test_state();
2072        let session_id = state.session_id;
2073        let op_id = OpId::new();
2074        let message_id = MessageId::new();
2075        let model = builtin::claude_sonnet_4_5();
2076
2077        let effects = reduce(
2078            &mut state,
2079            Action::UserInput {
2080                session_id,
2081                content: vec![UserContent::Text {
2082                    text: "Hello".to_string(),
2083                }],
2084                op_id,
2085                message_id,
2086                model,
2087                timestamp: 1_234_567_890,
2088            },
2089        );
2090
2091        assert_eq!(state.message_graph.messages.len(), 1);
2092        assert!(state.current_operation.is_some());
2093        assert!(
2094            effects
2095                .iter()
2096                .any(|e| matches!(e, Effect::CallModel { .. }))
2097        );
2098    }
2099
2100    #[test]
2101    fn test_switch_primary_agent_updates_visibility() {
2102        let mut state = test_state();
2103        let session_id = state.session_id;
2104        let config = base_session_config();
2105        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2106
2107        let effects = reduce(
2108            &mut state,
2109            Action::SwitchPrimaryAgent {
2110                session_id,
2111                agent_id: "plan".to_string(),
2112            },
2113        );
2114
2115        let updated = state.session_config.as_ref().expect("config");
2116        match &updated.tool_config.visibility {
2117            ToolVisibility::Whitelist(allowed) => {
2118                assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2119                for name in READ_ONLY_TOOL_NAMES {
2120                    assert!(allowed.contains(*name));
2121                }
2122                assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2123            }
2124            other => panic!("Unexpected tool visibility: {other:?}"),
2125        }
2126        assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2127        assert!(effects.iter().any(|e| matches!(
2128            e,
2129            Effect::EmitEvent {
2130                event: SessionEvent::SessionConfigUpdated { .. },
2131                ..
2132            }
2133        )));
2134        assert!(
2135            effects
2136                .iter()
2137                .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2138        );
2139    }
2140
2141    #[test]
2142    fn test_switch_primary_agent_yolo_auto_approves() {
2143        let mut state = test_state();
2144        let session_id = state.session_id;
2145        let config = base_session_config();
2146        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2147
2148        let _ = reduce(
2149            &mut state,
2150            Action::SwitchPrimaryAgent {
2151                session_id,
2152                agent_id: "yolo".to_string(),
2153            },
2154        );
2155
2156        let updated = state.session_config.as_ref().expect("config");
2157        assert_eq!(
2158            updated.tool_config.approval_policy.default_behavior,
2159            UnapprovedBehavior::Allow
2160        );
2161    }
2162
2163    #[test]
2164    fn test_switch_primary_agent_preserves_policy_overrides() {
2165        let mut state = test_state();
2166        let session_id = state.session_id;
2167
2168        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2169        config.primary_agent_id = Some("normal".to_string());
2170        config.policy_overrides = SessionPolicyOverrides {
2171            default_model: None,
2172            tool_visibility: None,
2173            approval_policy: ToolApprovalPolicyOverrides {
2174                default_behavior: Some(UnapprovedBehavior::Deny),
2175                preapproved: ApprovalRulesOverrides::empty(),
2176            },
2177        };
2178        let config = resolve_effective_config(&config);
2179        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2180
2181        let _ = reduce(
2182            &mut state,
2183            Action::SwitchPrimaryAgent {
2184                session_id,
2185                agent_id: "yolo".to_string(),
2186            },
2187        );
2188
2189        let updated = state.session_config.as_ref().expect("config");
2190        assert_eq!(
2191            updated.tool_config.approval_policy.default_behavior,
2192            UnapprovedBehavior::Deny
2193        );
2194        assert_eq!(
2195            updated.policy_overrides.approval_policy.default_behavior,
2196            Some(UnapprovedBehavior::Deny)
2197        );
2198    }
2199
2200    #[test]
2201    fn dispatch_agent_resume_is_auto_approved() {
2202        let mut state = test_state();
2203        let session_id = state.session_id;
2204        let config = base_session_config();
2205        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2206
2207        let tool_call = ToolCall {
2208            id: "tc_dispatch_resume".to_string(),
2209            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2210            parameters: json!({
2211                "prompt": "resume work",
2212                "target": {
2213                    "session": "resume",
2214                    "session_id": SessionId::new().to_string()
2215                }
2216            }),
2217        };
2218
2219        let decision = get_tool_decision(&state, &tool_call);
2220        assert_eq!(decision, ToolDecision::Allow);
2221        assert_eq!(state.session_id, session_id);
2222    }
2223
2224    #[test]
2225    fn test_switch_primary_agent_restores_base_prompt() {
2226        let mut state = test_state();
2227        let session_id = state.session_id;
2228        let mut config = base_session_config();
2229        config.system_prompt = Some("base prompt".to_string());
2230        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2231
2232        let _ = reduce(
2233            &mut state,
2234            Action::SwitchPrimaryAgent {
2235                session_id,
2236                agent_id: "plan".to_string(),
2237            },
2238        );
2239
2240        let _ = reduce(
2241            &mut state,
2242            Action::SwitchPrimaryAgent {
2243                session_id,
2244                agent_id: "normal".to_string(),
2245            },
2246        );
2247
2248        let updated = state.session_config.as_ref().expect("config");
2249        assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2250    }
2251
2252    #[test]
2253    fn test_switch_primary_agent_blocked_during_operation() {
2254        let mut state = test_state();
2255        let session_id = state.session_id;
2256        let config = base_session_config();
2257        apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2258
2259        state.current_operation = Some(OperationState {
2260            op_id: OpId::new(),
2261            kind: OperationKind::AgentLoop,
2262            pending_tool_calls: HashSet::new(),
2263        });
2264
2265        let result = super::reduce(
2266            &mut state,
2267            Action::SwitchPrimaryAgent {
2268                session_id,
2269                agent_id: "plan".to_string(),
2270            },
2271        );
2272
2273        assert!(matches!(
2274            result,
2275            Err(ReduceError::InvalidAction {
2276                kind: InvalidActionKind::OperationInFlight,
2277                ..
2278            })
2279        ));
2280        assert!(state.primary_agent_id.as_deref() == Some("normal"));
2281    }
2282
2283    #[test]
2284    fn test_late_result_ignored_after_cancel() {
2285        let mut state = test_state();
2286        let session_id = state.session_id;
2287        let op_id = OpId::new();
2288        let tool_call_id = ToolCallId::from_string("tc_1");
2289
2290        state.current_operation = Some(OperationState {
2291            op_id,
2292            kind: OperationKind::AgentLoop,
2293            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2294        });
2295
2296        let _ = reduce(
2297            &mut state,
2298            Action::Cancel {
2299                session_id,
2300                op_id: None,
2301            },
2302        );
2303
2304        state.current_operation = Some(OperationState {
2305            op_id,
2306            kind: OperationKind::AgentLoop,
2307            pending_tool_calls: HashSet::new(),
2308        });
2309        state
2310            .operation_models
2311            .insert(op_id, builtin::claude_sonnet_4_5());
2312        state
2313            .operation_models
2314            .insert(op_id, builtin::claude_sonnet_4_5());
2315        state
2316            .operation_models
2317            .insert(op_id, builtin::claude_sonnet_4_5());
2318
2319        let effects = reduce(
2320            &mut state,
2321            Action::ToolResult {
2322                session_id,
2323                tool_call_id,
2324                tool_name: "test".to_string(),
2325                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2326                    tool_name: "test".to_string(),
2327                    payload: "done".to_string(),
2328                })),
2329            },
2330        );
2331
2332        assert!(effects.is_empty());
2333    }
2334
2335    #[test]
2336    fn test_pre_approved_tool_executes_immediately() {
2337        let mut state = test_state();
2338        let session_id = state.session_id;
2339        let op_id = OpId::new();
2340
2341        state.approved_tools.insert("test_tool".to_string());
2342        state.current_operation = Some(OperationState {
2343            op_id,
2344            kind: OperationKind::AgentLoop,
2345            pending_tool_calls: HashSet::new(),
2346        });
2347        state
2348            .operation_models
2349            .insert(op_id, builtin::claude_sonnet_4_5());
2350        state
2351            .operation_models
2352            .insert(op_id, builtin::claude_sonnet_4_5());
2353        state
2354            .operation_models
2355            .insert(op_id, builtin::claude_sonnet_4_5());
2356
2357        let tool_call = steer_tools::ToolCall {
2358            id: "tc_1".to_string(),
2359            name: "test_tool".to_string(),
2360            parameters: serde_json::json!({}),
2361        };
2362
2363        let effects = reduce(
2364            &mut state,
2365            Action::ToolApprovalRequested {
2366                session_id,
2367                request_id: RequestId::new(),
2368                tool_call,
2369            },
2370        );
2371
2372        assert!(
2373            effects
2374                .iter()
2375                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2376        );
2377        assert!(state.pending_approval.is_none());
2378    }
2379
2380    #[test]
2381    fn test_denied_tool_request_emits_failure_message() {
2382        let mut state = test_state();
2383        let session_id = state.session_id;
2384        let op_id = OpId::new();
2385
2386        state.current_operation = Some(OperationState {
2387            op_id,
2388            kind: OperationKind::AgentLoop,
2389            pending_tool_calls: HashSet::new(),
2390        });
2391
2392        state
2393            .operation_models
2394            .insert(op_id, builtin::claude_sonnet_4_5());
2395
2396        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2397        config.tool_config.approval_policy = ToolApprovalPolicy {
2398            default_behavior: UnapprovedBehavior::Deny,
2399            preapproved: ApprovalRules::default(),
2400        };
2401        state.session_config = Some(config);
2402
2403        let tool_call = steer_tools::ToolCall {
2404            id: "tc_1".to_string(),
2405            name: "test_tool".to_string(),
2406            parameters: serde_json::json!({}),
2407        };
2408
2409        let effects = reduce(
2410            &mut state,
2411            Action::ToolApprovalRequested {
2412                session_id,
2413                request_id: RequestId::new(),
2414                tool_call,
2415            },
2416        );
2417
2418        assert!(effects.iter().any(|e| matches!(
2419            e,
2420            Effect::EmitEvent {
2421                event: SessionEvent::ToolCallFailed { .. },
2422                ..
2423            }
2424        )));
2425        assert!(effects.iter().any(|e| matches!(
2426            e,
2427            Effect::EmitEvent {
2428                event: SessionEvent::ToolMessageAdded { .. },
2429                ..
2430            }
2431        )));
2432        assert!(
2433            !effects
2434                .iter()
2435                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2436        );
2437        assert!(
2438            !effects
2439                .iter()
2440                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2441        );
2442        assert!(state.pending_approval.is_none());
2443        assert!(state.approval_queue.is_empty());
2444        assert_eq!(state.message_graph.messages.len(), 1);
2445
2446        match &state.message_graph.messages[0].data {
2447            MessageData::Tool { result, .. } => match result {
2448                ToolResult::Error(error) => {
2449                    assert!(
2450                        matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2451                    );
2452                }
2453                _ => panic!("expected denied tool error"),
2454            },
2455            _ => panic!("expected tool message"),
2456        }
2457    }
2458
2459    #[test]
2460    fn test_user_denied_tool_request_emits_failure_message() {
2461        let mut state = test_state();
2462        let session_id = state.session_id;
2463        let op_id = OpId::new();
2464
2465        state.current_operation = Some(OperationState {
2466            op_id,
2467            kind: OperationKind::AgentLoop,
2468            pending_tool_calls: HashSet::new(),
2469        });
2470        state
2471            .operation_models
2472            .insert(op_id, builtin::claude_sonnet_4_5());
2473
2474        let tool_call = steer_tools::ToolCall {
2475            id: "tc_1".to_string(),
2476            name: "test_tool".to_string(),
2477            parameters: serde_json::json!({}),
2478        };
2479        let request_id = RequestId::new();
2480        state.pending_approval = Some(PendingApproval {
2481            request_id,
2482            tool_call: tool_call.clone(),
2483        });
2484
2485        let effects = reduce(
2486            &mut state,
2487            Action::ToolApprovalDecided {
2488                session_id,
2489                request_id,
2490                decision: ApprovalDecision::Denied,
2491                remember: None,
2492            },
2493        );
2494
2495        assert!(effects.iter().any(|e| matches!(
2496            e,
2497            Effect::EmitEvent {
2498                event: SessionEvent::ToolCallFailed { .. },
2499                ..
2500            }
2501        )));
2502        assert!(effects.iter().any(|e| matches!(
2503            e,
2504            Effect::EmitEvent {
2505                event: SessionEvent::ToolMessageAdded { .. },
2506                ..
2507            }
2508        )));
2509        assert!(
2510            !effects
2511                .iter()
2512                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2513        );
2514        assert!(state.pending_approval.is_none());
2515        assert!(state.approval_queue.is_empty());
2516        assert_eq!(state.message_graph.messages.len(), 1);
2517
2518        match &state.message_graph.messages[0].data {
2519            MessageData::Tool { result, .. } => match result {
2520                ToolResult::Error(error) => {
2521                    assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2522                }
2523                _ => panic!("expected denied tool error"),
2524            },
2525            _ => panic!("expected tool message"),
2526        }
2527    }
2528
2529    #[test]
2530    fn test_cancel_pops_queued_item_without_auto_start() {
2531        let mut state = test_state();
2532        let session_id = state.session_id;
2533        let op_id = OpId::new();
2534
2535        state.current_operation = Some(OperationState {
2536            op_id,
2537            kind: OperationKind::AgentLoop,
2538            pending_tool_calls: HashSet::new(),
2539        });
2540        state
2541            .operation_models
2542            .insert(op_id, builtin::claude_sonnet_4_5());
2543
2544        let queued_op = OpId::new();
2545        let queued_message_id = MessageId::from_string("queued_msg");
2546        let _ = reduce(
2547            &mut state,
2548            Action::UserInput {
2549                session_id,
2550                content: vec![UserContent::Text {
2551                    text: "Queued message".to_string(),
2552                }],
2553                op_id: queued_op,
2554                message_id: queued_message_id.clone(),
2555                model: builtin::claude_sonnet_4_5(),
2556                timestamp: 1,
2557            },
2558        );
2559
2560        let effects = reduce(
2561            &mut state,
2562            Action::Cancel {
2563                session_id,
2564                op_id: None,
2565            },
2566        );
2567
2568        assert!(state.current_operation.is_none());
2569        assert!(state.queued_work.is_empty());
2570
2571        let cancellation_info = effects.iter().find_map(|effect| match effect {
2572            Effect::EmitEvent {
2573                event: SessionEvent::OperationCancelled { info, .. },
2574                ..
2575            } => Some(info),
2576            _ => None,
2577        });
2578        let info = cancellation_info.expect("expected OperationCancelled event");
2579        let popped = info
2580            .popped_queued_item
2581            .as_ref()
2582            .expect("expected popped queued item");
2583        assert_eq!(popped.content, "Queued message");
2584        assert_eq!(popped.op_id, queued_op);
2585        assert_eq!(popped.message_id, queued_message_id);
2586
2587        assert!(
2588            !effects.iter().any(|effect| matches!(
2589                effect,
2590                Effect::EmitEvent {
2591                    event: SessionEvent::OperationStarted { .. },
2592                    ..
2593                }
2594            )),
2595            "queued work should not auto-start on cancel"
2596        );
2597    }
2598
2599    #[test]
2600    fn test_cancel_injects_tool_results_for_pending_calls() {
2601        let mut state = test_state();
2602        let session_id = state.session_id;
2603        let op_id = OpId::new();
2604
2605        let tool_call = ToolCall {
2606            id: "tc_1".to_string(),
2607            name: "test_tool".to_string(),
2608            parameters: serde_json::json!({}),
2609        };
2610
2611        state.message_graph.add_message(Message {
2612            data: MessageData::Assistant {
2613                content: vec![AssistantContent::ToolCall {
2614                    tool_call: tool_call.clone(),
2615                    thought_signature: None,
2616                }],
2617            },
2618            timestamp: 0,
2619            id: "msg_1".to_string(),
2620            parent_message_id: None,
2621        });
2622
2623        state.current_operation = Some(OperationState {
2624            op_id,
2625            kind: OperationKind::AgentLoop,
2626            pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2627        });
2628        state
2629            .operation_models
2630            .insert(op_id, builtin::claude_sonnet_4_5());
2631
2632        let effects = reduce(
2633            &mut state,
2634            Action::Cancel {
2635                session_id,
2636                op_id: None,
2637            },
2638        );
2639
2640        assert!(effects.iter().any(|e| matches!(
2641            e,
2642            Effect::EmitEvent {
2643                event: SessionEvent::ToolMessageAdded { .. },
2644                ..
2645            }
2646        )));
2647
2648        let tool_message = state
2649            .message_graph
2650            .messages
2651            .iter()
2652            .find(|message| matches!(message.data, MessageData::Tool { .. }))
2653            .expect("tool result should be injected on cancel");
2654
2655        match &tool_message.data {
2656            MessageData::Tool { result, .. } => match result {
2657                ToolResult::Error(error) => {
2658                    assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2659                }
2660                _ => panic!("expected cancelled tool error"),
2661            },
2662            _ => panic!("expected tool message"),
2663        }
2664    }
2665
2666    #[test]
2667    fn test_malformed_tool_call_auto_denies() {
2668        let mut state = test_state();
2669        let session_id = state.session_id;
2670        let op_id = OpId::new();
2671
2672        state.current_operation = Some(OperationState {
2673            op_id,
2674            kind: OperationKind::AgentLoop,
2675            pending_tool_calls: HashSet::new(),
2676        });
2677
2678        state
2679            .operation_models
2680            .insert(op_id, builtin::claude_sonnet_4_5());
2681
2682        let mut properties = serde_json::Map::new();
2683        properties.insert("command".to_string(), json!({ "type": "string" }));
2684
2685        state.tools.push(ToolSchema {
2686            name: "test_tool".to_string(),
2687            display_name: "test_tool".to_string(),
2688            description: String::new(),
2689            input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2690        });
2691
2692        let tool_call = ToolCall {
2693            id: "tc_1".to_string(),
2694            name: "test_tool".to_string(),
2695            parameters: json!({}),
2696        };
2697
2698        let effects = reduce(
2699            &mut state,
2700            Action::ToolApprovalRequested {
2701                session_id,
2702                request_id: RequestId::new(),
2703                tool_call,
2704            },
2705        );
2706
2707        assert!(effects.iter().any(|e| matches!(
2708            e,
2709            Effect::EmitEvent {
2710                event: SessionEvent::ToolCallFailed { .. },
2711                ..
2712            }
2713        )));
2714        assert!(effects.iter().any(|e| matches!(
2715            e,
2716            Effect::EmitEvent {
2717                event: SessionEvent::ToolMessageAdded { .. },
2718                ..
2719            }
2720        )));
2721        assert!(
2722            !effects
2723                .iter()
2724                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2725        );
2726        assert!(
2727            !effects
2728                .iter()
2729                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2730        );
2731        assert!(state.pending_approval.is_none());
2732        assert!(state.approval_queue.is_empty());
2733        assert_eq!(state.message_graph.messages.len(), 1);
2734
2735        match &state.message_graph.messages[0].data {
2736            MessageData::Tool { result, .. } => match result {
2737                ToolResult::Error(error) => {
2738                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2739                }
2740                _ => panic!("expected invalid params tool error"),
2741            },
2742            _ => panic!("expected tool message"),
2743        }
2744    }
2745
2746    #[test]
2747    fn test_approval_queuing() {
2748        let mut state = test_state();
2749        let session_id = state.session_id;
2750        let op_id = OpId::new();
2751
2752        state.current_operation = Some(OperationState {
2753            op_id,
2754            kind: OperationKind::AgentLoop,
2755            pending_tool_calls: HashSet::new(),
2756        });
2757
2758        let tool_call_1 = steer_tools::ToolCall {
2759            id: "tc_1".to_string(),
2760            name: "tool_1".to_string(),
2761            parameters: serde_json::json!({}),
2762        };
2763        let tool_call_2 = steer_tools::ToolCall {
2764            id: "tc_2".to_string(),
2765            name: "tool_2".to_string(),
2766            parameters: serde_json::json!({}),
2767        };
2768
2769        let _ = reduce(
2770            &mut state,
2771            Action::ToolApprovalRequested {
2772                session_id,
2773                request_id: RequestId::new(),
2774                tool_call: tool_call_1,
2775            },
2776        );
2777
2778        assert!(state.pending_approval.is_some());
2779
2780        let _ = reduce(
2781            &mut state,
2782            Action::ToolApprovalRequested {
2783                session_id,
2784                request_id: RequestId::new(),
2785                tool_call: tool_call_2,
2786            },
2787        );
2788
2789        assert_eq!(state.approval_queue.len(), 1);
2790    }
2791
2792    #[test]
2793    fn test_dispatch_agent_missing_target_auto_denies() {
2794        let mut state = test_state();
2795        let session_id = state.session_id;
2796        let op_id = OpId::new();
2797
2798        state.current_operation = Some(OperationState {
2799            op_id,
2800            kind: OperationKind::AgentLoop,
2801            pending_tool_calls: HashSet::new(),
2802        });
2803
2804        state
2805            .operation_models
2806            .insert(op_id, builtin::claude_sonnet_4_5());
2807
2808        let input_schema: InputSchema =
2809            schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
2810        state.tools.push(ToolSchema {
2811            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2812            display_name: "Dispatch Agent".to_string(),
2813            description: String::new(),
2814            input_schema,
2815        });
2816
2817        let tool_call = ToolCall {
2818            id: "tc_dispatch".to_string(),
2819            name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2820            parameters: json!({ "prompt": "hello world" }),
2821        };
2822
2823        let effects = reduce(
2824            &mut state,
2825            Action::ToolApprovalRequested {
2826                session_id,
2827                request_id: RequestId::new(),
2828                tool_call,
2829            },
2830        );
2831
2832        assert!(effects.iter().any(|e| matches!(
2833            e,
2834            Effect::EmitEvent {
2835                event: SessionEvent::ToolCallFailed { .. },
2836                ..
2837            }
2838        )));
2839        assert!(effects.iter().any(|e| matches!(
2840            e,
2841            Effect::EmitEvent {
2842                event: SessionEvent::ToolMessageAdded { .. },
2843                ..
2844            }
2845        )));
2846        assert!(
2847            !effects
2848                .iter()
2849                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2850        );
2851        assert!(state.pending_approval.is_none());
2852        assert!(state.approval_queue.is_empty());
2853
2854        match &state.message_graph.messages[0].data {
2855            MessageData::Tool { result, .. } => match result {
2856                ToolResult::Error(error) => {
2857                    assert!(matches!(error, ToolError::InvalidParams { .. }));
2858                }
2859                _ => panic!("expected invalid params tool error"),
2860            },
2861            _ => panic!("expected tool message"),
2862        }
2863    }
2864
2865    #[test]
2866    fn test_model_response_with_tool_calls_requests_approval() {
2867        let mut state = test_state();
2868        let session_id = state.session_id;
2869        let op_id = OpId::new();
2870        let message_id = MessageId::new();
2871
2872        state.current_operation = Some(OperationState {
2873            op_id,
2874            kind: OperationKind::AgentLoop,
2875            pending_tool_calls: HashSet::new(),
2876        });
2877        state
2878            .operation_models
2879            .insert(op_id, builtin::claude_sonnet_4_5());
2880
2881        let tool_call = steer_tools::ToolCall {
2882            id: "tc_1".to_string(),
2883            name: "bash".to_string(),
2884            parameters: serde_json::json!({"command": "ls"}),
2885        };
2886
2887        let content = vec![
2888            AssistantContent::Text {
2889                text: "Let me list the files.".to_string(),
2890            },
2891            AssistantContent::ToolCall {
2892                tool_call: tool_call.clone(),
2893                thought_signature: None,
2894            },
2895        ];
2896
2897        let effects = reduce(
2898            &mut state,
2899            Action::ModelResponseComplete {
2900                session_id,
2901                op_id,
2902                message_id,
2903                content,
2904                timestamp: 12345,
2905            },
2906        );
2907
2908        assert!(state.pending_approval.is_some());
2909        assert!(
2910            effects
2911                .iter()
2912                .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2913        );
2914        assert!(state.current_operation.is_some());
2915    }
2916
2917    #[test]
2918    fn test_model_response_no_tools_completes_operation() {
2919        let mut state = test_state();
2920        let session_id = state.session_id;
2921        let op_id = OpId::new();
2922        let message_id = MessageId::new();
2923
2924        state.current_operation = Some(OperationState {
2925            op_id,
2926            kind: OperationKind::AgentLoop,
2927            pending_tool_calls: HashSet::new(),
2928        });
2929        state
2930            .operation_models
2931            .insert(op_id, builtin::claude_sonnet_4_5());
2932
2933        let content = vec![AssistantContent::Text {
2934            text: "Hello! How can I help?".to_string(),
2935        }];
2936
2937        let effects = reduce(
2938            &mut state,
2939            Action::ModelResponseComplete {
2940                session_id,
2941                op_id,
2942                message_id,
2943                content,
2944                timestamp: 12345,
2945            },
2946        );
2947
2948        assert!(state.current_operation.is_none());
2949        assert!(effects.iter().any(|e| matches!(
2950            e,
2951            Effect::EmitEvent {
2952                event: SessionEvent::OperationCompleted { .. },
2953                ..
2954            }
2955        )));
2956    }
2957
2958    #[test]
2959    fn test_out_of_order_completion_preserves_newer_operation() {
2960        let mut state = test_state();
2961        let session_id = state.session_id;
2962        let model = builtin::claude_sonnet_4_5();
2963
2964        let op_a = OpId::new();
2965        let op_b = OpId::new();
2966
2967        let _ = reduce(
2968            &mut state,
2969            Action::UserInput {
2970                session_id,
2971                content: vec![UserContent::Text {
2972                    text: "first".to_string(),
2973                }],
2974                op_id: op_a,
2975                message_id: MessageId::new(),
2976                model: model.clone(),
2977                timestamp: 1,
2978            },
2979        );
2980
2981        let _ = reduce(
2982            &mut state,
2983            Action::UserInput {
2984                session_id,
2985                content: vec![UserContent::Text {
2986                    text: "second".to_string(),
2987                }],
2988                op_id: op_b,
2989                message_id: MessageId::new(),
2990                model: model.clone(),
2991                timestamp: 2,
2992            },
2993        );
2994
2995        let _ = reduce(
2996            &mut state,
2997            Action::ModelResponseComplete {
2998                session_id,
2999                op_id: op_a,
3000                message_id: MessageId::new(),
3001                content: vec![AssistantContent::Text {
3002                    text: "done A".to_string(),
3003                }],
3004                timestamp: 3,
3005            },
3006        );
3007
3008        assert!(
3009            state
3010                .current_operation
3011                .as_ref()
3012                .is_some_and(|op| op.op_id == op_b)
3013        );
3014        assert!(state.operation_models.contains_key(&op_b));
3015        assert!(!state.operation_models.contains_key(&op_a));
3016
3017        let effects = reduce(
3018            &mut state,
3019            Action::ModelResponseComplete {
3020                session_id,
3021                op_id: op_b,
3022                message_id: MessageId::new(),
3023                content: vec![AssistantContent::Text {
3024                    text: "done B".to_string(),
3025                }],
3026                timestamp: 4,
3027            },
3028        );
3029
3030        assert!(effects.iter().any(|e| matches!(
3031            e,
3032            Effect::EmitEvent {
3033                event: SessionEvent::OperationCompleted { op_id },
3034                ..
3035            } if *op_id == op_b
3036        )));
3037        assert!(!effects.iter().any(|e| matches!(
3038            e,
3039            Effect::EmitEvent {
3040                event: SessionEvent::Error { message },
3041                ..
3042            } if message.contains("Missing model for operation")
3043        )));
3044    }
3045
3046    #[test]
3047    fn test_tool_approval_does_not_call_model_before_result() {
3048        let mut state = test_state();
3049        let session_id = state.session_id;
3050        let op_id = OpId::new();
3051
3052        state.current_operation = Some(OperationState {
3053            op_id,
3054            kind: OperationKind::AgentLoop,
3055            pending_tool_calls: HashSet::new(),
3056        });
3057        state
3058            .operation_models
3059            .insert(op_id, builtin::claude_sonnet_4_5());
3060
3061        let tool_call = steer_tools::ToolCall {
3062            id: "tc_1".to_string(),
3063            name: "bash".to_string(),
3064            parameters: serde_json::json!({"command": "ls"}),
3065        };
3066        let request_id = RequestId::new();
3067        state.pending_approval = Some(PendingApproval {
3068            request_id,
3069            tool_call: tool_call.clone(),
3070        });
3071
3072        let effects = reduce(
3073            &mut state,
3074            Action::ToolApprovalDecided {
3075                session_id,
3076                request_id,
3077                decision: ApprovalDecision::Approved,
3078                remember: None,
3079            },
3080        );
3081
3082        assert!(
3083            effects
3084                .iter()
3085                .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3086        );
3087        assert!(
3088            !effects
3089                .iter()
3090                .any(|e| matches!(e, Effect::CallModel { .. }))
3091        );
3092        assert!(state.current_operation.as_ref().is_some_and(|op| {
3093            op.pending_tool_calls
3094                .contains(&ToolCallId::from_string("tc_1"))
3095        }));
3096    }
3097
3098    #[test]
3099    fn test_mcp_tool_visibility_and_disconnect_removal() {
3100        let mut state = test_state();
3101        let session_id = state.session_id;
3102
3103        let mut allowed = HashSet::new();
3104        allowed.insert("mcp__alpha__allowed".to_string());
3105
3106        let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3107        config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3108        state.session_config = Some(config);
3109
3110        state.tools.push(test_schema("bash"));
3111
3112        let _ = reduce(
3113            &mut state,
3114            Action::McpServerStateChanged {
3115                session_id,
3116                server_name: "alpha".to_string(),
3117                state: McpServerState::Connected {
3118                    tools: vec![
3119                        test_schema("mcp__alpha__allowed"),
3120                        test_schema("mcp__alpha__blocked"),
3121                    ],
3122                },
3123            },
3124        );
3125
3126        assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3127        assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3128
3129        let _ = reduce(
3130            &mut state,
3131            Action::McpServerStateChanged {
3132                session_id,
3133                server_name: "alpha".to_string(),
3134                state: McpServerState::Disconnected { error: None },
3135            },
3136        );
3137
3138        assert!(
3139            !state
3140                .tools
3141                .iter()
3142                .any(|t| t.name.starts_with("mcp__alpha__"))
3143        );
3144        assert!(state.tools.iter().any(|t| t.name == "bash"));
3145    }
3146
3147    #[test]
3148    fn test_tool_result_continues_agent_loop() {
3149        let mut state = test_state();
3150        let session_id = state.session_id;
3151        let op_id = OpId::new();
3152        let tool_call_id = ToolCallId::from_string("tc_1");
3153
3154        state.current_operation = Some(OperationState {
3155            op_id,
3156            kind: OperationKind::AgentLoop,
3157            pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3158        });
3159        state
3160            .operation_models
3161            .insert(op_id, builtin::claude_sonnet_4_5());
3162
3163        let effects = reduce(
3164            &mut state,
3165            Action::ToolResult {
3166                session_id,
3167                tool_call_id,
3168                tool_name: "bash".to_string(),
3169                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3170                    tool_name: "bash".to_string(),
3171                    payload: "file1.txt\nfile2.txt".to_string(),
3172                })),
3173            },
3174        );
3175
3176        assert!(
3177            effects
3178                .iter()
3179                .any(|e| matches!(e, Effect::CallModel { .. }))
3180        );
3181    }
3182
3183    #[test]
3184    fn test_tool_result_waits_for_pending_tools() {
3185        let mut state = test_state();
3186        let session_id = state.session_id;
3187        let op_id = OpId::new();
3188        let tool_call_id_1 = ToolCallId::from_string("tc_1");
3189        let tool_call_id_2 = ToolCallId::from_string("tc_2");
3190
3191        state.current_operation = Some(OperationState {
3192            op_id,
3193            kind: OperationKind::AgentLoop,
3194            pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3195                .into_iter()
3196                .collect(),
3197        });
3198        state
3199            .operation_models
3200            .insert(op_id, builtin::claude_sonnet_4_5());
3201
3202        let effects = reduce(
3203            &mut state,
3204            Action::ToolResult {
3205                session_id,
3206                tool_call_id: tool_call_id_1,
3207                tool_name: "bash".to_string(),
3208                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3209                    tool_name: "bash".to_string(),
3210                    payload: "done".to_string(),
3211                })),
3212            },
3213        );
3214
3215        assert!(
3216            !effects
3217                .iter()
3218                .any(|e| matches!(e, Effect::CallModel { .. }))
3219        );
3220
3221        let effects = reduce(
3222            &mut state,
3223            Action::ToolResult {
3224                session_id,
3225                tool_call_id: tool_call_id_2,
3226                tool_name: "bash".to_string(),
3227                result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3228                    tool_name: "bash".to_string(),
3229                    payload: "done".to_string(),
3230                })),
3231            },
3232        );
3233
3234        assert!(
3235            effects
3236                .iter()
3237                .any(|e| matches!(e, Effect::CallModel { .. }))
3238        );
3239    }
3240}