1use crate::agents::default_agent_spec_id;
2use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
3
4use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
5
6use crate::app::domain::effect::{Effect, McpServerConfig};
7use crate::app::domain::event::{
8 CancellationInfo, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
9};
10use crate::app::domain::state::{
11 AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
12};
13use crate::primary_agents::{
14 default_primary_agent_id, primary_agent_spec, resolve_effective_config,
15};
16use crate::session::state::{BackendConfig, ToolDecision};
17
18use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
19use serde_json::Value;
20use steer_tools::ToolError;
21use steer_tools::result::ToolResult;
22use steer_tools::tools::BASH_TOOL_NAME;
23use thiserror::Error;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum InvalidActionKind {
27 OperationInFlight,
28 MissingSessionConfig,
29 UnknownPrimaryAgent,
30 QueueEmpty,
31}
32
33#[derive(Debug, Error)]
34pub enum ReduceError {
35 #[error("{message}")]
36 InvalidAction {
37 message: String,
38 kind: InvalidActionKind,
39 },
40 #[error("Invariant violated: {message}")]
41 Invariant { message: String },
42}
43
44fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
45 ReduceError::InvalidAction {
46 message: message.into(),
47 kind,
48 }
49}
50
51pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
52 match action {
53 Action::UserInput {
54 session_id,
55 content,
56 op_id,
57 message_id,
58 model,
59 timestamp,
60 } => Ok(handle_user_input(
61 state, session_id, content, op_id, message_id, model, timestamp,
62 )),
63
64 Action::UserEditedMessage {
65 session_id,
66 message_id,
67 new_content,
68 op_id,
69 new_message_id,
70 model,
71 timestamp,
72 } => handle_user_edited_message(
73 state,
74 session_id,
75 UserEditedMessageParams {
76 original_message_id: message_id,
77 new_content,
78 op_id,
79 new_message_id,
80 model,
81 timestamp,
82 },
83 ),
84
85 Action::ToolApprovalRequested {
86 session_id,
87 request_id,
88 tool_call,
89 } => Ok(handle_tool_approval_requested(
90 state, session_id, request_id, tool_call,
91 )),
92
93 Action::ToolApprovalDecided {
94 session_id,
95 request_id,
96 decision,
97 remember,
98 } => Ok(handle_tool_approval_decided(
99 state, session_id, request_id, decision, remember,
100 )),
101
102 Action::ToolExecutionStarted {
103 session_id,
104 tool_call_id,
105 tool_name,
106 tool_parameters,
107 } => Ok(handle_tool_execution_started(
108 state,
109 session_id,
110 tool_call_id,
111 tool_name,
112 tool_parameters,
113 )),
114
115 Action::ToolResult {
116 session_id,
117 tool_call_id,
118 tool_name,
119 result,
120 } => Ok(handle_tool_result(
121 state,
122 session_id,
123 tool_call_id,
124 tool_name,
125 result,
126 )),
127
128 Action::ModelResponseComplete {
129 session_id,
130 op_id,
131 message_id,
132 content,
133 timestamp,
134 } => Ok(handle_model_response_complete(
135 state, session_id, op_id, message_id, content, timestamp,
136 )),
137
138 Action::ModelResponseError {
139 session_id,
140 op_id,
141 error,
142 } => Ok(handle_model_response_error(
143 state, session_id, op_id, &error,
144 )),
145
146 Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
147
148 Action::DirectBashCommand {
149 session_id,
150 op_id,
151 message_id,
152 command,
153 timestamp,
154 } => Ok(handle_direct_bash(
155 state, session_id, op_id, message_id, command, timestamp,
156 )),
157
158 Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
159
160 Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
161
162 Action::RequestCompaction {
163 session_id,
164 op_id,
165 model,
166 } => handle_request_compaction(state, session_id, op_id, model),
167
168 Action::Hydrate {
169 session_id,
170 events,
171 starting_sequence,
172 } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
173
174 Action::WorkspaceFilesListed { files, .. } => {
175 state.workspace_files = files;
176 Ok(vec![])
177 }
178
179 Action::ToolSchemasAvailable { tools, .. } => {
180 state.tools = tools;
181 Ok(vec![])
182 }
183
184 Action::ToolSchemasUpdated { schemas, .. } => {
185 state.tools = schemas;
186 Ok(vec![])
187 }
188
189 Action::SwitchPrimaryAgent {
190 session_id,
191 agent_id,
192 } => handle_switch_primary_agent(state, session_id, agent_id),
193
194 Action::McpServerStateChanged {
195 session_id,
196 server_name,
197 state: new_state,
198 } => {
199 if let McpServerState::Connected { tools } = &new_state {
201 let tools = state.session_config.as_ref().map_or_else(
202 || tools.clone(),
203 |config| config.filter_tools_by_visibility(tools.clone()),
204 );
205
206 for tool in tools {
208 if !state.tools.iter().any(|t| t.name == tool.name) {
209 state.tools.push(tool.clone());
210 }
211 }
212 }
213
214 if matches!(
216 &new_state,
217 McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
218 ) {
219 let prefix = format!("mcp__{server_name}__");
220 state.tools.retain(|t| !t.name.starts_with(&prefix));
221 }
222
223 state
224 .mcp_servers
225 .insert(server_name.clone(), new_state.clone());
226 Ok(vec![Effect::EmitEvent {
227 session_id,
228 event: SessionEvent::McpServerStateChanged {
229 server_name,
230 state: new_state,
231 },
232 }])
233 }
234
235 Action::CompactionComplete {
236 session_id,
237 op_id,
238 compaction_id,
239 summary_message_id,
240 summary,
241 compacted_head_message_id,
242 previous_active_message_id,
243 model,
244 timestamp,
245 } => Ok(handle_compaction_complete(
246 state,
247 session_id,
248 CompactionCompleteParams {
249 op_id,
250 compaction_id,
251 summary_message_id,
252 summary,
253 compacted_head_message_id,
254 previous_active_message_id,
255 model_name: model,
256 timestamp,
257 },
258 )),
259
260 Action::CompactionFailed {
261 session_id,
262 op_id,
263 error,
264 } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
265
266 Action::Shutdown => Ok(vec![]),
267 }
268}
269
270fn handle_user_input(
271 state: &mut AppState,
272 session_id: crate::app::domain::types::SessionId,
273 content: Vec<UserContent>,
274 op_id: crate::app::domain::types::OpId,
275 message_id: crate::app::domain::types::MessageId,
276 model: crate::config::model::ModelId,
277 timestamp: u64,
278) -> Vec<Effect> {
279 let mut effects = Vec::new();
280
281 if state.has_active_operation() {
282 state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
283 content,
284 op_id,
285 message_id,
286 model,
287 queued_at: timestamp,
288 });
289 effects.push(Effect::EmitEvent {
290 session_id,
291 event: SessionEvent::QueueUpdated {
292 queue: snapshot_queue(state),
293 },
294 });
295 return effects;
296 }
297
298 let parent_id = state.message_graph.active_message_id.clone();
299
300 let message = Message {
301 data: MessageData::User { content },
302 timestamp,
303 id: message_id.0.clone(),
304 parent_message_id: parent_id,
305 };
306
307 state.message_graph.add_message(message.clone());
308 state.message_graph.active_message_id = Some(message_id.0.clone());
309
310 state.start_operation(op_id, OperationKind::AgentLoop);
311 state.operation_models.insert(op_id, model.clone());
312
313 effects.push(Effect::EmitEvent {
314 session_id,
315 event: SessionEvent::UserMessageAdded {
316 message: message.clone(),
317 },
318 });
319
320 effects.push(Effect::EmitEvent {
321 session_id,
322 event: SessionEvent::OperationStarted {
323 op_id,
324 kind: OperationKind::AgentLoop,
325 },
326 });
327
328 effects.push(Effect::CallModel {
329 session_id,
330 op_id,
331 model,
332 messages: state
333 .message_graph
334 .get_thread_messages()
335 .into_iter()
336 .cloned()
337 .collect(),
338 system_context: state.cached_system_context.clone(),
339 tools: state.tools.clone(),
340 });
341
342 effects
343}
344
345struct UserEditedMessageParams {
346 original_message_id: crate::app::domain::types::MessageId,
347 new_content: Vec<UserContent>,
348 op_id: crate::app::domain::types::OpId,
349 new_message_id: crate::app::domain::types::MessageId,
350 model: crate::config::model::ModelId,
351 timestamp: u64,
352}
353
354fn handle_user_edited_message(
355 state: &mut AppState,
356 session_id: crate::app::domain::types::SessionId,
357 params: UserEditedMessageParams,
358) -> Result<Vec<Effect>, ReduceError> {
359 let UserEditedMessageParams {
360 original_message_id,
361 new_content,
362 op_id,
363 new_message_id,
364 model,
365 timestamp,
366 } = params;
367 let mut effects = Vec::new();
368
369 if state.has_active_operation() {
370 return Err(invalid_action(
371 InvalidActionKind::OperationInFlight,
372 "Cannot edit message while an operation is active.",
373 ));
374 }
375
376 let parent_id = state
377 .message_graph
378 .messages
379 .iter()
380 .find(|m| m.id() == original_message_id.0)
381 .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
382
383 let message = Message {
384 data: MessageData::User {
385 content: new_content,
386 },
387 timestamp,
388 id: new_message_id.0.clone(),
389 parent_message_id: parent_id,
390 };
391
392 state.message_graph.add_message(message.clone());
393 state.message_graph.active_message_id = Some(new_message_id.0.clone());
394
395 state.start_operation(op_id, OperationKind::AgentLoop);
396 state.operation_models.insert(op_id, model.clone());
397
398 effects.push(Effect::EmitEvent {
399 session_id,
400 event: SessionEvent::UserMessageAdded {
401 message: message.clone(),
402 },
403 });
404
405 effects.push(Effect::EmitEvent {
406 session_id,
407 event: SessionEvent::OperationStarted {
408 op_id,
409 kind: OperationKind::AgentLoop,
410 },
411 });
412
413 effects.push(Effect::CallModel {
414 session_id,
415 op_id,
416 model,
417 messages: state
418 .message_graph
419 .get_thread_messages()
420 .into_iter()
421 .cloned()
422 .collect(),
423 system_context: state.cached_system_context.clone(),
424 tools: state.tools.clone(),
425 });
426
427 Ok(effects)
428}
429
430fn handle_tool_approval_requested(
431 state: &mut AppState,
432 session_id: crate::app::domain::types::SessionId,
433 request_id: crate::app::domain::types::RequestId,
434 tool_call: steer_tools::ToolCall,
435) -> Vec<Effect> {
436 let mut effects = Vec::new();
437
438 if let Err(error) = validate_tool_call(state, &tool_call) {
439 let error_message = error.to_string();
440 return fail_tool_call_without_execution(
441 state,
442 session_id,
443 tool_call,
444 error,
445 error_message,
446 "invalid",
447 true,
448 );
449 }
450
451 let decision = get_tool_decision(state, &tool_call);
452
453 match decision {
454 ToolDecision::Allow => {
455 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
456 return vec![Effect::EmitEvent {
457 session_id,
458 event: SessionEvent::Error {
459 message: "Tool approval requested without active operation".to_string(),
460 },
461 }];
462 };
463 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
464 &tool_call.id,
465 ));
466
467 effects.push(Effect::ExecuteTool {
468 session_id,
469 op_id,
470 tool_call,
471 });
472 }
473 ToolDecision::Deny => {
474 let error = ToolError::DeniedByPolicy(tool_call.name.clone());
475 let tool_name = tool_call.name.clone();
476 effects.extend(fail_tool_call_without_execution(
477 state,
478 session_id,
479 tool_call,
480 error,
481 format!("Tool '{tool_name}' denied by policy"),
482 "denied",
483 true,
484 ));
485 }
486 ToolDecision::Ask => {
487 if state.pending_approval.is_some() {
488 state.approval_queue.push_back(QueuedApproval { tool_call });
489 return effects;
490 }
491
492 state.pending_approval = Some(PendingApproval {
493 request_id,
494 tool_call: tool_call.clone(),
495 });
496
497 effects.push(Effect::EmitEvent {
498 session_id,
499 event: SessionEvent::ApprovalRequested {
500 request_id,
501 tool_call: tool_call.clone(),
502 },
503 });
504
505 effects.push(Effect::RequestUserApproval {
506 session_id,
507 request_id,
508 tool_call,
509 });
510 }
511 }
512
513 effects
514}
515
516fn validate_tool_call(
517 state: &AppState,
518 tool_call: &steer_tools::ToolCall,
519) -> Result<(), ToolError> {
520 if tool_call.name.trim().is_empty() {
521 return Err(ToolError::invalid_params(
522 "unknown",
523 "Malformed tool call: missing tool name",
524 ));
525 }
526
527 if tool_call.id.trim().is_empty() {
528 return Err(ToolError::invalid_params(
529 tool_call.name.clone(),
530 "Malformed tool call: missing tool call id",
531 ));
532 }
533
534 if state.tools.is_empty() {
535 return Ok(());
536 }
537
538 let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
539 return Ok(());
540 };
541
542 validate_against_json_schema(
543 &tool_call.name,
544 schema.input_schema.as_value(),
545 &tool_call.parameters,
546 )
547}
548
549fn validate_against_json_schema(
550 tool_name: &str,
551 schema: &Value,
552 params: &Value,
553) -> Result<(), ToolError> {
554 let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
555 ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
556 })?;
557
558 if let Err(errors) = validator.validate(params) {
559 let message = errors
560 .into_iter()
561 .map(|error| error.to_string())
562 .next()
563 .unwrap_or_else(|| "Parameters do not match schema".to_string());
564 return Err(ToolError::invalid_params(tool_name.to_string(), message));
565 }
566
567 Ok(())
568}
569
570fn emit_tool_failure_message(
571 state: &mut AppState,
572 session_id: crate::app::domain::types::SessionId,
573 tool_call_id: &str,
574 tool_name: &str,
575 tool_error: ToolError,
576 event_error: String,
577 message_id_prefix: &str,
578) -> Vec<Effect> {
579 let mut effects = Vec::new();
580
581 let tool_result = ToolResult::Error(tool_error);
582 let parent_id = state.message_graph.active_message_id.clone();
583 let tool_message = Message {
584 data: MessageData::Tool {
585 tool_use_id: tool_call_id.to_string(),
586 result: tool_result,
587 },
588 timestamp: 0,
589 id: format!("{message_id_prefix}_{tool_call_id}"),
590 parent_message_id: parent_id,
591 };
592 state.message_graph.add_message(tool_message.clone());
593
594 if let Some(model) = state
595 .current_operation
596 .as_ref()
597 .and_then(|op| state.operation_models.get(&op.op_id).cloned())
598 {
599 effects.push(Effect::EmitEvent {
600 session_id,
601 event: SessionEvent::ToolCallFailed {
602 id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
603 name: tool_name.to_string(),
604 error: event_error,
605 model,
606 },
607 });
608 }
609
610 effects.push(Effect::EmitEvent {
611 session_id,
612 event: SessionEvent::ToolMessageAdded {
613 message: tool_message,
614 },
615 });
616
617 effects
618}
619
620fn fail_tool_call_without_execution(
621 state: &mut AppState,
622 session_id: crate::app::domain::types::SessionId,
623 tool_call: steer_tools::ToolCall,
624 tool_error: ToolError,
625 event_error: String,
626 message_id_prefix: &str,
627 call_model_if_ready: bool,
628) -> Vec<Effect> {
629 let mut effects = emit_tool_failure_message(
630 state,
631 session_id,
632 &tool_call.id,
633 &tool_call.name,
634 tool_error,
635 event_error,
636 message_id_prefix,
637 );
638
639 if !call_model_if_ready {
640 return effects;
641 }
642
643 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
644 effects.push(Effect::EmitEvent {
645 session_id,
646 event: SessionEvent::Error {
647 message: "Tool failure recorded without active operation".to_string(),
648 },
649 });
650 return effects;
651 };
652 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
653 model
654 } else {
655 effects.push(Effect::EmitEvent {
656 session_id,
657 event: SessionEvent::Error {
658 message: format!("Missing model for operation {op_id}"),
659 },
660 });
661 return effects;
662 };
663
664 let all_tools_complete = state
665 .current_operation
666 .as_ref()
667 .is_none_or(|op| op.pending_tool_calls.is_empty());
668 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
669
670 if all_tools_complete && no_pending_approvals {
671 effects.push(Effect::CallModel {
672 session_id,
673 op_id,
674 model,
675 messages: state
676 .message_graph
677 .get_thread_messages()
678 .into_iter()
679 .cloned()
680 .collect(),
681 system_context: state.cached_system_context.clone(),
682 tools: state.tools.clone(),
683 });
684 }
685
686 effects
687}
688
689fn handle_tool_approval_decided(
690 state: &mut AppState,
691 session_id: crate::app::domain::types::SessionId,
692 request_id: crate::app::domain::types::RequestId,
693 decision: ApprovalDecision,
694 remember: Option<ApprovalMemory>,
695) -> Vec<Effect> {
696 let mut effects = Vec::new();
697
698 let pending = match state.pending_approval.take() {
699 Some(p) if p.request_id == request_id => p,
700 other => {
701 state.pending_approval = other;
702 return effects;
703 }
704 };
705
706 let resolved_memory = if decision == ApprovalDecision::Approved {
707 match remember {
708 Some(ApprovalMemory::PendingTool) => {
709 Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
710 }
711 Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
712 Some(ApprovalMemory::BashPattern(pattern)) => {
713 Some(ApprovalMemory::BashPattern(pattern))
714 }
715 None => None,
716 }
717 } else {
718 None
719 };
720
721 effects.push(Effect::EmitEvent {
722 session_id,
723 event: SessionEvent::ApprovalDecided {
724 request_id,
725 decision,
726 remember: resolved_memory.clone(),
727 },
728 });
729
730 if decision == ApprovalDecision::Approved {
731 if let Some(ref memory) = resolved_memory {
732 match memory {
733 ApprovalMemory::Tool(name) => {
734 state.approved_tools.insert(name.clone());
735 }
736 ApprovalMemory::BashPattern(pattern) => {
737 state.approved_bash_patterns.insert(pattern.clone());
738 }
739 ApprovalMemory::PendingTool => {}
740 }
741 }
742
743 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
744 effects.push(Effect::EmitEvent {
745 session_id,
746 event: SessionEvent::Error {
747 message: "Tool approval decided without active operation".to_string(),
748 },
749 });
750 return effects;
751 };
752 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
753 &pending.tool_call.id,
754 ));
755
756 effects.push(Effect::ExecuteTool {
757 session_id,
758 op_id,
759 tool_call: pending.tool_call,
760 });
761 } else {
762 let tool_name = pending.tool_call.name.clone();
763 let error = ToolError::DeniedByUser(tool_name.clone());
764 effects.extend(fail_tool_call_without_execution(
765 state,
766 session_id,
767 pending.tool_call,
768 error,
769 format!("Tool '{tool_name}' denied by user"),
770 "denied",
771 false,
772 ));
773 }
774
775 effects.extend(process_next_queued_approval(state, session_id));
776
777 effects
778}
779
780fn process_next_queued_approval(
781 state: &mut AppState,
782 session_id: crate::app::domain::types::SessionId,
783) -> Vec<Effect> {
784 let mut effects = Vec::new();
785
786 while let Some(queued) = state.approval_queue.pop_front() {
787 let decision = get_tool_decision(state, &queued.tool_call);
788
789 match decision {
790 ToolDecision::Allow => {
791 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
792 effects.push(Effect::EmitEvent {
793 session_id,
794 event: SessionEvent::Error {
795 message: "Queued tool approval processed without active operation"
796 .to_string(),
797 },
798 });
799 state.approval_queue.push_front(queued);
800 break;
801 };
802 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
803 &queued.tool_call.id,
804 ));
805
806 effects.push(Effect::ExecuteTool {
807 session_id,
808 op_id,
809 tool_call: queued.tool_call,
810 });
811 }
812 ToolDecision::Deny => {
813 let tool_name = queued.tool_call.name.clone();
814 let error = ToolError::DeniedByPolicy(tool_name.clone());
815 effects.extend(fail_tool_call_without_execution(
816 state,
817 session_id,
818 queued.tool_call,
819 error,
820 format!("Tool '{tool_name}' denied by policy"),
821 "denied",
822 false,
823 ));
824 }
825 ToolDecision::Ask => {
826 let request_id = crate::app::domain::types::RequestId::new();
827 state.pending_approval = Some(PendingApproval {
828 request_id,
829 tool_call: queued.tool_call.clone(),
830 });
831
832 effects.push(Effect::EmitEvent {
833 session_id,
834 event: SessionEvent::ApprovalRequested {
835 request_id,
836 tool_call: queued.tool_call.clone(),
837 },
838 });
839
840 effects.push(Effect::RequestUserApproval {
841 session_id,
842 request_id,
843 tool_call: queued.tool_call,
844 });
845
846 break;
847 }
848 }
849 }
850
851 let all_tools_complete = state
852 .current_operation
853 .as_ref()
854 .is_none_or(|op| op.pending_tool_calls.is_empty());
855 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
856
857 if all_tools_complete
858 && no_pending_approvals
859 && let Some(op) = &state.current_operation
860 {
861 let op_id = op.op_id;
862 if let Some(model) = state.operation_models.get(&op_id).cloned() {
863 effects.push(Effect::CallModel {
864 session_id,
865 op_id,
866 model,
867 messages: state
868 .message_graph
869 .get_thread_messages()
870 .into_iter()
871 .cloned()
872 .collect(),
873 system_context: state.cached_system_context.clone(),
874 tools: state.tools.clone(),
875 });
876 }
877 }
878
879 effects
880}
881
882fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
883 if state.approved_tools.contains(&tool_call.name) {
884 return ToolDecision::Allow;
885 }
886
887 if tool_call.name == DISPATCH_AGENT_TOOL_NAME
888 && let Ok(params) =
889 serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
890 && let Some(config) = state.session_config.as_ref()
891 {
892 let policy = &config.tool_config.approval_policy;
893 match params.target {
894 DispatchAgentTarget::Resume { .. } => {
895 return ToolDecision::Allow;
896 }
897 DispatchAgentTarget::New { agent, .. } => {
898 let agent_id = agent
899 .as_deref()
900 .filter(|value| !value.trim().is_empty())
901 .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
902 if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
903 return ToolDecision::Allow;
904 }
905 }
906 }
907 }
908
909 if tool_call.name == BASH_TOOL_NAME
910 && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
911 tool_call.parameters.clone(),
912 )
913 && state.is_bash_pattern_approved(¶ms.command)
914 {
915 return ToolDecision::Allow;
916 }
917
918 state
919 .session_config
920 .as_ref()
921 .map_or(ToolDecision::Ask, |config| {
922 config
923 .tool_config
924 .approval_policy
925 .tool_decision(&tool_call.name)
926 })
927}
928
929fn handle_tool_execution_started(
930 state: &mut AppState,
931 session_id: crate::app::domain::types::SessionId,
932 tool_call_id: crate::app::domain::types::ToolCallId,
933 tool_name: String,
934 tool_parameters: serde_json::Value,
935) -> Vec<Effect> {
936 state.add_pending_tool_call(tool_call_id.clone());
937
938 let op_id = match state.current_operation.as_ref() {
939 Some(op) => op.op_id,
940 None => {
941 return vec![Effect::EmitEvent {
942 session_id,
943 event: SessionEvent::Error {
944 message: "Tool call started without active operation".to_string(),
945 },
946 }];
947 }
948 };
949
950 let is_direct_bash = matches!(
951 state.current_operation.as_ref().map(|op| &op.kind),
952 Some(OperationKind::DirectBash { .. })
953 );
954
955 if is_direct_bash {
956 return vec![];
957 }
958
959 let model = match state.operation_models.get(&op_id).cloned() {
960 Some(model) => model,
961 None => {
962 return vec![Effect::EmitEvent {
963 session_id,
964 event: SessionEvent::Error {
965 message: format!("Missing model for tool call on operation {op_id}"),
966 },
967 }];
968 }
969 };
970
971 vec![Effect::EmitEvent {
972 session_id,
973 event: SessionEvent::ToolCallStarted {
974 id: tool_call_id,
975 name: tool_name,
976 parameters: tool_parameters,
977 model,
978 },
979 }]
980}
981
982fn handle_tool_result(
983 state: &mut AppState,
984 session_id: crate::app::domain::types::SessionId,
985 tool_call_id: crate::app::domain::types::ToolCallId,
986 tool_name: String,
987 result: Result<ToolResult, ToolError>,
988) -> Vec<Effect> {
989 let mut effects = Vec::new();
990
991 let op = match &state.current_operation {
992 Some(op) => {
993 if state.cancelled_ops.contains(&op.op_id) {
994 tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
995 return effects;
996 }
997 op.clone()
998 }
999 None => return effects,
1000 };
1001 let op_id = op.op_id;
1002
1003 state.remove_pending_tool_call(&tool_call_id);
1004
1005 let tool_result = match result {
1006 Ok(r) => r,
1007 Err(e) => ToolResult::Error(e),
1008 };
1009
1010 let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1011
1012 if is_direct_bash {
1013 let command = match &op.kind {
1014 OperationKind::DirectBash { command } => command.clone(),
1015 _ => tool_name,
1016 };
1017
1018 let (stdout, stderr, exit_code) = match &tool_result {
1019 ToolResult::Bash(result) => (
1020 result.stdout.clone(),
1021 result.stderr.clone(),
1022 result.exit_code,
1023 ),
1024 ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1025 other => (format!("{other:?}"), String::new(), 0),
1026 };
1027
1028 let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1029 state
1030 .message_graph
1031 .update_command_execution(
1032 id.as_str(),
1033 command.clone(),
1034 stdout.clone(),
1035 stderr.clone(),
1036 exit_code,
1037 )
1038 .or_else(|| {
1039 let parent_id = state.message_graph.active_message_id.clone();
1040 let timestamp = Message::current_timestamp();
1041 Some(Message {
1042 data: MessageData::User {
1043 content: vec![UserContent::CommandExecution {
1044 command: command.clone(),
1045 stdout: stdout.clone(),
1046 stderr: stderr.clone(),
1047 exit_code,
1048 }],
1049 },
1050 timestamp,
1051 id: id.to_string(),
1052 parent_message_id: parent_id,
1053 })
1054 })
1055 });
1056
1057 state.complete_operation(op_id);
1058
1059 if let Some(message) = updated {
1060 effects.push(Effect::EmitEvent {
1061 session_id,
1062 event: SessionEvent::MessageUpdated { message },
1063 });
1064 }
1065
1066 effects.push(Effect::EmitEvent {
1067 session_id,
1068 event: SessionEvent::OperationCompleted { op_id },
1069 });
1070
1071 effects.extend(maybe_start_queued_work(state, session_id));
1072
1073 return effects;
1074 }
1075
1076 let model = match state.operation_models.get(&op_id).cloned() {
1077 Some(model) => model,
1078 None => {
1079 return vec![Effect::EmitEvent {
1080 session_id,
1081 event: SessionEvent::Error {
1082 message: format!("Missing model for tool result on operation {op_id}"),
1083 },
1084 }];
1085 }
1086 };
1087
1088 let event = match &tool_result {
1089 ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1090 id: tool_call_id.clone(),
1091 name: tool_name.clone(),
1092 error: e.to_string(),
1093 model: model.clone(),
1094 },
1095 _ => SessionEvent::ToolCallCompleted {
1096 id: tool_call_id.clone(),
1097 name: tool_name,
1098 result: tool_result.clone(),
1099 model: model.clone(),
1100 },
1101 };
1102
1103 effects.push(Effect::EmitEvent { session_id, event });
1104
1105 let parent_id = state.message_graph.active_message_id.clone();
1106 let tool_message = Message {
1107 data: MessageData::Tool {
1108 tool_use_id: tool_call_id.0.clone(),
1109 result: tool_result,
1110 },
1111 timestamp: 0,
1112 id: format!("tool_result_{}", tool_call_id.0),
1113 parent_message_id: parent_id,
1114 };
1115 state.message_graph.add_message(tool_message.clone());
1116
1117 effects.push(Effect::EmitEvent {
1118 session_id,
1119 event: SessionEvent::ToolMessageAdded {
1120 message: tool_message,
1121 },
1122 });
1123
1124 let all_tools_complete = state
1125 .current_operation
1126 .as_ref()
1127 .is_none_or(|op| op.pending_tool_calls.is_empty());
1128 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1129
1130 if all_tools_complete && no_pending_approvals {
1131 effects.push(Effect::CallModel {
1132 session_id,
1133 op_id,
1134 model,
1135 messages: state
1136 .message_graph
1137 .get_thread_messages()
1138 .into_iter()
1139 .cloned()
1140 .collect(),
1141 system_context: state.cached_system_context.clone(),
1142 tools: state.tools.clone(),
1143 });
1144 }
1145
1146 effects
1147}
1148
1149fn handle_model_response_complete(
1150 state: &mut AppState,
1151 session_id: crate::app::domain::types::SessionId,
1152 op_id: crate::app::domain::types::OpId,
1153 message_id: crate::app::domain::types::MessageId,
1154 content: Vec<AssistantContent>,
1155 timestamp: u64,
1156) -> Vec<Effect> {
1157 let mut effects = Vec::new();
1158
1159 if state.cancelled_ops.contains(&op_id) {
1160 tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1161 return effects;
1162 }
1163
1164 let tool_calls: Vec<_> = content
1165 .iter()
1166 .filter_map(|c| {
1167 if let AssistantContent::ToolCall { tool_call, .. } = c {
1168 Some(tool_call.clone())
1169 } else {
1170 None
1171 }
1172 })
1173 .collect();
1174
1175 let parent_id = state.message_graph.active_message_id.clone();
1176
1177 let message = Message {
1178 data: MessageData::Assistant {
1179 content: content.clone(),
1180 },
1181 timestamp,
1182 id: message_id.0.clone(),
1183 parent_message_id: parent_id,
1184 };
1185
1186 state.message_graph.add_message(message.clone());
1187 state.message_graph.active_message_id = Some(message_id.0.clone());
1188
1189 let model = match state.operation_models.get(&op_id).cloned() {
1190 Some(model) => model,
1191 None => {
1192 return vec![Effect::EmitEvent {
1193 session_id,
1194 event: SessionEvent::Error {
1195 message: format!("Missing model for operation {op_id}"),
1196 },
1197 }];
1198 }
1199 };
1200
1201 effects.push(Effect::EmitEvent {
1202 session_id,
1203 event: SessionEvent::AssistantMessageAdded { message, model },
1204 });
1205
1206 if tool_calls.is_empty() {
1207 state.complete_operation(op_id);
1208 effects.push(Effect::EmitEvent {
1209 session_id,
1210 event: SessionEvent::OperationCompleted { op_id },
1211 });
1212 effects.extend(maybe_start_queued_work(state, session_id));
1213 } else {
1214 for tool_call in tool_calls {
1215 let request_id = crate::app::domain::types::RequestId::new();
1216 effects.extend(handle_tool_approval_requested(
1217 state, session_id, request_id, tool_call,
1218 ));
1219 }
1220 }
1221
1222 effects
1223}
1224
1225fn handle_model_response_error(
1226 state: &mut AppState,
1227 session_id: crate::app::domain::types::SessionId,
1228 op_id: crate::app::domain::types::OpId,
1229 error: &str,
1230) -> Vec<Effect> {
1231 let mut effects = Vec::new();
1232
1233 if state.cancelled_ops.contains(&op_id) {
1234 return effects;
1235 }
1236
1237 state.complete_operation(op_id);
1238
1239 effects.push(Effect::EmitEvent {
1240 session_id,
1241 event: SessionEvent::Error {
1242 message: error.to_string(),
1243 },
1244 });
1245
1246 effects.push(Effect::EmitEvent {
1247 session_id,
1248 event: SessionEvent::OperationCompleted { op_id },
1249 });
1250
1251 effects.extend(maybe_start_queued_work(state, session_id));
1252
1253 effects
1254}
1255
1256fn handle_direct_bash(
1257 state: &mut AppState,
1258 session_id: crate::app::domain::types::SessionId,
1259 op_id: crate::app::domain::types::OpId,
1260 message_id: crate::app::domain::types::MessageId,
1261 command: String,
1262 timestamp: u64,
1263) -> Vec<Effect> {
1264 let mut effects = Vec::new();
1265
1266 if state.has_active_operation() {
1267 state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1268 command,
1269 op_id,
1270 message_id,
1271 queued_at: timestamp,
1272 });
1273 effects.push(Effect::EmitEvent {
1274 session_id,
1275 event: SessionEvent::QueueUpdated {
1276 queue: snapshot_queue(state),
1277 },
1278 });
1279 return effects;
1280 }
1281
1282 let parent_id = state.message_graph.active_message_id.clone();
1283 let message = Message {
1284 data: MessageData::User {
1285 content: vec![UserContent::CommandExecution {
1286 command: command.clone(),
1287 stdout: String::new(),
1288 stderr: String::new(),
1289 exit_code: 0,
1290 }],
1291 },
1292 timestamp,
1293 id: message_id.0.clone(),
1294 parent_message_id: parent_id,
1295 };
1296
1297 state.message_graph.add_message(message.clone());
1298 state.message_graph.active_message_id = Some(message_id.0.clone());
1299
1300 state.start_operation(
1301 op_id,
1302 OperationKind::DirectBash {
1303 command: command.clone(),
1304 },
1305 );
1306 state.operation_messages.insert(op_id, message_id);
1307
1308 effects.push(Effect::EmitEvent {
1309 session_id,
1310 event: SessionEvent::UserMessageAdded { message },
1311 });
1312
1313 effects.push(Effect::EmitEvent {
1314 session_id,
1315 event: SessionEvent::OperationStarted {
1316 op_id,
1317 kind: OperationKind::DirectBash {
1318 command: command.clone(),
1319 },
1320 },
1321 });
1322
1323 let tool_call = steer_tools::ToolCall {
1324 id: format!("direct_bash_{op_id}"),
1325 name: BASH_TOOL_NAME.to_string(),
1326 parameters: serde_json::json!({ "command": command }),
1327 };
1328
1329 effects.push(Effect::ExecuteTool {
1330 session_id,
1331 op_id,
1332 tool_call,
1333 });
1334
1335 effects
1336}
1337
1338fn handle_dequeue_queued_item(
1339 state: &mut AppState,
1340 session_id: crate::app::domain::types::SessionId,
1341) -> Result<Vec<Effect>, ReduceError> {
1342 if state.pop_next_queued_work().is_some() {
1343 Ok(vec![Effect::EmitEvent {
1344 session_id,
1345 event: SessionEvent::QueueUpdated {
1346 queue: snapshot_queue(state),
1347 },
1348 }])
1349 } else {
1350 Err(invalid_action(
1351 InvalidActionKind::QueueEmpty,
1352 "No queued item to remove.",
1353 ))
1354 }
1355}
1356
1357fn maybe_start_queued_work(
1358 state: &mut AppState,
1359 session_id: crate::app::domain::types::SessionId,
1360) -> Vec<Effect> {
1361 if state.has_active_operation() {
1362 return vec![];
1363 }
1364
1365 let Some(next) = state.pop_next_queued_work() else {
1366 return vec![];
1367 };
1368
1369 let mut effects = vec![Effect::EmitEvent {
1370 session_id,
1371 event: SessionEvent::QueueUpdated {
1372 queue: snapshot_queue(state),
1373 },
1374 }];
1375
1376 match next {
1377 QueuedWorkItem::UserMessage(item) => {
1378 effects.extend(handle_user_input(
1379 state,
1380 session_id,
1381 item.content,
1382 item.op_id,
1383 item.message_id,
1384 item.model,
1385 item.queued_at,
1386 ));
1387 }
1388 QueuedWorkItem::DirectBash(item) => {
1389 effects.extend(handle_direct_bash(
1390 state,
1391 session_id,
1392 item.op_id,
1393 item.message_id,
1394 item.command,
1395 item.queued_at,
1396 ));
1397 }
1398 }
1399
1400 effects
1401}
1402
1403fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1404 state
1405 .queued_work
1406 .iter()
1407 .map(snapshot_queued_work_item)
1408 .collect()
1409}
1410
1411fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1412 match item {
1413 QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1414 kind: Some(QueuedWorkKind::UserMessage),
1415 content: message
1416 .content
1417 .iter()
1418 .filter_map(|item| match item {
1419 UserContent::Text { text } => Some(text.as_str()),
1420 _ => None,
1421 })
1422 .collect::<String>(),
1423 queued_at: message.queued_at,
1424 model: Some(message.model.clone()),
1425 op_id: message.op_id,
1426 message_id: message.message_id.clone(),
1427 attachment_count: message
1428 .content
1429 .iter()
1430 .filter(|item| matches!(item, UserContent::Image { .. }))
1431 .count() as u32,
1432 },
1433 QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1434 kind: Some(QueuedWorkKind::DirectBash),
1435 content: command.command.clone(),
1436 queued_at: command.queued_at,
1437 model: None,
1438 op_id: command.op_id,
1439 message_id: command.message_id.clone(),
1440 attachment_count: 0,
1441 },
1442 }
1443}
1444
1445fn handle_request_compaction(
1446 state: &mut AppState,
1447 session_id: crate::app::domain::types::SessionId,
1448 op_id: crate::app::domain::types::OpId,
1449 model: crate::config::model::ModelId,
1450) -> Result<Vec<Effect>, ReduceError> {
1451 const MIN_MESSAGES_FOR_COMPACT: usize = 3;
1452 let message_count = state.message_graph.get_thread_messages().len();
1453
1454 if state.has_active_operation() {
1455 return Err(invalid_action(
1456 InvalidActionKind::OperationInFlight,
1457 "Cannot compact while an operation is active.",
1458 ));
1459 }
1460
1461 if message_count < MIN_MESSAGES_FOR_COMPACT {
1462 return Ok(vec![Effect::EmitEvent {
1463 session_id,
1464 event: SessionEvent::CompactResult {
1465 result: crate::app::domain::event::CompactResult::InsufficientMessages,
1466 },
1467 }]);
1468 }
1469
1470 state.start_operation(op_id, OperationKind::Compact);
1471 state.operation_models.insert(op_id, model.clone());
1472
1473 Ok(vec![
1474 Effect::EmitEvent {
1475 session_id,
1476 event: SessionEvent::OperationStarted {
1477 op_id,
1478 kind: OperationKind::Compact,
1479 },
1480 },
1481 Effect::RequestCompaction {
1482 session_id,
1483 op_id,
1484 model,
1485 },
1486 ])
1487}
1488
1489fn handle_cancel(
1490 state: &mut AppState,
1491 session_id: crate::app::domain::types::SessionId,
1492 target_op: Option<crate::app::domain::types::OpId>,
1493) -> Vec<Effect> {
1494 let mut effects = Vec::new();
1495
1496 let op = match &state.current_operation {
1497 Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
1498 _ => return effects,
1499 };
1500
1501 state.record_cancelled_op(op.op_id);
1502
1503 if matches!(op.kind, OperationKind::Compact) {
1504 effects.push(Effect::EmitEvent {
1505 session_id,
1506 event: SessionEvent::CompactResult {
1507 result: crate::app::domain::event::CompactResult::Cancelled,
1508 },
1509 });
1510 }
1511
1512 let pending_approval = state.pending_approval.take();
1513 let queued_approvals = std::mem::take(&mut state.approval_queue);
1514
1515 if matches!(op.kind, OperationKind::AgentLoop) {
1516 if let Some(pending) = pending_approval {
1517 let tool_name = pending.tool_call.name.clone();
1518 effects.extend(emit_tool_failure_message(
1519 state,
1520 session_id,
1521 &pending.tool_call.id,
1522 &tool_name,
1523 ToolError::Cancelled(tool_name.clone()),
1524 format!("Tool '{tool_name}' cancelled"),
1525 "cancelled",
1526 ));
1527 }
1528
1529 for queued in queued_approvals {
1530 let tool_name = queued.tool_call.name.clone();
1531 effects.extend(emit_tool_failure_message(
1532 state,
1533 session_id,
1534 &queued.tool_call.id,
1535 &tool_name,
1536 ToolError::Cancelled(tool_name.clone()),
1537 format!("Tool '{tool_name}' cancelled"),
1538 "cancelled",
1539 ));
1540 }
1541
1542 for tool_call_id in &op.pending_tool_calls {
1543 let tool_name = state
1544 .message_graph
1545 .find_tool_name_by_id(tool_call_id.as_str())
1546 .unwrap_or_else(|| tool_call_id.as_str().to_string());
1547 let event_error = if tool_name == tool_call_id.as_str() {
1548 format!("Tool call '{tool_call_id}' cancelled")
1549 } else {
1550 format!("Tool '{tool_name}' cancelled")
1551 };
1552 effects.extend(emit_tool_failure_message(
1553 state,
1554 session_id,
1555 tool_call_id.as_str(),
1556 &tool_name,
1557 ToolError::Cancelled(tool_name.clone()),
1558 event_error,
1559 "cancelled",
1560 ));
1561 }
1562 }
1563 state.active_streams.remove(&op.op_id);
1564
1565 let dequeued_item = state.pop_next_queued_work();
1566 let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);
1567
1568 effects.push(Effect::EmitEvent {
1569 session_id,
1570 event: SessionEvent::OperationCancelled {
1571 op_id: op.op_id,
1572 info: CancellationInfo {
1573 pending_tool_calls: op.pending_tool_calls.len(),
1574 popped_queued_item,
1575 },
1576 },
1577 });
1578
1579 effects.push(Effect::CancelOperation {
1580 session_id,
1581 op_id: op.op_id,
1582 });
1583
1584 state.complete_operation(op.op_id);
1585
1586 if dequeued_item.is_some() {
1587 effects.push(Effect::EmitEvent {
1588 session_id,
1589 event: SessionEvent::QueueUpdated {
1590 queue: snapshot_queue(state),
1591 },
1592 });
1593 }
1594
1595 effects
1596}
1597
1598fn handle_hydrate(
1599 state: &mut AppState,
1600 session_id: crate::app::domain::types::SessionId,
1601 events: Vec<SessionEvent>,
1602 starting_sequence: u64,
1603) -> Vec<Effect> {
1604 for event in events {
1605 apply_event_to_state(state, &event);
1606 }
1607
1608 state.event_sequence = starting_sequence;
1609
1610 emit_mcp_connect_effects(state, session_id)
1611}
1612
1613fn handle_switch_primary_agent(
1614 state: &mut AppState,
1615 session_id: crate::app::domain::types::SessionId,
1616 agent_id: String,
1617) -> Result<Vec<Effect>, ReduceError> {
1618 if state.current_operation.is_some() {
1619 return Err(invalid_action(
1620 InvalidActionKind::OperationInFlight,
1621 "Cannot switch primary agent while an operation is active.",
1622 ));
1623 }
1624
1625 let Some(base_config) = state
1626 .base_session_config
1627 .as_ref()
1628 .or(state.session_config.as_ref())
1629 else {
1630 return Err(invalid_action(
1631 InvalidActionKind::MissingSessionConfig,
1632 "Cannot switch primary agent without session config.",
1633 ));
1634 };
1635
1636 let Some(_spec) = primary_agent_spec(&agent_id) else {
1637 return Err(invalid_action(
1638 InvalidActionKind::UnknownPrimaryAgent,
1639 format!("Unknown primary agent '{agent_id}'."),
1640 ));
1641 };
1642
1643 let mut updated_config = base_config.clone();
1644 updated_config.primary_agent_id = Some(agent_id.clone());
1645 let new_config = resolve_effective_config(&updated_config);
1646 let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1647
1648 apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1649
1650 let mut effects = Vec::new();
1651 effects.push(Effect::EmitEvent {
1652 session_id,
1653 event: SessionEvent::SessionConfigUpdated {
1654 config: Box::new(new_config),
1655 primary_agent_id: agent_id,
1656 },
1657 });
1658 effects.extend(backend_effects);
1659 effects.push(Effect::ReloadToolSchemas { session_id });
1660
1661 Ok(effects)
1662}
1663
1664fn apply_session_config_state(
1665 state: &mut AppState,
1666 config: &crate::session::state::SessionConfig,
1667 primary_agent_id: Option<String>,
1668 update_base: bool,
1669) {
1670 state.apply_session_config(config, primary_agent_id, update_base);
1671}
1672
1673fn mcp_backend_diff_effects(
1674 session_id: crate::app::domain::types::SessionId,
1675 old_config: &crate::session::state::SessionConfig,
1676 new_config: &crate::session::state::SessionConfig,
1677) -> Vec<Effect> {
1678 let old_map = collect_mcp_backends(old_config);
1679 let new_map = collect_mcp_backends(new_config);
1680
1681 let mut effects = Vec::new();
1682
1683 for (server_name, (old_transport, old_filter)) in &old_map {
1684 match new_map.get(server_name) {
1685 None => {
1686 effects.push(Effect::DisconnectMcpServer {
1687 session_id,
1688 server_name: server_name.clone(),
1689 });
1690 }
1691 Some((new_transport, new_filter)) => {
1692 if new_transport != old_transport || new_filter != old_filter {
1693 effects.push(Effect::DisconnectMcpServer {
1694 session_id,
1695 server_name: server_name.clone(),
1696 });
1697 effects.push(Effect::ConnectMcpServer {
1698 session_id,
1699 config: McpServerConfig {
1700 server_name: server_name.clone(),
1701 transport: new_transport.clone(),
1702 tool_filter: new_filter.clone(),
1703 },
1704 });
1705 }
1706 }
1707 }
1708 }
1709
1710 for (server_name, (new_transport, new_filter)) in &new_map {
1711 if !old_map.contains_key(server_name) {
1712 effects.push(Effect::ConnectMcpServer {
1713 session_id,
1714 config: McpServerConfig {
1715 server_name: server_name.clone(),
1716 transport: new_transport.clone(),
1717 tool_filter: new_filter.clone(),
1718 },
1719 });
1720 }
1721 }
1722
1723 effects
1724}
1725
1726fn collect_mcp_backends(
1727 config: &crate::session::state::SessionConfig,
1728) -> std::collections::HashMap<
1729 String,
1730 (
1731 crate::tools::McpTransport,
1732 crate::session::state::ToolFilter,
1733 ),
1734> {
1735 let mut map = std::collections::HashMap::new();
1736
1737 for backend_config in &config.tool_config.backends {
1738 let BackendConfig::Mcp {
1739 server_name,
1740 transport,
1741 tool_filter,
1742 } = backend_config;
1743
1744 map.insert(
1745 server_name.clone(),
1746 (transport.clone(), tool_filter.clone()),
1747 );
1748 }
1749
1750 map
1751}
1752
1753pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1754 match event {
1755 SessionEvent::SessionCreated { config, .. } => {
1756 let primary_agent_id = config
1757 .primary_agent_id
1758 .clone()
1759 .unwrap_or_else(|| default_primary_agent_id().to_string());
1760 apply_session_config_state(state, config, Some(primary_agent_id), true);
1761 }
1762 SessionEvent::SessionConfigUpdated {
1763 config,
1764 primary_agent_id,
1765 } => {
1766 apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1767 }
1768 SessionEvent::AssistantMessageAdded { message, .. }
1769 | SessionEvent::UserMessageAdded { message }
1770 | SessionEvent::ToolMessageAdded { message } => {
1771 state.message_graph.add_message(message.clone());
1772 state.message_graph.active_message_id = Some(message.id().to_string());
1773 }
1774 SessionEvent::MessageUpdated { message } => {
1775 state.message_graph.replace_message(message.clone());
1776 }
1777 SessionEvent::ApprovalDecided {
1778 decision, remember, ..
1779 } => {
1780 if *decision == ApprovalDecision::Approved
1781 && let Some(memory) = remember
1782 {
1783 match memory {
1784 ApprovalMemory::Tool(name) => {
1785 state.approved_tools.insert(name.clone());
1786 }
1787 ApprovalMemory::BashPattern(pattern) => {
1788 state.approved_bash_patterns.insert(pattern.clone());
1789 }
1790 ApprovalMemory::PendingTool => {}
1791 }
1792 }
1793 state.pending_approval = None;
1794 }
1795 SessionEvent::OperationCompleted { op_id } => {
1796 state.complete_operation(*op_id);
1797 }
1798 SessionEvent::OperationCancelled { op_id, .. } => {
1799 state.record_cancelled_op(*op_id);
1800 state.complete_operation(*op_id);
1801 }
1802 SessionEvent::McpServerStateChanged {
1803 server_name,
1804 state: mcp_state,
1805 } => {
1806 state
1807 .mcp_servers
1808 .insert(server_name.clone(), mcp_state.clone());
1809 }
1810 SessionEvent::QueueUpdated { queue } => {
1811 let parse_content = |content: &str| {
1812 if content.trim().is_empty() {
1813 None
1814 } else {
1815 Some(vec![UserContent::Text {
1816 text: content.to_string(),
1817 }])
1818 }
1819 };
1820
1821 state.queued_work = queue
1822 .iter()
1823 .filter_map(|item| match item.kind {
1824 Some(QueuedWorkKind::UserMessage) => {
1825 let content = parse_content(item.content.as_str())?;
1826 Some(QueuedWorkItem::UserMessage(
1827 crate::app::domain::state::QueuedUserMessage {
1828 content,
1829 op_id: item.op_id,
1830 message_id: item.message_id.clone(),
1831 model: item.model.clone().unwrap_or_else(
1832 crate::config::model::builtin::claude_sonnet_4_5,
1833 ),
1834 queued_at: item.queued_at,
1835 },
1836 ))
1837 }
1838 Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1839 crate::app::domain::state::QueuedBashCommand {
1840 command: item.content.clone(),
1841 op_id: item.op_id,
1842 message_id: item.message_id.clone(),
1843 queued_at: item.queued_at,
1844 },
1845 )),
1846 None => {
1847 let content = parse_content(item.content.as_str())?;
1848 Some(QueuedWorkItem::UserMessage(
1849 crate::app::domain::state::QueuedUserMessage {
1850 content,
1851 op_id: item.op_id,
1852 message_id: item.message_id.clone(),
1853 model: item.model.clone().unwrap_or_else(
1854 crate::config::model::builtin::claude_sonnet_4_5,
1855 ),
1856 queued_at: item.queued_at,
1857 },
1858 ))
1859 }
1860 })
1861 .collect();
1862 }
1863 _ => {}
1864 }
1865
1866 state.event_sequence += 1;
1867}
1868
1869struct CompactionCompleteParams {
1870 op_id: crate::app::domain::types::OpId,
1871 compaction_id: crate::app::domain::types::CompactionId,
1872 summary_message_id: crate::app::domain::types::MessageId,
1873 summary: String,
1874 compacted_head_message_id: crate::app::domain::types::MessageId,
1875 previous_active_message_id: Option<crate::app::domain::types::MessageId>,
1876 model_name: String,
1877 timestamp: u64,
1878}
1879
1880fn handle_compaction_complete(
1881 state: &mut AppState,
1882 session_id: crate::app::domain::types::SessionId,
1883 params: CompactionCompleteParams,
1884) -> Vec<Effect> {
1885 use crate::app::conversation::{AssistantContent, Message, MessageData};
1886 use crate::app::domain::types::CompactionRecord;
1887
1888 let CompactionCompleteParams {
1889 op_id,
1890 compaction_id,
1891 summary_message_id,
1892 summary,
1893 compacted_head_message_id,
1894 previous_active_message_id,
1895 model_name,
1896 timestamp,
1897 } = params;
1898
1899 let summary_message = Message {
1900 data: MessageData::Assistant {
1901 content: vec![AssistantContent::Text {
1902 text: summary.clone(),
1903 }],
1904 },
1905 id: summary_message_id.to_string(),
1906 parent_message_id: None,
1907 timestamp,
1908 };
1909
1910 state.message_graph.add_message(summary_message.clone());
1911
1912 let record = CompactionRecord::with_timestamp(
1913 compaction_id,
1914 summary_message_id,
1915 compacted_head_message_id,
1916 previous_active_message_id,
1917 model_name,
1918 timestamp,
1919 );
1920
1921 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
1922 model
1923 } else {
1924 state.complete_operation(op_id);
1925 return vec![Effect::EmitEvent {
1926 session_id,
1927 event: SessionEvent::Error {
1928 message: format!("Missing model for compaction operation {op_id}"),
1929 },
1930 }];
1931 };
1932
1933 state.complete_operation(op_id);
1934
1935 let mut effects = vec![
1936 Effect::EmitEvent {
1937 session_id,
1938 event: SessionEvent::AssistantMessageAdded {
1939 message: summary_message,
1940 model,
1941 },
1942 },
1943 Effect::EmitEvent {
1944 session_id,
1945 event: SessionEvent::CompactResult {
1946 result: crate::app::domain::event::CompactResult::Success(summary),
1947 },
1948 },
1949 Effect::EmitEvent {
1950 session_id,
1951 event: SessionEvent::ConversationCompacted { record },
1952 },
1953 Effect::EmitEvent {
1954 session_id,
1955 event: SessionEvent::OperationCompleted { op_id },
1956 },
1957 ];
1958
1959 effects.extend(maybe_start_queued_work(state, session_id));
1960
1961 effects
1962}
1963
1964fn handle_compaction_failed(
1965 state: &mut AppState,
1966 session_id: crate::app::domain::types::SessionId,
1967 op_id: crate::app::domain::types::OpId,
1968 error: String,
1969) -> Vec<Effect> {
1970 state.complete_operation(op_id);
1971
1972 let mut effects = vec![
1973 Effect::EmitEvent {
1974 session_id,
1975 event: SessionEvent::Error { message: error },
1976 },
1977 Effect::EmitEvent {
1978 session_id,
1979 event: SessionEvent::OperationCompleted { op_id },
1980 },
1981 ];
1982
1983 effects.extend(maybe_start_queued_work(state, session_id));
1984
1985 effects
1986}
1987
1988fn emit_mcp_connect_effects(
1989 state: &AppState,
1990 session_id: crate::app::domain::types::SessionId,
1991) -> Vec<Effect> {
1992 let mut effects = Vec::new();
1993
1994 let Some(ref config) = state.session_config else {
1995 return effects;
1996 };
1997
1998 for backend_config in &config.tool_config.backends {
1999 let BackendConfig::Mcp {
2000 server_name,
2001 transport,
2002 tool_filter,
2003 } = backend_config;
2004
2005 let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
2006 matches!(
2007 s,
2008 McpServerState::Connecting | McpServerState::Connected { .. }
2009 )
2010 });
2011
2012 if !already_connected {
2013 effects.push(Effect::ConnectMcpServer {
2014 session_id,
2015 config: McpServerConfig {
2016 server_name: server_name.clone(),
2017 transport: transport.clone(),
2018 tool_filter: tool_filter.clone(),
2019 },
2020 });
2021 }
2022 }
2023
2024 effects
2025}
2026
2027#[cfg(test)]
2028mod tests {
2029 use super::*;
2030 use crate::app::domain::state::{OperationState, PendingApproval};
2031 use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId, ToolCallId};
2032 use crate::config::model::builtin;
2033 use crate::primary_agents::resolve_effective_config;
2034 use crate::session::state::{
2035 ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2036 ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2037 };
2038 use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2039 use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2040 use schemars::schema_for;
2041 use serde_json::json;
2042 use std::collections::HashSet;
2043 use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2044
2045 fn test_state() -> AppState {
2046 AppState::new(SessionId::new())
2047 }
2048
2049 fn test_schema(name: &str) -> ToolSchema {
2050 ToolSchema {
2051 name: name.to_string(),
2052 display_name: name.to_string(),
2053 description: String::new(),
2054 input_schema: InputSchema::empty_object(),
2055 }
2056 }
2057
2058 fn base_session_config() -> SessionConfig {
2059 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2060 config.primary_agent_id = Some("normal".to_string());
2061 config.policy_overrides = SessionPolicyOverrides::empty();
2062 resolve_effective_config(&config)
2063 }
2064
2065 fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2066 super::reduce(state, action).expect("reduce failed")
2067 }
2068
2069 #[test]
2070 fn test_user_input_starts_operation() {
2071 let mut state = test_state();
2072 let session_id = state.session_id;
2073 let op_id = OpId::new();
2074 let message_id = MessageId::new();
2075 let model = builtin::claude_sonnet_4_5();
2076
2077 let effects = reduce(
2078 &mut state,
2079 Action::UserInput {
2080 session_id,
2081 content: vec![UserContent::Text {
2082 text: "Hello".to_string(),
2083 }],
2084 op_id,
2085 message_id,
2086 model,
2087 timestamp: 1_234_567_890,
2088 },
2089 );
2090
2091 assert_eq!(state.message_graph.messages.len(), 1);
2092 assert!(state.current_operation.is_some());
2093 assert!(
2094 effects
2095 .iter()
2096 .any(|e| matches!(e, Effect::CallModel { .. }))
2097 );
2098 }
2099
2100 #[test]
2101 fn test_switch_primary_agent_updates_visibility() {
2102 let mut state = test_state();
2103 let session_id = state.session_id;
2104 let config = base_session_config();
2105 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2106
2107 let effects = reduce(
2108 &mut state,
2109 Action::SwitchPrimaryAgent {
2110 session_id,
2111 agent_id: "plan".to_string(),
2112 },
2113 );
2114
2115 let updated = state.session_config.as_ref().expect("config");
2116 match &updated.tool_config.visibility {
2117 ToolVisibility::Whitelist(allowed) => {
2118 assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2119 for name in READ_ONLY_TOOL_NAMES {
2120 assert!(allowed.contains(*name));
2121 }
2122 assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2123 }
2124 other => panic!("Unexpected tool visibility: {other:?}"),
2125 }
2126 assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2127 assert!(effects.iter().any(|e| matches!(
2128 e,
2129 Effect::EmitEvent {
2130 event: SessionEvent::SessionConfigUpdated { .. },
2131 ..
2132 }
2133 )));
2134 assert!(
2135 effects
2136 .iter()
2137 .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2138 );
2139 }
2140
2141 #[test]
2142 fn test_switch_primary_agent_yolo_auto_approves() {
2143 let mut state = test_state();
2144 let session_id = state.session_id;
2145 let config = base_session_config();
2146 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2147
2148 let _ = reduce(
2149 &mut state,
2150 Action::SwitchPrimaryAgent {
2151 session_id,
2152 agent_id: "yolo".to_string(),
2153 },
2154 );
2155
2156 let updated = state.session_config.as_ref().expect("config");
2157 assert_eq!(
2158 updated.tool_config.approval_policy.default_behavior,
2159 UnapprovedBehavior::Allow
2160 );
2161 }
2162
2163 #[test]
2164 fn test_switch_primary_agent_preserves_policy_overrides() {
2165 let mut state = test_state();
2166 let session_id = state.session_id;
2167
2168 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2169 config.primary_agent_id = Some("normal".to_string());
2170 config.policy_overrides = SessionPolicyOverrides {
2171 default_model: None,
2172 tool_visibility: None,
2173 approval_policy: ToolApprovalPolicyOverrides {
2174 default_behavior: Some(UnapprovedBehavior::Deny),
2175 preapproved: ApprovalRulesOverrides::empty(),
2176 },
2177 };
2178 let config = resolve_effective_config(&config);
2179 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2180
2181 let _ = reduce(
2182 &mut state,
2183 Action::SwitchPrimaryAgent {
2184 session_id,
2185 agent_id: "yolo".to_string(),
2186 },
2187 );
2188
2189 let updated = state.session_config.as_ref().expect("config");
2190 assert_eq!(
2191 updated.tool_config.approval_policy.default_behavior,
2192 UnapprovedBehavior::Deny
2193 );
2194 assert_eq!(
2195 updated.policy_overrides.approval_policy.default_behavior,
2196 Some(UnapprovedBehavior::Deny)
2197 );
2198 }
2199
2200 #[test]
2201 fn dispatch_agent_resume_is_auto_approved() {
2202 let mut state = test_state();
2203 let session_id = state.session_id;
2204 let config = base_session_config();
2205 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2206
2207 let tool_call = ToolCall {
2208 id: "tc_dispatch_resume".to_string(),
2209 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2210 parameters: json!({
2211 "prompt": "resume work",
2212 "target": {
2213 "session": "resume",
2214 "session_id": SessionId::new().to_string()
2215 }
2216 }),
2217 };
2218
2219 let decision = get_tool_decision(&state, &tool_call);
2220 assert_eq!(decision, ToolDecision::Allow);
2221 assert_eq!(state.session_id, session_id);
2222 }
2223
2224 #[test]
2225 fn test_switch_primary_agent_restores_base_prompt() {
2226 let mut state = test_state();
2227 let session_id = state.session_id;
2228 let mut config = base_session_config();
2229 config.system_prompt = Some("base prompt".to_string());
2230 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2231
2232 let _ = reduce(
2233 &mut state,
2234 Action::SwitchPrimaryAgent {
2235 session_id,
2236 agent_id: "plan".to_string(),
2237 },
2238 );
2239
2240 let _ = reduce(
2241 &mut state,
2242 Action::SwitchPrimaryAgent {
2243 session_id,
2244 agent_id: "normal".to_string(),
2245 },
2246 );
2247
2248 let updated = state.session_config.as_ref().expect("config");
2249 assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2250 }
2251
2252 #[test]
2253 fn test_switch_primary_agent_blocked_during_operation() {
2254 let mut state = test_state();
2255 let session_id = state.session_id;
2256 let config = base_session_config();
2257 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2258
2259 state.current_operation = Some(OperationState {
2260 op_id: OpId::new(),
2261 kind: OperationKind::AgentLoop,
2262 pending_tool_calls: HashSet::new(),
2263 });
2264
2265 let result = super::reduce(
2266 &mut state,
2267 Action::SwitchPrimaryAgent {
2268 session_id,
2269 agent_id: "plan".to_string(),
2270 },
2271 );
2272
2273 assert!(matches!(
2274 result,
2275 Err(ReduceError::InvalidAction {
2276 kind: InvalidActionKind::OperationInFlight,
2277 ..
2278 })
2279 ));
2280 assert!(state.primary_agent_id.as_deref() == Some("normal"));
2281 }
2282
2283 #[test]
2284 fn test_late_result_ignored_after_cancel() {
2285 let mut state = test_state();
2286 let session_id = state.session_id;
2287 let op_id = OpId::new();
2288 let tool_call_id = ToolCallId::from_string("tc_1");
2289
2290 state.current_operation = Some(OperationState {
2291 op_id,
2292 kind: OperationKind::AgentLoop,
2293 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2294 });
2295
2296 let _ = reduce(
2297 &mut state,
2298 Action::Cancel {
2299 session_id,
2300 op_id: None,
2301 },
2302 );
2303
2304 state.current_operation = Some(OperationState {
2305 op_id,
2306 kind: OperationKind::AgentLoop,
2307 pending_tool_calls: HashSet::new(),
2308 });
2309 state
2310 .operation_models
2311 .insert(op_id, builtin::claude_sonnet_4_5());
2312 state
2313 .operation_models
2314 .insert(op_id, builtin::claude_sonnet_4_5());
2315 state
2316 .operation_models
2317 .insert(op_id, builtin::claude_sonnet_4_5());
2318
2319 let effects = reduce(
2320 &mut state,
2321 Action::ToolResult {
2322 session_id,
2323 tool_call_id,
2324 tool_name: "test".to_string(),
2325 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2326 tool_name: "test".to_string(),
2327 payload: "done".to_string(),
2328 })),
2329 },
2330 );
2331
2332 assert!(effects.is_empty());
2333 }
2334
2335 #[test]
2336 fn test_pre_approved_tool_executes_immediately() {
2337 let mut state = test_state();
2338 let session_id = state.session_id;
2339 let op_id = OpId::new();
2340
2341 state.approved_tools.insert("test_tool".to_string());
2342 state.current_operation = Some(OperationState {
2343 op_id,
2344 kind: OperationKind::AgentLoop,
2345 pending_tool_calls: HashSet::new(),
2346 });
2347 state
2348 .operation_models
2349 .insert(op_id, builtin::claude_sonnet_4_5());
2350 state
2351 .operation_models
2352 .insert(op_id, builtin::claude_sonnet_4_5());
2353 state
2354 .operation_models
2355 .insert(op_id, builtin::claude_sonnet_4_5());
2356
2357 let tool_call = steer_tools::ToolCall {
2358 id: "tc_1".to_string(),
2359 name: "test_tool".to_string(),
2360 parameters: serde_json::json!({}),
2361 };
2362
2363 let effects = reduce(
2364 &mut state,
2365 Action::ToolApprovalRequested {
2366 session_id,
2367 request_id: RequestId::new(),
2368 tool_call,
2369 },
2370 );
2371
2372 assert!(
2373 effects
2374 .iter()
2375 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2376 );
2377 assert!(state.pending_approval.is_none());
2378 }
2379
2380 #[test]
2381 fn test_denied_tool_request_emits_failure_message() {
2382 let mut state = test_state();
2383 let session_id = state.session_id;
2384 let op_id = OpId::new();
2385
2386 state.current_operation = Some(OperationState {
2387 op_id,
2388 kind: OperationKind::AgentLoop,
2389 pending_tool_calls: HashSet::new(),
2390 });
2391
2392 state
2393 .operation_models
2394 .insert(op_id, builtin::claude_sonnet_4_5());
2395
2396 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2397 config.tool_config.approval_policy = ToolApprovalPolicy {
2398 default_behavior: UnapprovedBehavior::Deny,
2399 preapproved: ApprovalRules::default(),
2400 };
2401 state.session_config = Some(config);
2402
2403 let tool_call = steer_tools::ToolCall {
2404 id: "tc_1".to_string(),
2405 name: "test_tool".to_string(),
2406 parameters: serde_json::json!({}),
2407 };
2408
2409 let effects = reduce(
2410 &mut state,
2411 Action::ToolApprovalRequested {
2412 session_id,
2413 request_id: RequestId::new(),
2414 tool_call,
2415 },
2416 );
2417
2418 assert!(effects.iter().any(|e| matches!(
2419 e,
2420 Effect::EmitEvent {
2421 event: SessionEvent::ToolCallFailed { .. },
2422 ..
2423 }
2424 )));
2425 assert!(effects.iter().any(|e| matches!(
2426 e,
2427 Effect::EmitEvent {
2428 event: SessionEvent::ToolMessageAdded { .. },
2429 ..
2430 }
2431 )));
2432 assert!(
2433 !effects
2434 .iter()
2435 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2436 );
2437 assert!(
2438 !effects
2439 .iter()
2440 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2441 );
2442 assert!(state.pending_approval.is_none());
2443 assert!(state.approval_queue.is_empty());
2444 assert_eq!(state.message_graph.messages.len(), 1);
2445
2446 match &state.message_graph.messages[0].data {
2447 MessageData::Tool { result, .. } => match result {
2448 ToolResult::Error(error) => {
2449 assert!(
2450 matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2451 );
2452 }
2453 _ => panic!("expected denied tool error"),
2454 },
2455 _ => panic!("expected tool message"),
2456 }
2457 }
2458
2459 #[test]
2460 fn test_user_denied_tool_request_emits_failure_message() {
2461 let mut state = test_state();
2462 let session_id = state.session_id;
2463 let op_id = OpId::new();
2464
2465 state.current_operation = Some(OperationState {
2466 op_id,
2467 kind: OperationKind::AgentLoop,
2468 pending_tool_calls: HashSet::new(),
2469 });
2470 state
2471 .operation_models
2472 .insert(op_id, builtin::claude_sonnet_4_5());
2473
2474 let tool_call = steer_tools::ToolCall {
2475 id: "tc_1".to_string(),
2476 name: "test_tool".to_string(),
2477 parameters: serde_json::json!({}),
2478 };
2479 let request_id = RequestId::new();
2480 state.pending_approval = Some(PendingApproval {
2481 request_id,
2482 tool_call: tool_call.clone(),
2483 });
2484
2485 let effects = reduce(
2486 &mut state,
2487 Action::ToolApprovalDecided {
2488 session_id,
2489 request_id,
2490 decision: ApprovalDecision::Denied,
2491 remember: None,
2492 },
2493 );
2494
2495 assert!(effects.iter().any(|e| matches!(
2496 e,
2497 Effect::EmitEvent {
2498 event: SessionEvent::ToolCallFailed { .. },
2499 ..
2500 }
2501 )));
2502 assert!(effects.iter().any(|e| matches!(
2503 e,
2504 Effect::EmitEvent {
2505 event: SessionEvent::ToolMessageAdded { .. },
2506 ..
2507 }
2508 )));
2509 assert!(
2510 !effects
2511 .iter()
2512 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2513 );
2514 assert!(state.pending_approval.is_none());
2515 assert!(state.approval_queue.is_empty());
2516 assert_eq!(state.message_graph.messages.len(), 1);
2517
2518 match &state.message_graph.messages[0].data {
2519 MessageData::Tool { result, .. } => match result {
2520 ToolResult::Error(error) => {
2521 assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2522 }
2523 _ => panic!("expected denied tool error"),
2524 },
2525 _ => panic!("expected tool message"),
2526 }
2527 }
2528
2529 #[test]
2530 fn test_cancel_pops_queued_item_without_auto_start() {
2531 let mut state = test_state();
2532 let session_id = state.session_id;
2533 let op_id = OpId::new();
2534
2535 state.current_operation = Some(OperationState {
2536 op_id,
2537 kind: OperationKind::AgentLoop,
2538 pending_tool_calls: HashSet::new(),
2539 });
2540 state
2541 .operation_models
2542 .insert(op_id, builtin::claude_sonnet_4_5());
2543
2544 let queued_op = OpId::new();
2545 let queued_message_id = MessageId::from_string("queued_msg");
2546 let _ = reduce(
2547 &mut state,
2548 Action::UserInput {
2549 session_id,
2550 content: vec![UserContent::Text {
2551 text: "Queued message".to_string(),
2552 }],
2553 op_id: queued_op,
2554 message_id: queued_message_id.clone(),
2555 model: builtin::claude_sonnet_4_5(),
2556 timestamp: 1,
2557 },
2558 );
2559
2560 let effects = reduce(
2561 &mut state,
2562 Action::Cancel {
2563 session_id,
2564 op_id: None,
2565 },
2566 );
2567
2568 assert!(state.current_operation.is_none());
2569 assert!(state.queued_work.is_empty());
2570
2571 let cancellation_info = effects.iter().find_map(|effect| match effect {
2572 Effect::EmitEvent {
2573 event: SessionEvent::OperationCancelled { info, .. },
2574 ..
2575 } => Some(info),
2576 _ => None,
2577 });
2578 let info = cancellation_info.expect("expected OperationCancelled event");
2579 let popped = info
2580 .popped_queued_item
2581 .as_ref()
2582 .expect("expected popped queued item");
2583 assert_eq!(popped.content, "Queued message");
2584 assert_eq!(popped.op_id, queued_op);
2585 assert_eq!(popped.message_id, queued_message_id);
2586
2587 assert!(
2588 !effects.iter().any(|effect| matches!(
2589 effect,
2590 Effect::EmitEvent {
2591 event: SessionEvent::OperationStarted { .. },
2592 ..
2593 }
2594 )),
2595 "queued work should not auto-start on cancel"
2596 );
2597 }
2598
2599 #[test]
2600 fn test_cancel_injects_tool_results_for_pending_calls() {
2601 let mut state = test_state();
2602 let session_id = state.session_id;
2603 let op_id = OpId::new();
2604
2605 let tool_call = ToolCall {
2606 id: "tc_1".to_string(),
2607 name: "test_tool".to_string(),
2608 parameters: serde_json::json!({}),
2609 };
2610
2611 state.message_graph.add_message(Message {
2612 data: MessageData::Assistant {
2613 content: vec![AssistantContent::ToolCall {
2614 tool_call: tool_call.clone(),
2615 thought_signature: None,
2616 }],
2617 },
2618 timestamp: 0,
2619 id: "msg_1".to_string(),
2620 parent_message_id: None,
2621 });
2622
2623 state.current_operation = Some(OperationState {
2624 op_id,
2625 kind: OperationKind::AgentLoop,
2626 pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2627 });
2628 state
2629 .operation_models
2630 .insert(op_id, builtin::claude_sonnet_4_5());
2631
2632 let effects = reduce(
2633 &mut state,
2634 Action::Cancel {
2635 session_id,
2636 op_id: None,
2637 },
2638 );
2639
2640 assert!(effects.iter().any(|e| matches!(
2641 e,
2642 Effect::EmitEvent {
2643 event: SessionEvent::ToolMessageAdded { .. },
2644 ..
2645 }
2646 )));
2647
2648 let tool_message = state
2649 .message_graph
2650 .messages
2651 .iter()
2652 .find(|message| matches!(message.data, MessageData::Tool { .. }))
2653 .expect("tool result should be injected on cancel");
2654
2655 match &tool_message.data {
2656 MessageData::Tool { result, .. } => match result {
2657 ToolResult::Error(error) => {
2658 assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2659 }
2660 _ => panic!("expected cancelled tool error"),
2661 },
2662 _ => panic!("expected tool message"),
2663 }
2664 }
2665
2666 #[test]
2667 fn test_malformed_tool_call_auto_denies() {
2668 let mut state = test_state();
2669 let session_id = state.session_id;
2670 let op_id = OpId::new();
2671
2672 state.current_operation = Some(OperationState {
2673 op_id,
2674 kind: OperationKind::AgentLoop,
2675 pending_tool_calls: HashSet::new(),
2676 });
2677
2678 state
2679 .operation_models
2680 .insert(op_id, builtin::claude_sonnet_4_5());
2681
2682 let mut properties = serde_json::Map::new();
2683 properties.insert("command".to_string(), json!({ "type": "string" }));
2684
2685 state.tools.push(ToolSchema {
2686 name: "test_tool".to_string(),
2687 display_name: "test_tool".to_string(),
2688 description: String::new(),
2689 input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2690 });
2691
2692 let tool_call = ToolCall {
2693 id: "tc_1".to_string(),
2694 name: "test_tool".to_string(),
2695 parameters: json!({}),
2696 };
2697
2698 let effects = reduce(
2699 &mut state,
2700 Action::ToolApprovalRequested {
2701 session_id,
2702 request_id: RequestId::new(),
2703 tool_call,
2704 },
2705 );
2706
2707 assert!(effects.iter().any(|e| matches!(
2708 e,
2709 Effect::EmitEvent {
2710 event: SessionEvent::ToolCallFailed { .. },
2711 ..
2712 }
2713 )));
2714 assert!(effects.iter().any(|e| matches!(
2715 e,
2716 Effect::EmitEvent {
2717 event: SessionEvent::ToolMessageAdded { .. },
2718 ..
2719 }
2720 )));
2721 assert!(
2722 !effects
2723 .iter()
2724 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2725 );
2726 assert!(
2727 !effects
2728 .iter()
2729 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2730 );
2731 assert!(state.pending_approval.is_none());
2732 assert!(state.approval_queue.is_empty());
2733 assert_eq!(state.message_graph.messages.len(), 1);
2734
2735 match &state.message_graph.messages[0].data {
2736 MessageData::Tool { result, .. } => match result {
2737 ToolResult::Error(error) => {
2738 assert!(matches!(error, ToolError::InvalidParams { .. }));
2739 }
2740 _ => panic!("expected invalid params tool error"),
2741 },
2742 _ => panic!("expected tool message"),
2743 }
2744 }
2745
2746 #[test]
2747 fn test_approval_queuing() {
2748 let mut state = test_state();
2749 let session_id = state.session_id;
2750 let op_id = OpId::new();
2751
2752 state.current_operation = Some(OperationState {
2753 op_id,
2754 kind: OperationKind::AgentLoop,
2755 pending_tool_calls: HashSet::new(),
2756 });
2757
2758 let tool_call_1 = steer_tools::ToolCall {
2759 id: "tc_1".to_string(),
2760 name: "tool_1".to_string(),
2761 parameters: serde_json::json!({}),
2762 };
2763 let tool_call_2 = steer_tools::ToolCall {
2764 id: "tc_2".to_string(),
2765 name: "tool_2".to_string(),
2766 parameters: serde_json::json!({}),
2767 };
2768
2769 let _ = reduce(
2770 &mut state,
2771 Action::ToolApprovalRequested {
2772 session_id,
2773 request_id: RequestId::new(),
2774 tool_call: tool_call_1,
2775 },
2776 );
2777
2778 assert!(state.pending_approval.is_some());
2779
2780 let _ = reduce(
2781 &mut state,
2782 Action::ToolApprovalRequested {
2783 session_id,
2784 request_id: RequestId::new(),
2785 tool_call: tool_call_2,
2786 },
2787 );
2788
2789 assert_eq!(state.approval_queue.len(), 1);
2790 }
2791
2792 #[test]
2793 fn test_dispatch_agent_missing_target_auto_denies() {
2794 let mut state = test_state();
2795 let session_id = state.session_id;
2796 let op_id = OpId::new();
2797
2798 state.current_operation = Some(OperationState {
2799 op_id,
2800 kind: OperationKind::AgentLoop,
2801 pending_tool_calls: HashSet::new(),
2802 });
2803
2804 state
2805 .operation_models
2806 .insert(op_id, builtin::claude_sonnet_4_5());
2807
2808 let input_schema: InputSchema =
2809 schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
2810 state.tools.push(ToolSchema {
2811 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2812 display_name: "Dispatch Agent".to_string(),
2813 description: String::new(),
2814 input_schema,
2815 });
2816
2817 let tool_call = ToolCall {
2818 id: "tc_dispatch".to_string(),
2819 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2820 parameters: json!({ "prompt": "hello world" }),
2821 };
2822
2823 let effects = reduce(
2824 &mut state,
2825 Action::ToolApprovalRequested {
2826 session_id,
2827 request_id: RequestId::new(),
2828 tool_call,
2829 },
2830 );
2831
2832 assert!(effects.iter().any(|e| matches!(
2833 e,
2834 Effect::EmitEvent {
2835 event: SessionEvent::ToolCallFailed { .. },
2836 ..
2837 }
2838 )));
2839 assert!(effects.iter().any(|e| matches!(
2840 e,
2841 Effect::EmitEvent {
2842 event: SessionEvent::ToolMessageAdded { .. },
2843 ..
2844 }
2845 )));
2846 assert!(
2847 !effects
2848 .iter()
2849 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2850 );
2851 assert!(state.pending_approval.is_none());
2852 assert!(state.approval_queue.is_empty());
2853
2854 match &state.message_graph.messages[0].data {
2855 MessageData::Tool { result, .. } => match result {
2856 ToolResult::Error(error) => {
2857 assert!(matches!(error, ToolError::InvalidParams { .. }));
2858 }
2859 _ => panic!("expected invalid params tool error"),
2860 },
2861 _ => panic!("expected tool message"),
2862 }
2863 }
2864
2865 #[test]
2866 fn test_model_response_with_tool_calls_requests_approval() {
2867 let mut state = test_state();
2868 let session_id = state.session_id;
2869 let op_id = OpId::new();
2870 let message_id = MessageId::new();
2871
2872 state.current_operation = Some(OperationState {
2873 op_id,
2874 kind: OperationKind::AgentLoop,
2875 pending_tool_calls: HashSet::new(),
2876 });
2877 state
2878 .operation_models
2879 .insert(op_id, builtin::claude_sonnet_4_5());
2880
2881 let tool_call = steer_tools::ToolCall {
2882 id: "tc_1".to_string(),
2883 name: "bash".to_string(),
2884 parameters: serde_json::json!({"command": "ls"}),
2885 };
2886
2887 let content = vec![
2888 AssistantContent::Text {
2889 text: "Let me list the files.".to_string(),
2890 },
2891 AssistantContent::ToolCall {
2892 tool_call: tool_call.clone(),
2893 thought_signature: None,
2894 },
2895 ];
2896
2897 let effects = reduce(
2898 &mut state,
2899 Action::ModelResponseComplete {
2900 session_id,
2901 op_id,
2902 message_id,
2903 content,
2904 timestamp: 12345,
2905 },
2906 );
2907
2908 assert!(state.pending_approval.is_some());
2909 assert!(
2910 effects
2911 .iter()
2912 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2913 );
2914 assert!(state.current_operation.is_some());
2915 }
2916
2917 #[test]
2918 fn test_model_response_no_tools_completes_operation() {
2919 let mut state = test_state();
2920 let session_id = state.session_id;
2921 let op_id = OpId::new();
2922 let message_id = MessageId::new();
2923
2924 state.current_operation = Some(OperationState {
2925 op_id,
2926 kind: OperationKind::AgentLoop,
2927 pending_tool_calls: HashSet::new(),
2928 });
2929 state
2930 .operation_models
2931 .insert(op_id, builtin::claude_sonnet_4_5());
2932
2933 let content = vec![AssistantContent::Text {
2934 text: "Hello! How can I help?".to_string(),
2935 }];
2936
2937 let effects = reduce(
2938 &mut state,
2939 Action::ModelResponseComplete {
2940 session_id,
2941 op_id,
2942 message_id,
2943 content,
2944 timestamp: 12345,
2945 },
2946 );
2947
2948 assert!(state.current_operation.is_none());
2949 assert!(effects.iter().any(|e| matches!(
2950 e,
2951 Effect::EmitEvent {
2952 event: SessionEvent::OperationCompleted { .. },
2953 ..
2954 }
2955 )));
2956 }
2957
2958 #[test]
2959 fn test_out_of_order_completion_preserves_newer_operation() {
2960 let mut state = test_state();
2961 let session_id = state.session_id;
2962 let model = builtin::claude_sonnet_4_5();
2963
2964 let op_a = OpId::new();
2965 let op_b = OpId::new();
2966
2967 let _ = reduce(
2968 &mut state,
2969 Action::UserInput {
2970 session_id,
2971 content: vec![UserContent::Text {
2972 text: "first".to_string(),
2973 }],
2974 op_id: op_a,
2975 message_id: MessageId::new(),
2976 model: model.clone(),
2977 timestamp: 1,
2978 },
2979 );
2980
2981 let _ = reduce(
2982 &mut state,
2983 Action::UserInput {
2984 session_id,
2985 content: vec![UserContent::Text {
2986 text: "second".to_string(),
2987 }],
2988 op_id: op_b,
2989 message_id: MessageId::new(),
2990 model: model.clone(),
2991 timestamp: 2,
2992 },
2993 );
2994
2995 let _ = reduce(
2996 &mut state,
2997 Action::ModelResponseComplete {
2998 session_id,
2999 op_id: op_a,
3000 message_id: MessageId::new(),
3001 content: vec![AssistantContent::Text {
3002 text: "done A".to_string(),
3003 }],
3004 timestamp: 3,
3005 },
3006 );
3007
3008 assert!(
3009 state
3010 .current_operation
3011 .as_ref()
3012 .is_some_and(|op| op.op_id == op_b)
3013 );
3014 assert!(state.operation_models.contains_key(&op_b));
3015 assert!(!state.operation_models.contains_key(&op_a));
3016
3017 let effects = reduce(
3018 &mut state,
3019 Action::ModelResponseComplete {
3020 session_id,
3021 op_id: op_b,
3022 message_id: MessageId::new(),
3023 content: vec![AssistantContent::Text {
3024 text: "done B".to_string(),
3025 }],
3026 timestamp: 4,
3027 },
3028 );
3029
3030 assert!(effects.iter().any(|e| matches!(
3031 e,
3032 Effect::EmitEvent {
3033 event: SessionEvent::OperationCompleted { op_id },
3034 ..
3035 } if *op_id == op_b
3036 )));
3037 assert!(!effects.iter().any(|e| matches!(
3038 e,
3039 Effect::EmitEvent {
3040 event: SessionEvent::Error { message },
3041 ..
3042 } if message.contains("Missing model for operation")
3043 )));
3044 }
3045
3046 #[test]
3047 fn test_tool_approval_does_not_call_model_before_result() {
3048 let mut state = test_state();
3049 let session_id = state.session_id;
3050 let op_id = OpId::new();
3051
3052 state.current_operation = Some(OperationState {
3053 op_id,
3054 kind: OperationKind::AgentLoop,
3055 pending_tool_calls: HashSet::new(),
3056 });
3057 state
3058 .operation_models
3059 .insert(op_id, builtin::claude_sonnet_4_5());
3060
3061 let tool_call = steer_tools::ToolCall {
3062 id: "tc_1".to_string(),
3063 name: "bash".to_string(),
3064 parameters: serde_json::json!({"command": "ls"}),
3065 };
3066 let request_id = RequestId::new();
3067 state.pending_approval = Some(PendingApproval {
3068 request_id,
3069 tool_call: tool_call.clone(),
3070 });
3071
3072 let effects = reduce(
3073 &mut state,
3074 Action::ToolApprovalDecided {
3075 session_id,
3076 request_id,
3077 decision: ApprovalDecision::Approved,
3078 remember: None,
3079 },
3080 );
3081
3082 assert!(
3083 effects
3084 .iter()
3085 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3086 );
3087 assert!(
3088 !effects
3089 .iter()
3090 .any(|e| matches!(e, Effect::CallModel { .. }))
3091 );
3092 assert!(state.current_operation.as_ref().is_some_and(|op| {
3093 op.pending_tool_calls
3094 .contains(&ToolCallId::from_string("tc_1"))
3095 }));
3096 }
3097
3098 #[test]
3099 fn test_mcp_tool_visibility_and_disconnect_removal() {
3100 let mut state = test_state();
3101 let session_id = state.session_id;
3102
3103 let mut allowed = HashSet::new();
3104 allowed.insert("mcp__alpha__allowed".to_string());
3105
3106 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3107 config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3108 state.session_config = Some(config);
3109
3110 state.tools.push(test_schema("bash"));
3111
3112 let _ = reduce(
3113 &mut state,
3114 Action::McpServerStateChanged {
3115 session_id,
3116 server_name: "alpha".to_string(),
3117 state: McpServerState::Connected {
3118 tools: vec![
3119 test_schema("mcp__alpha__allowed"),
3120 test_schema("mcp__alpha__blocked"),
3121 ],
3122 },
3123 },
3124 );
3125
3126 assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3127 assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3128
3129 let _ = reduce(
3130 &mut state,
3131 Action::McpServerStateChanged {
3132 session_id,
3133 server_name: "alpha".to_string(),
3134 state: McpServerState::Disconnected { error: None },
3135 },
3136 );
3137
3138 assert!(
3139 !state
3140 .tools
3141 .iter()
3142 .any(|t| t.name.starts_with("mcp__alpha__"))
3143 );
3144 assert!(state.tools.iter().any(|t| t.name == "bash"));
3145 }
3146
3147 #[test]
3148 fn test_tool_result_continues_agent_loop() {
3149 let mut state = test_state();
3150 let session_id = state.session_id;
3151 let op_id = OpId::new();
3152 let tool_call_id = ToolCallId::from_string("tc_1");
3153
3154 state.current_operation = Some(OperationState {
3155 op_id,
3156 kind: OperationKind::AgentLoop,
3157 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3158 });
3159 state
3160 .operation_models
3161 .insert(op_id, builtin::claude_sonnet_4_5());
3162
3163 let effects = reduce(
3164 &mut state,
3165 Action::ToolResult {
3166 session_id,
3167 tool_call_id,
3168 tool_name: "bash".to_string(),
3169 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3170 tool_name: "bash".to_string(),
3171 payload: "file1.txt\nfile2.txt".to_string(),
3172 })),
3173 },
3174 );
3175
3176 assert!(
3177 effects
3178 .iter()
3179 .any(|e| matches!(e, Effect::CallModel { .. }))
3180 );
3181 }
3182
3183 #[test]
3184 fn test_tool_result_waits_for_pending_tools() {
3185 let mut state = test_state();
3186 let session_id = state.session_id;
3187 let op_id = OpId::new();
3188 let tool_call_id_1 = ToolCallId::from_string("tc_1");
3189 let tool_call_id_2 = ToolCallId::from_string("tc_2");
3190
3191 state.current_operation = Some(OperationState {
3192 op_id,
3193 kind: OperationKind::AgentLoop,
3194 pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3195 .into_iter()
3196 .collect(),
3197 });
3198 state
3199 .operation_models
3200 .insert(op_id, builtin::claude_sonnet_4_5());
3201
3202 let effects = reduce(
3203 &mut state,
3204 Action::ToolResult {
3205 session_id,
3206 tool_call_id: tool_call_id_1,
3207 tool_name: "bash".to_string(),
3208 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3209 tool_name: "bash".to_string(),
3210 payload: "done".to_string(),
3211 })),
3212 },
3213 );
3214
3215 assert!(
3216 !effects
3217 .iter()
3218 .any(|e| matches!(e, Effect::CallModel { .. }))
3219 );
3220
3221 let effects = reduce(
3222 &mut state,
3223 Action::ToolResult {
3224 session_id,
3225 tool_call_id: tool_call_id_2,
3226 tool_name: "bash".to_string(),
3227 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3228 tool_name: "bash".to_string(),
3229 payload: "done".to_string(),
3230 })),
3231 },
3232 );
3233
3234 assert!(
3235 effects
3236 .iter()
3237 .any(|e| matches!(e, Effect::CallModel { .. }))
3238 );
3239 }
3240}