1use crate::agents::default_agent_spec_id;
2use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
3use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
4use crate::app::domain::effect::{Effect, McpServerConfig};
5use crate::app::domain::event::{
6 CancellationInfo, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
7};
8use crate::app::domain::state::{
9 AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
10};
11use crate::app::domain::types::NonEmptyString;
12use crate::primary_agents::{
13 default_primary_agent_id, primary_agent_spec, resolve_effective_config,
14};
15use crate::session::state::{BackendConfig, ToolDecision};
16use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
17use serde_json::Value;
18use steer_tools::ToolError;
19use steer_tools::result::ToolResult;
20use steer_tools::tools::BASH_TOOL_NAME;
21use thiserror::Error;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum InvalidActionKind {
25 OperationInFlight,
26 MissingSessionConfig,
27 UnknownPrimaryAgent,
28 QueueEmpty,
29}
30
31#[derive(Debug, Error)]
32pub enum ReduceError {
33 #[error("{message}")]
34 InvalidAction {
35 message: String,
36 kind: InvalidActionKind,
37 },
38 #[error("Invariant violated: {message}")]
39 Invariant { message: String },
40}
41
42fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
43 ReduceError::InvalidAction {
44 message: message.into(),
45 kind,
46 }
47}
48
49pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
50 match action {
51 Action::UserInput {
52 session_id,
53 text,
54 op_id,
55 message_id,
56 model,
57 timestamp,
58 } => Ok(handle_user_input(
59 state, session_id, text, op_id, message_id, model, timestamp,
60 )),
61
62 Action::UserEditedMessage {
63 session_id,
64 message_id,
65 new_content,
66 op_id,
67 new_message_id,
68 model,
69 timestamp,
70 } => handle_user_edited_message(
71 state,
72 session_id,
73 UserEditedMessageParams {
74 original_message_id: message_id,
75 new_content,
76 op_id,
77 new_message_id,
78 model,
79 timestamp,
80 },
81 ),
82
83 Action::ToolApprovalRequested {
84 session_id,
85 request_id,
86 tool_call,
87 } => Ok(handle_tool_approval_requested(
88 state, session_id, request_id, tool_call,
89 )),
90
91 Action::ToolApprovalDecided {
92 session_id,
93 request_id,
94 decision,
95 remember,
96 } => Ok(handle_tool_approval_decided(
97 state, session_id, request_id, decision, remember,
98 )),
99
100 Action::ToolExecutionStarted {
101 session_id,
102 tool_call_id,
103 tool_name,
104 tool_parameters,
105 } => Ok(handle_tool_execution_started(
106 state,
107 session_id,
108 tool_call_id,
109 tool_name,
110 tool_parameters,
111 )),
112
113 Action::ToolResult {
114 session_id,
115 tool_call_id,
116 tool_name,
117 result,
118 } => Ok(handle_tool_result(
119 state,
120 session_id,
121 tool_call_id,
122 tool_name,
123 result,
124 )),
125
126 Action::ModelResponseComplete {
127 session_id,
128 op_id,
129 message_id,
130 content,
131 timestamp,
132 } => Ok(handle_model_response_complete(
133 state, session_id, op_id, message_id, content, timestamp,
134 )),
135
136 Action::ModelResponseError {
137 session_id,
138 op_id,
139 error,
140 } => Ok(handle_model_response_error(
141 state, session_id, op_id, &error,
142 )),
143
144 Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
145
146 Action::DirectBashCommand {
147 session_id,
148 op_id,
149 message_id,
150 command,
151 timestamp,
152 } => Ok(handle_direct_bash(
153 state, session_id, op_id, message_id, command, timestamp,
154 )),
155
156 Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
157
158 Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
159
160 Action::RequestCompaction {
161 session_id,
162 op_id,
163 model,
164 } => handle_request_compaction(state, session_id, op_id, model),
165
166 Action::Hydrate {
167 session_id,
168 events,
169 starting_sequence,
170 } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
171
172 Action::WorkspaceFilesListed { files, .. } => {
173 state.workspace_files = files;
174 Ok(vec![])
175 }
176
177 Action::ToolSchemasAvailable { tools, .. } => {
178 state.tools = tools;
179 Ok(vec![])
180 }
181
182 Action::ToolSchemasUpdated { schemas, .. } => {
183 state.tools = schemas;
184 Ok(vec![])
185 }
186
187 Action::SwitchPrimaryAgent {
188 session_id,
189 agent_id,
190 } => handle_switch_primary_agent(state, session_id, agent_id),
191
192 Action::McpServerStateChanged {
193 session_id,
194 server_name,
195 state: new_state,
196 } => {
197 if let McpServerState::Connected { tools } = &new_state {
199 let tools = state.session_config.as_ref().map_or_else(
200 || tools.clone(),
201 |config| config.filter_tools_by_visibility(tools.clone()),
202 );
203
204 for tool in tools {
206 if !state.tools.iter().any(|t| t.name == tool.name) {
207 state.tools.push(tool.clone());
208 }
209 }
210 }
211
212 if matches!(
214 &new_state,
215 McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
216 ) {
217 let prefix = format!("mcp__{server_name}__");
218 state.tools.retain(|t| !t.name.starts_with(&prefix));
219 }
220
221 state
222 .mcp_servers
223 .insert(server_name.clone(), new_state.clone());
224 Ok(vec![Effect::EmitEvent {
225 session_id,
226 event: SessionEvent::McpServerStateChanged {
227 server_name,
228 state: new_state,
229 },
230 }])
231 }
232
233 Action::CompactionComplete {
234 session_id,
235 op_id,
236 compaction_id,
237 summary_message_id,
238 summary,
239 compacted_head_message_id,
240 previous_active_message_id,
241 model,
242 timestamp,
243 } => Ok(handle_compaction_complete(
244 state,
245 session_id,
246 CompactionCompleteParams {
247 op_id,
248 compaction_id,
249 summary_message_id,
250 summary,
251 compacted_head_message_id,
252 previous_active_message_id,
253 model_name: model,
254 timestamp,
255 },
256 )),
257
258 Action::CompactionFailed {
259 session_id,
260 op_id,
261 error,
262 } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
263
264 Action::Shutdown => Ok(vec![]),
265 }
266}
267
268fn handle_user_input(
269 state: &mut AppState,
270 session_id: crate::app::domain::types::SessionId,
271 text: crate::app::domain::types::NonEmptyString,
272 op_id: crate::app::domain::types::OpId,
273 message_id: crate::app::domain::types::MessageId,
274 model: crate::config::model::ModelId,
275 timestamp: u64,
276) -> Vec<Effect> {
277 let mut effects = Vec::new();
278
279 if state.has_active_operation() {
280 state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
281 text,
282 op_id,
283 message_id,
284 model,
285 queued_at: timestamp,
286 });
287 effects.push(Effect::EmitEvent {
288 session_id,
289 event: SessionEvent::QueueUpdated {
290 queue: snapshot_queue(state),
291 },
292 });
293 return effects;
294 }
295
296 let parent_id = state.message_graph.active_message_id.clone();
297
298 let message = Message {
299 data: MessageData::User {
300 content: vec![UserContent::Text {
301 text: text.as_str().to_string(),
302 }],
303 },
304 timestamp,
305 id: message_id.0.clone(),
306 parent_message_id: parent_id,
307 };
308
309 state.message_graph.add_message(message.clone());
310 state.message_graph.active_message_id = Some(message_id.0.clone());
311
312 state.start_operation(op_id, OperationKind::AgentLoop);
313 state.operation_models.insert(op_id, model.clone());
314
315 effects.push(Effect::EmitEvent {
316 session_id,
317 event: SessionEvent::UserMessageAdded {
318 message: message.clone(),
319 },
320 });
321
322 effects.push(Effect::EmitEvent {
323 session_id,
324 event: SessionEvent::OperationStarted {
325 op_id,
326 kind: OperationKind::AgentLoop,
327 },
328 });
329
330 effects.push(Effect::CallModel {
331 session_id,
332 op_id,
333 model,
334 messages: state
335 .message_graph
336 .get_thread_messages()
337 .into_iter()
338 .cloned()
339 .collect(),
340 system_context: state.cached_system_context.clone(),
341 tools: state.tools.clone(),
342 });
343
344 effects
345}
346
347struct UserEditedMessageParams {
348 original_message_id: crate::app::domain::types::MessageId,
349 new_content: String,
350 op_id: crate::app::domain::types::OpId,
351 new_message_id: crate::app::domain::types::MessageId,
352 model: crate::config::model::ModelId,
353 timestamp: u64,
354}
355
356fn handle_user_edited_message(
357 state: &mut AppState,
358 session_id: crate::app::domain::types::SessionId,
359 params: UserEditedMessageParams,
360) -> Result<Vec<Effect>, ReduceError> {
361 let UserEditedMessageParams {
362 original_message_id,
363 new_content,
364 op_id,
365 new_message_id,
366 model,
367 timestamp,
368 } = params;
369 let mut effects = Vec::new();
370
371 if state.has_active_operation() {
372 return Err(invalid_action(
373 InvalidActionKind::OperationInFlight,
374 "Cannot edit message while an operation is active.",
375 ));
376 }
377
378 let parent_id = state
379 .message_graph
380 .messages
381 .iter()
382 .find(|m| m.id() == original_message_id.0)
383 .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
384
385 let message = Message {
386 data: MessageData::User {
387 content: vec![UserContent::Text { text: new_content }],
388 },
389 timestamp,
390 id: new_message_id.0.clone(),
391 parent_message_id: parent_id,
392 };
393
394 state.message_graph.add_message(message.clone());
395 state.message_graph.active_message_id = Some(new_message_id.0.clone());
396
397 state.start_operation(op_id, OperationKind::AgentLoop);
398 state.operation_models.insert(op_id, model.clone());
399
400 effects.push(Effect::EmitEvent {
401 session_id,
402 event: SessionEvent::UserMessageAdded {
403 message: message.clone(),
404 },
405 });
406
407 effects.push(Effect::EmitEvent {
408 session_id,
409 event: SessionEvent::OperationStarted {
410 op_id,
411 kind: OperationKind::AgentLoop,
412 },
413 });
414
415 effects.push(Effect::CallModel {
416 session_id,
417 op_id,
418 model,
419 messages: state
420 .message_graph
421 .get_thread_messages()
422 .into_iter()
423 .cloned()
424 .collect(),
425 system_context: state.cached_system_context.clone(),
426 tools: state.tools.clone(),
427 });
428
429 Ok(effects)
430}
431
432fn handle_tool_approval_requested(
433 state: &mut AppState,
434 session_id: crate::app::domain::types::SessionId,
435 request_id: crate::app::domain::types::RequestId,
436 tool_call: steer_tools::ToolCall,
437) -> Vec<Effect> {
438 let mut effects = Vec::new();
439
440 if let Err(error) = validate_tool_call(state, &tool_call) {
441 let error_message = error.to_string();
442 return fail_tool_call_without_execution(
443 state,
444 session_id,
445 tool_call,
446 error,
447 error_message,
448 "invalid",
449 true,
450 );
451 }
452
453 let decision = get_tool_decision(state, &tool_call);
454
455 match decision {
456 ToolDecision::Allow => {
457 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
458 return vec![Effect::EmitEvent {
459 session_id,
460 event: SessionEvent::Error {
461 message: "Tool approval requested without active operation".to_string(),
462 },
463 }];
464 };
465 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
466 &tool_call.id,
467 ));
468
469 effects.push(Effect::ExecuteTool {
470 session_id,
471 op_id,
472 tool_call,
473 });
474 }
475 ToolDecision::Deny => {
476 let error = ToolError::DeniedByPolicy(tool_call.name.clone());
477 let tool_name = tool_call.name.clone();
478 effects.extend(fail_tool_call_without_execution(
479 state,
480 session_id,
481 tool_call,
482 error,
483 format!("Tool '{tool_name}' denied by policy"),
484 "denied",
485 true,
486 ));
487 }
488 ToolDecision::Ask => {
489 if state.pending_approval.is_some() {
490 state.approval_queue.push_back(QueuedApproval { tool_call });
491 return effects;
492 }
493
494 state.pending_approval = Some(PendingApproval {
495 request_id,
496 tool_call: tool_call.clone(),
497 });
498
499 effects.push(Effect::EmitEvent {
500 session_id,
501 event: SessionEvent::ApprovalRequested {
502 request_id,
503 tool_call: tool_call.clone(),
504 },
505 });
506
507 effects.push(Effect::RequestUserApproval {
508 session_id,
509 request_id,
510 tool_call,
511 });
512 }
513 }
514
515 effects
516}
517
518fn validate_tool_call(
519 state: &AppState,
520 tool_call: &steer_tools::ToolCall,
521) -> Result<(), ToolError> {
522 if tool_call.name.trim().is_empty() {
523 return Err(ToolError::invalid_params(
524 "unknown",
525 "Malformed tool call: missing tool name",
526 ));
527 }
528
529 if tool_call.id.trim().is_empty() {
530 return Err(ToolError::invalid_params(
531 tool_call.name.clone(),
532 "Malformed tool call: missing tool call id",
533 ));
534 }
535
536 if state.tools.is_empty() {
537 return Ok(());
538 }
539
540 let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
541 return Ok(());
542 };
543
544 validate_against_json_schema(
545 &tool_call.name,
546 schema.input_schema.as_value(),
547 &tool_call.parameters,
548 )
549}
550
551fn validate_against_json_schema(
552 tool_name: &str,
553 schema: &Value,
554 params: &Value,
555) -> Result<(), ToolError> {
556 let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
557 ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
558 })?;
559
560 if let Err(errors) = validator.validate(params) {
561 let message = errors
562 .into_iter()
563 .map(|error| error.to_string())
564 .next()
565 .unwrap_or_else(|| "Parameters do not match schema".to_string());
566 return Err(ToolError::invalid_params(tool_name.to_string(), message));
567 }
568
569 Ok(())
570}
571
572fn emit_tool_failure_message(
573 state: &mut AppState,
574 session_id: crate::app::domain::types::SessionId,
575 tool_call_id: &str,
576 tool_name: &str,
577 tool_error: ToolError,
578 event_error: String,
579 message_id_prefix: &str,
580) -> Vec<Effect> {
581 let mut effects = Vec::new();
582
583 let tool_result = ToolResult::Error(tool_error);
584 let parent_id = state.message_graph.active_message_id.clone();
585 let tool_message = Message {
586 data: MessageData::Tool {
587 tool_use_id: tool_call_id.to_string(),
588 result: tool_result,
589 },
590 timestamp: 0,
591 id: format!("{message_id_prefix}_{tool_call_id}"),
592 parent_message_id: parent_id,
593 };
594 state.message_graph.add_message(tool_message.clone());
595
596 if let Some(model) = state
597 .current_operation
598 .as_ref()
599 .and_then(|op| state.operation_models.get(&op.op_id).cloned())
600 {
601 effects.push(Effect::EmitEvent {
602 session_id,
603 event: SessionEvent::ToolCallFailed {
604 id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
605 name: tool_name.to_string(),
606 error: event_error,
607 model,
608 },
609 });
610 }
611
612 effects.push(Effect::EmitEvent {
613 session_id,
614 event: SessionEvent::ToolMessageAdded {
615 message: tool_message,
616 },
617 });
618
619 effects
620}
621
622fn fail_tool_call_without_execution(
623 state: &mut AppState,
624 session_id: crate::app::domain::types::SessionId,
625 tool_call: steer_tools::ToolCall,
626 tool_error: ToolError,
627 event_error: String,
628 message_id_prefix: &str,
629 call_model_if_ready: bool,
630) -> Vec<Effect> {
631 let mut effects = emit_tool_failure_message(
632 state,
633 session_id,
634 &tool_call.id,
635 &tool_call.name,
636 tool_error,
637 event_error,
638 message_id_prefix,
639 );
640
641 if !call_model_if_ready {
642 return effects;
643 }
644
645 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
646 effects.push(Effect::EmitEvent {
647 session_id,
648 event: SessionEvent::Error {
649 message: "Tool failure recorded without active operation".to_string(),
650 },
651 });
652 return effects;
653 };
654 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
655 model
656 } else {
657 effects.push(Effect::EmitEvent {
658 session_id,
659 event: SessionEvent::Error {
660 message: format!("Missing model for operation {op_id}"),
661 },
662 });
663 return effects;
664 };
665
666 let all_tools_complete = state
667 .current_operation
668 .as_ref()
669 .is_none_or(|op| op.pending_tool_calls.is_empty());
670 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
671
672 if all_tools_complete && no_pending_approvals {
673 effects.push(Effect::CallModel {
674 session_id,
675 op_id,
676 model,
677 messages: state
678 .message_graph
679 .get_thread_messages()
680 .into_iter()
681 .cloned()
682 .collect(),
683 system_context: state.cached_system_context.clone(),
684 tools: state.tools.clone(),
685 });
686 }
687
688 effects
689}
690
691fn handle_tool_approval_decided(
692 state: &mut AppState,
693 session_id: crate::app::domain::types::SessionId,
694 request_id: crate::app::domain::types::RequestId,
695 decision: ApprovalDecision,
696 remember: Option<ApprovalMemory>,
697) -> Vec<Effect> {
698 let mut effects = Vec::new();
699
700 let pending = match state.pending_approval.take() {
701 Some(p) if p.request_id == request_id => p,
702 other => {
703 state.pending_approval = other;
704 return effects;
705 }
706 };
707
708 let resolved_memory = if decision == ApprovalDecision::Approved {
709 match remember {
710 Some(ApprovalMemory::PendingTool) => {
711 Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
712 }
713 Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
714 Some(ApprovalMemory::BashPattern(pattern)) => {
715 Some(ApprovalMemory::BashPattern(pattern))
716 }
717 None => None,
718 }
719 } else {
720 None
721 };
722
723 effects.push(Effect::EmitEvent {
724 session_id,
725 event: SessionEvent::ApprovalDecided {
726 request_id,
727 decision,
728 remember: resolved_memory.clone(),
729 },
730 });
731
732 if decision == ApprovalDecision::Approved {
733 if let Some(ref memory) = resolved_memory {
734 match memory {
735 ApprovalMemory::Tool(name) => {
736 state.approved_tools.insert(name.clone());
737 }
738 ApprovalMemory::BashPattern(pattern) => {
739 state.approved_bash_patterns.insert(pattern.clone());
740 }
741 ApprovalMemory::PendingTool => {}
742 }
743 }
744
745 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
746 effects.push(Effect::EmitEvent {
747 session_id,
748 event: SessionEvent::Error {
749 message: "Tool approval decided without active operation".to_string(),
750 },
751 });
752 return effects;
753 };
754 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
755 &pending.tool_call.id,
756 ));
757
758 effects.push(Effect::ExecuteTool {
759 session_id,
760 op_id,
761 tool_call: pending.tool_call,
762 });
763 } else {
764 let tool_name = pending.tool_call.name.clone();
765 let error = ToolError::DeniedByUser(tool_name.clone());
766 effects.extend(fail_tool_call_without_execution(
767 state,
768 session_id,
769 pending.tool_call,
770 error,
771 format!("Tool '{tool_name}' denied by user"),
772 "denied",
773 false,
774 ));
775 }
776
777 effects.extend(process_next_queued_approval(state, session_id));
778
779 effects
780}
781
782fn process_next_queued_approval(
783 state: &mut AppState,
784 session_id: crate::app::domain::types::SessionId,
785) -> Vec<Effect> {
786 let mut effects = Vec::new();
787
788 while let Some(queued) = state.approval_queue.pop_front() {
789 let decision = get_tool_decision(state, &queued.tool_call);
790
791 match decision {
792 ToolDecision::Allow => {
793 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
794 effects.push(Effect::EmitEvent {
795 session_id,
796 event: SessionEvent::Error {
797 message: "Queued tool approval processed without active operation"
798 .to_string(),
799 },
800 });
801 state.approval_queue.push_front(queued);
802 break;
803 };
804 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
805 &queued.tool_call.id,
806 ));
807
808 effects.push(Effect::ExecuteTool {
809 session_id,
810 op_id,
811 tool_call: queued.tool_call,
812 });
813 }
814 ToolDecision::Deny => {
815 let tool_name = queued.tool_call.name.clone();
816 let error = ToolError::DeniedByPolicy(tool_name.clone());
817 effects.extend(fail_tool_call_without_execution(
818 state,
819 session_id,
820 queued.tool_call,
821 error,
822 format!("Tool '{tool_name}' denied by policy"),
823 "denied",
824 false,
825 ));
826 }
827 ToolDecision::Ask => {
828 let request_id = crate::app::domain::types::RequestId::new();
829 state.pending_approval = Some(PendingApproval {
830 request_id,
831 tool_call: queued.tool_call.clone(),
832 });
833
834 effects.push(Effect::EmitEvent {
835 session_id,
836 event: SessionEvent::ApprovalRequested {
837 request_id,
838 tool_call: queued.tool_call.clone(),
839 },
840 });
841
842 effects.push(Effect::RequestUserApproval {
843 session_id,
844 request_id,
845 tool_call: queued.tool_call,
846 });
847
848 break;
849 }
850 }
851 }
852
853 let all_tools_complete = state
854 .current_operation
855 .as_ref()
856 .is_none_or(|op| op.pending_tool_calls.is_empty());
857 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
858
859 if all_tools_complete
860 && no_pending_approvals
861 && let Some(op) = &state.current_operation
862 {
863 let op_id = op.op_id;
864 if let Some(model) = state.operation_models.get(&op_id).cloned() {
865 effects.push(Effect::CallModel {
866 session_id,
867 op_id,
868 model,
869 messages: state
870 .message_graph
871 .get_thread_messages()
872 .into_iter()
873 .cloned()
874 .collect(),
875 system_context: state.cached_system_context.clone(),
876 tools: state.tools.clone(),
877 });
878 }
879 }
880
881 effects
882}
883
884fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
885 if state.approved_tools.contains(&tool_call.name) {
886 return ToolDecision::Allow;
887 }
888
889 if tool_call.name == DISPATCH_AGENT_TOOL_NAME
890 && let Ok(params) =
891 serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
892 && let Some(config) = state.session_config.as_ref()
893 {
894 let policy = &config.tool_config.approval_policy;
895 match params.target {
896 DispatchAgentTarget::Resume { .. } => {
897 return ToolDecision::Allow;
898 }
899 DispatchAgentTarget::New { agent, .. } => {
900 let agent_id = agent
901 .as_deref()
902 .filter(|value| !value.trim().is_empty())
903 .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
904 if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
905 return ToolDecision::Allow;
906 }
907 }
908 }
909 }
910
911 if tool_call.name == BASH_TOOL_NAME
912 && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
913 tool_call.parameters.clone(),
914 )
915 && state.is_bash_pattern_approved(¶ms.command)
916 {
917 return ToolDecision::Allow;
918 }
919
920 state
921 .session_config
922 .as_ref()
923 .map_or(ToolDecision::Ask, |config| {
924 config
925 .tool_config
926 .approval_policy
927 .tool_decision(&tool_call.name)
928 })
929}
930
931fn handle_tool_execution_started(
932 state: &mut AppState,
933 session_id: crate::app::domain::types::SessionId,
934 tool_call_id: crate::app::domain::types::ToolCallId,
935 tool_name: String,
936 tool_parameters: serde_json::Value,
937) -> Vec<Effect> {
938 state.add_pending_tool_call(tool_call_id.clone());
939
940 let op_id = match state.current_operation.as_ref() {
941 Some(op) => op.op_id,
942 None => {
943 return vec![Effect::EmitEvent {
944 session_id,
945 event: SessionEvent::Error {
946 message: "Tool call started without active operation".to_string(),
947 },
948 }];
949 }
950 };
951
952 let is_direct_bash = matches!(
953 state.current_operation.as_ref().map(|op| &op.kind),
954 Some(OperationKind::DirectBash { .. })
955 );
956
957 if is_direct_bash {
958 return vec![];
959 }
960
961 let model = match state.operation_models.get(&op_id).cloned() {
962 Some(model) => model,
963 None => {
964 return vec![Effect::EmitEvent {
965 session_id,
966 event: SessionEvent::Error {
967 message: format!("Missing model for tool call on operation {op_id}"),
968 },
969 }];
970 }
971 };
972
973 vec![Effect::EmitEvent {
974 session_id,
975 event: SessionEvent::ToolCallStarted {
976 id: tool_call_id,
977 name: tool_name,
978 parameters: tool_parameters,
979 model,
980 },
981 }]
982}
983
984fn handle_tool_result(
985 state: &mut AppState,
986 session_id: crate::app::domain::types::SessionId,
987 tool_call_id: crate::app::domain::types::ToolCallId,
988 tool_name: String,
989 result: Result<ToolResult, ToolError>,
990) -> Vec<Effect> {
991 let mut effects = Vec::new();
992
993 let op = match &state.current_operation {
994 Some(op) => {
995 if state.cancelled_ops.contains(&op.op_id) {
996 tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
997 return effects;
998 }
999 op.clone()
1000 }
1001 None => return effects,
1002 };
1003 let op_id = op.op_id;
1004
1005 state.remove_pending_tool_call(&tool_call_id);
1006
1007 let tool_result = match result {
1008 Ok(r) => r,
1009 Err(e) => ToolResult::Error(e),
1010 };
1011
1012 let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1013
1014 if is_direct_bash {
1015 let command = match &op.kind {
1016 OperationKind::DirectBash { command } => command.clone(),
1017 _ => tool_name,
1018 };
1019
1020 let (stdout, stderr, exit_code) = match &tool_result {
1021 ToolResult::Bash(result) => (
1022 result.stdout.clone(),
1023 result.stderr.clone(),
1024 result.exit_code,
1025 ),
1026 ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1027 other => (format!("{other:?}"), String::new(), 0),
1028 };
1029
1030 let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1031 state
1032 .message_graph
1033 .update_command_execution(
1034 id.as_str(),
1035 command.clone(),
1036 stdout.clone(),
1037 stderr.clone(),
1038 exit_code,
1039 )
1040 .or_else(|| {
1041 let parent_id = state.message_graph.active_message_id.clone();
1042 let timestamp = Message::current_timestamp();
1043 Some(Message {
1044 data: MessageData::User {
1045 content: vec![UserContent::CommandExecution {
1046 command: command.clone(),
1047 stdout: stdout.clone(),
1048 stderr: stderr.clone(),
1049 exit_code,
1050 }],
1051 },
1052 timestamp,
1053 id: id.to_string(),
1054 parent_message_id: parent_id,
1055 })
1056 })
1057 });
1058
1059 state.complete_operation(op_id);
1060
1061 if let Some(message) = updated {
1062 effects.push(Effect::EmitEvent {
1063 session_id,
1064 event: SessionEvent::MessageUpdated { message },
1065 });
1066 }
1067
1068 effects.push(Effect::EmitEvent {
1069 session_id,
1070 event: SessionEvent::OperationCompleted { op_id },
1071 });
1072
1073 effects.extend(maybe_start_queued_work(state, session_id));
1074
1075 return effects;
1076 }
1077
1078 let model = match state.operation_models.get(&op_id).cloned() {
1079 Some(model) => model,
1080 None => {
1081 return vec![Effect::EmitEvent {
1082 session_id,
1083 event: SessionEvent::Error {
1084 message: format!("Missing model for tool result on operation {op_id}"),
1085 },
1086 }];
1087 }
1088 };
1089
1090 let event = match &tool_result {
1091 ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1092 id: tool_call_id.clone(),
1093 name: tool_name.clone(),
1094 error: e.to_string(),
1095 model: model.clone(),
1096 },
1097 _ => SessionEvent::ToolCallCompleted {
1098 id: tool_call_id.clone(),
1099 name: tool_name,
1100 result: tool_result.clone(),
1101 model: model.clone(),
1102 },
1103 };
1104
1105 effects.push(Effect::EmitEvent { session_id, event });
1106
1107 let parent_id = state.message_graph.active_message_id.clone();
1108 let tool_message = Message {
1109 data: MessageData::Tool {
1110 tool_use_id: tool_call_id.0.clone(),
1111 result: tool_result,
1112 },
1113 timestamp: 0,
1114 id: format!("tool_result_{}", tool_call_id.0),
1115 parent_message_id: parent_id,
1116 };
1117 state.message_graph.add_message(tool_message.clone());
1118
1119 effects.push(Effect::EmitEvent {
1120 session_id,
1121 event: SessionEvent::ToolMessageAdded {
1122 message: tool_message,
1123 },
1124 });
1125
1126 let all_tools_complete = state
1127 .current_operation
1128 .as_ref()
1129 .is_none_or(|op| op.pending_tool_calls.is_empty());
1130 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1131
1132 if all_tools_complete && no_pending_approvals {
1133 effects.push(Effect::CallModel {
1134 session_id,
1135 op_id,
1136 model,
1137 messages: state
1138 .message_graph
1139 .get_thread_messages()
1140 .into_iter()
1141 .cloned()
1142 .collect(),
1143 system_context: state.cached_system_context.clone(),
1144 tools: state.tools.clone(),
1145 });
1146 }
1147
1148 effects
1149}
1150
1151fn handle_model_response_complete(
1152 state: &mut AppState,
1153 session_id: crate::app::domain::types::SessionId,
1154 op_id: crate::app::domain::types::OpId,
1155 message_id: crate::app::domain::types::MessageId,
1156 content: Vec<AssistantContent>,
1157 timestamp: u64,
1158) -> Vec<Effect> {
1159 let mut effects = Vec::new();
1160
1161 if state.cancelled_ops.contains(&op_id) {
1162 tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1163 return effects;
1164 }
1165
1166 let tool_calls: Vec<_> = content
1167 .iter()
1168 .filter_map(|c| {
1169 if let AssistantContent::ToolCall { tool_call, .. } = c {
1170 Some(tool_call.clone())
1171 } else {
1172 None
1173 }
1174 })
1175 .collect();
1176
1177 let parent_id = state.message_graph.active_message_id.clone();
1178
1179 let message = Message {
1180 data: MessageData::Assistant {
1181 content: content.clone(),
1182 },
1183 timestamp,
1184 id: message_id.0.clone(),
1185 parent_message_id: parent_id,
1186 };
1187
1188 state.message_graph.add_message(message.clone());
1189 state.message_graph.active_message_id = Some(message_id.0.clone());
1190
1191 let model = match state.operation_models.get(&op_id).cloned() {
1192 Some(model) => model,
1193 None => {
1194 return vec![Effect::EmitEvent {
1195 session_id,
1196 event: SessionEvent::Error {
1197 message: format!("Missing model for operation {op_id}"),
1198 },
1199 }];
1200 }
1201 };
1202
1203 effects.push(Effect::EmitEvent {
1204 session_id,
1205 event: SessionEvent::AssistantMessageAdded { message, model },
1206 });
1207
1208 if tool_calls.is_empty() {
1209 state.complete_operation(op_id);
1210 effects.push(Effect::EmitEvent {
1211 session_id,
1212 event: SessionEvent::OperationCompleted { op_id },
1213 });
1214 effects.extend(maybe_start_queued_work(state, session_id));
1215 } else {
1216 for tool_call in tool_calls {
1217 let request_id = crate::app::domain::types::RequestId::new();
1218 effects.extend(handle_tool_approval_requested(
1219 state, session_id, request_id, tool_call,
1220 ));
1221 }
1222 }
1223
1224 effects
1225}
1226
1227fn handle_model_response_error(
1228 state: &mut AppState,
1229 session_id: crate::app::domain::types::SessionId,
1230 op_id: crate::app::domain::types::OpId,
1231 error: &str,
1232) -> Vec<Effect> {
1233 let mut effects = Vec::new();
1234
1235 if state.cancelled_ops.contains(&op_id) {
1236 return effects;
1237 }
1238
1239 state.complete_operation(op_id);
1240
1241 effects.push(Effect::EmitEvent {
1242 session_id,
1243 event: SessionEvent::Error {
1244 message: error.to_string(),
1245 },
1246 });
1247
1248 effects.push(Effect::EmitEvent {
1249 session_id,
1250 event: SessionEvent::OperationCompleted { op_id },
1251 });
1252
1253 effects.extend(maybe_start_queued_work(state, session_id));
1254
1255 effects
1256}
1257
1258fn handle_direct_bash(
1259 state: &mut AppState,
1260 session_id: crate::app::domain::types::SessionId,
1261 op_id: crate::app::domain::types::OpId,
1262 message_id: crate::app::domain::types::MessageId,
1263 command: String,
1264 timestamp: u64,
1265) -> Vec<Effect> {
1266 let mut effects = Vec::new();
1267
1268 if state.has_active_operation() {
1269 state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1270 command,
1271 op_id,
1272 message_id,
1273 queued_at: timestamp,
1274 });
1275 effects.push(Effect::EmitEvent {
1276 session_id,
1277 event: SessionEvent::QueueUpdated {
1278 queue: snapshot_queue(state),
1279 },
1280 });
1281 return effects;
1282 }
1283
1284 let parent_id = state.message_graph.active_message_id.clone();
1285 let message = Message {
1286 data: MessageData::User {
1287 content: vec![UserContent::CommandExecution {
1288 command: command.clone(),
1289 stdout: String::new(),
1290 stderr: String::new(),
1291 exit_code: 0,
1292 }],
1293 },
1294 timestamp,
1295 id: message_id.0.clone(),
1296 parent_message_id: parent_id,
1297 };
1298
1299 state.message_graph.add_message(message.clone());
1300 state.message_graph.active_message_id = Some(message_id.0.clone());
1301
1302 state.start_operation(
1303 op_id,
1304 OperationKind::DirectBash {
1305 command: command.clone(),
1306 },
1307 );
1308 state.operation_messages.insert(op_id, message_id);
1309
1310 effects.push(Effect::EmitEvent {
1311 session_id,
1312 event: SessionEvent::UserMessageAdded { message },
1313 });
1314
1315 effects.push(Effect::EmitEvent {
1316 session_id,
1317 event: SessionEvent::OperationStarted {
1318 op_id,
1319 kind: OperationKind::DirectBash {
1320 command: command.clone(),
1321 },
1322 },
1323 });
1324
1325 let tool_call = steer_tools::ToolCall {
1326 id: format!("direct_bash_{op_id}"),
1327 name: BASH_TOOL_NAME.to_string(),
1328 parameters: serde_json::json!({ "command": command }),
1329 };
1330
1331 effects.push(Effect::ExecuteTool {
1332 session_id,
1333 op_id,
1334 tool_call,
1335 });
1336
1337 effects
1338}
1339
1340fn handle_dequeue_queued_item(
1341 state: &mut AppState,
1342 session_id: crate::app::domain::types::SessionId,
1343) -> Result<Vec<Effect>, ReduceError> {
1344 if state.pop_next_queued_work().is_some() {
1345 Ok(vec![Effect::EmitEvent {
1346 session_id,
1347 event: SessionEvent::QueueUpdated {
1348 queue: snapshot_queue(state),
1349 },
1350 }])
1351 } else {
1352 Err(invalid_action(
1353 InvalidActionKind::QueueEmpty,
1354 "No queued item to remove.",
1355 ))
1356 }
1357}
1358
1359fn maybe_start_queued_work(
1360 state: &mut AppState,
1361 session_id: crate::app::domain::types::SessionId,
1362) -> Vec<Effect> {
1363 if state.has_active_operation() {
1364 return vec![];
1365 }
1366
1367 let Some(next) = state.pop_next_queued_work() else {
1368 return vec![];
1369 };
1370
1371 let mut effects = vec![Effect::EmitEvent {
1372 session_id,
1373 event: SessionEvent::QueueUpdated {
1374 queue: snapshot_queue(state),
1375 },
1376 }];
1377
1378 match next {
1379 QueuedWorkItem::UserMessage(item) => {
1380 effects.extend(handle_user_input(
1381 state,
1382 session_id,
1383 item.text,
1384 item.op_id,
1385 item.message_id,
1386 item.model,
1387 item.queued_at,
1388 ));
1389 }
1390 QueuedWorkItem::DirectBash(item) => {
1391 effects.extend(handle_direct_bash(
1392 state,
1393 session_id,
1394 item.op_id,
1395 item.message_id,
1396 item.command,
1397 item.queued_at,
1398 ));
1399 }
1400 }
1401
1402 effects
1403}
1404
1405fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1406 state
1407 .queued_work
1408 .iter()
1409 .map(snapshot_queued_work_item)
1410 .collect()
1411}
1412
1413fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1414 match item {
1415 QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1416 kind: Some(QueuedWorkKind::UserMessage),
1417 content: message.text.as_str().to_string(),
1418 queued_at: message.queued_at,
1419 model: Some(message.model.clone()),
1420 op_id: message.op_id,
1421 message_id: message.message_id.clone(),
1422 },
1423 QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1424 kind: Some(QueuedWorkKind::DirectBash),
1425 content: command.command.clone(),
1426 queued_at: command.queued_at,
1427 model: None,
1428 op_id: command.op_id,
1429 message_id: command.message_id.clone(),
1430 },
1431 }
1432}
1433
1434fn handle_request_compaction(
1435 state: &mut AppState,
1436 session_id: crate::app::domain::types::SessionId,
1437 op_id: crate::app::domain::types::OpId,
1438 model: crate::config::model::ModelId,
1439) -> Result<Vec<Effect>, ReduceError> {
1440 const MIN_MESSAGES_FOR_COMPACT: usize = 3;
1441 let message_count = state.message_graph.get_thread_messages().len();
1442
1443 if state.has_active_operation() {
1444 return Err(invalid_action(
1445 InvalidActionKind::OperationInFlight,
1446 "Cannot compact while an operation is active.",
1447 ));
1448 }
1449
1450 if message_count < MIN_MESSAGES_FOR_COMPACT {
1451 return Ok(vec![Effect::EmitEvent {
1452 session_id,
1453 event: SessionEvent::CompactResult {
1454 result: crate::app::domain::event::CompactResult::InsufficientMessages,
1455 },
1456 }]);
1457 }
1458
1459 state.start_operation(op_id, OperationKind::Compact);
1460 state.operation_models.insert(op_id, model.clone());
1461
1462 Ok(vec![
1463 Effect::EmitEvent {
1464 session_id,
1465 event: SessionEvent::OperationStarted {
1466 op_id,
1467 kind: OperationKind::Compact,
1468 },
1469 },
1470 Effect::RequestCompaction {
1471 session_id,
1472 op_id,
1473 model,
1474 },
1475 ])
1476}
1477
1478fn handle_cancel(
1479 state: &mut AppState,
1480 session_id: crate::app::domain::types::SessionId,
1481 target_op: Option<crate::app::domain::types::OpId>,
1482) -> Vec<Effect> {
1483 let mut effects = Vec::new();
1484
1485 let op = match &state.current_operation {
1486 Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
1487 _ => return effects,
1488 };
1489
1490 state.record_cancelled_op(op.op_id);
1491
1492 if matches!(op.kind, OperationKind::Compact) {
1493 effects.push(Effect::EmitEvent {
1494 session_id,
1495 event: SessionEvent::CompactResult {
1496 result: crate::app::domain::event::CompactResult::Cancelled,
1497 },
1498 });
1499 }
1500
1501 let pending_approval = state.pending_approval.take();
1502 let queued_approvals = std::mem::take(&mut state.approval_queue);
1503
1504 if matches!(op.kind, OperationKind::AgentLoop) {
1505 if let Some(pending) = pending_approval {
1506 let tool_name = pending.tool_call.name.clone();
1507 effects.extend(emit_tool_failure_message(
1508 state,
1509 session_id,
1510 &pending.tool_call.id,
1511 &tool_name,
1512 ToolError::Cancelled(tool_name.clone()),
1513 format!("Tool '{tool_name}' cancelled"),
1514 "cancelled",
1515 ));
1516 }
1517
1518 for queued in queued_approvals {
1519 let tool_name = queued.tool_call.name.clone();
1520 effects.extend(emit_tool_failure_message(
1521 state,
1522 session_id,
1523 &queued.tool_call.id,
1524 &tool_name,
1525 ToolError::Cancelled(tool_name.clone()),
1526 format!("Tool '{tool_name}' cancelled"),
1527 "cancelled",
1528 ));
1529 }
1530
1531 for tool_call_id in &op.pending_tool_calls {
1532 let tool_name = state
1533 .message_graph
1534 .find_tool_name_by_id(tool_call_id.as_str())
1535 .unwrap_or_else(|| tool_call_id.as_str().to_string());
1536 let event_error = if tool_name == tool_call_id.as_str() {
1537 format!("Tool call '{tool_call_id}' cancelled")
1538 } else {
1539 format!("Tool '{tool_name}' cancelled")
1540 };
1541 effects.extend(emit_tool_failure_message(
1542 state,
1543 session_id,
1544 tool_call_id.as_str(),
1545 &tool_name,
1546 ToolError::Cancelled(tool_name.clone()),
1547 event_error,
1548 "cancelled",
1549 ));
1550 }
1551 }
1552 state.active_streams.remove(&op.op_id);
1553
1554 let dequeued_item = state.pop_next_queued_work();
1555 let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);
1556
1557 effects.push(Effect::EmitEvent {
1558 session_id,
1559 event: SessionEvent::OperationCancelled {
1560 op_id: op.op_id,
1561 info: CancellationInfo {
1562 pending_tool_calls: op.pending_tool_calls.len(),
1563 popped_queued_item,
1564 },
1565 },
1566 });
1567
1568 effects.push(Effect::CancelOperation {
1569 session_id,
1570 op_id: op.op_id,
1571 });
1572
1573 state.complete_operation(op.op_id);
1574
1575 if dequeued_item.is_some() {
1576 effects.push(Effect::EmitEvent {
1577 session_id,
1578 event: SessionEvent::QueueUpdated {
1579 queue: snapshot_queue(state),
1580 },
1581 });
1582 }
1583
1584 effects
1585}
1586
1587fn handle_hydrate(
1588 state: &mut AppState,
1589 session_id: crate::app::domain::types::SessionId,
1590 events: Vec<SessionEvent>,
1591 starting_sequence: u64,
1592) -> Vec<Effect> {
1593 for event in events {
1594 apply_event_to_state(state, &event);
1595 }
1596
1597 state.event_sequence = starting_sequence;
1598
1599 emit_mcp_connect_effects(state, session_id)
1600}
1601
1602fn handle_switch_primary_agent(
1603 state: &mut AppState,
1604 session_id: crate::app::domain::types::SessionId,
1605 agent_id: String,
1606) -> Result<Vec<Effect>, ReduceError> {
1607 if state.current_operation.is_some() {
1608 return Err(invalid_action(
1609 InvalidActionKind::OperationInFlight,
1610 "Cannot switch primary agent while an operation is active.",
1611 ));
1612 }
1613
1614 let Some(base_config) = state
1615 .base_session_config
1616 .as_ref()
1617 .or(state.session_config.as_ref())
1618 else {
1619 return Err(invalid_action(
1620 InvalidActionKind::MissingSessionConfig,
1621 "Cannot switch primary agent without session config.",
1622 ));
1623 };
1624
1625 let Some(_spec) = primary_agent_spec(&agent_id) else {
1626 return Err(invalid_action(
1627 InvalidActionKind::UnknownPrimaryAgent,
1628 format!("Unknown primary agent '{agent_id}'."),
1629 ));
1630 };
1631
1632 let mut updated_config = base_config.clone();
1633 updated_config.primary_agent_id = Some(agent_id.clone());
1634 let new_config = resolve_effective_config(&updated_config);
1635 let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1636
1637 apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1638
1639 let mut effects = Vec::new();
1640 effects.push(Effect::EmitEvent {
1641 session_id,
1642 event: SessionEvent::SessionConfigUpdated {
1643 config: Box::new(new_config),
1644 primary_agent_id: agent_id,
1645 },
1646 });
1647 effects.extend(backend_effects);
1648 effects.push(Effect::ReloadToolSchemas { session_id });
1649
1650 Ok(effects)
1651}
1652
1653fn apply_session_config_state(
1654 state: &mut AppState,
1655 config: &crate::session::state::SessionConfig,
1656 primary_agent_id: Option<String>,
1657 update_base: bool,
1658) {
1659 state.apply_session_config(config, primary_agent_id, update_base);
1660}
1661
1662fn mcp_backend_diff_effects(
1663 session_id: crate::app::domain::types::SessionId,
1664 old_config: &crate::session::state::SessionConfig,
1665 new_config: &crate::session::state::SessionConfig,
1666) -> Vec<Effect> {
1667 let old_map = collect_mcp_backends(old_config);
1668 let new_map = collect_mcp_backends(new_config);
1669
1670 let mut effects = Vec::new();
1671
1672 for (server_name, (old_transport, old_filter)) in &old_map {
1673 match new_map.get(server_name) {
1674 None => {
1675 effects.push(Effect::DisconnectMcpServer {
1676 session_id,
1677 server_name: server_name.clone(),
1678 });
1679 }
1680 Some((new_transport, new_filter)) => {
1681 if new_transport != old_transport || new_filter != old_filter {
1682 effects.push(Effect::DisconnectMcpServer {
1683 session_id,
1684 server_name: server_name.clone(),
1685 });
1686 effects.push(Effect::ConnectMcpServer {
1687 session_id,
1688 config: McpServerConfig {
1689 server_name: server_name.clone(),
1690 transport: new_transport.clone(),
1691 tool_filter: new_filter.clone(),
1692 },
1693 });
1694 }
1695 }
1696 }
1697 }
1698
1699 for (server_name, (new_transport, new_filter)) in &new_map {
1700 if !old_map.contains_key(server_name) {
1701 effects.push(Effect::ConnectMcpServer {
1702 session_id,
1703 config: McpServerConfig {
1704 server_name: server_name.clone(),
1705 transport: new_transport.clone(),
1706 tool_filter: new_filter.clone(),
1707 },
1708 });
1709 }
1710 }
1711
1712 effects
1713}
1714
1715fn collect_mcp_backends(
1716 config: &crate::session::state::SessionConfig,
1717) -> std::collections::HashMap<
1718 String,
1719 (
1720 crate::tools::McpTransport,
1721 crate::session::state::ToolFilter,
1722 ),
1723> {
1724 let mut map = std::collections::HashMap::new();
1725
1726 for backend_config in &config.tool_config.backends {
1727 let BackendConfig::Mcp {
1728 server_name,
1729 transport,
1730 tool_filter,
1731 } = backend_config;
1732
1733 map.insert(
1734 server_name.clone(),
1735 (transport.clone(), tool_filter.clone()),
1736 );
1737 }
1738
1739 map
1740}
1741
1742pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1743 match event {
1744 SessionEvent::SessionCreated { config, .. } => {
1745 let primary_agent_id = config
1746 .primary_agent_id
1747 .clone()
1748 .unwrap_or_else(|| default_primary_agent_id().to_string());
1749 apply_session_config_state(state, config, Some(primary_agent_id), true);
1750 }
1751 SessionEvent::SessionConfigUpdated {
1752 config,
1753 primary_agent_id,
1754 } => {
1755 apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1756 }
1757 SessionEvent::AssistantMessageAdded { message, .. }
1758 | SessionEvent::UserMessageAdded { message }
1759 | SessionEvent::ToolMessageAdded { message } => {
1760 state.message_graph.add_message(message.clone());
1761 state.message_graph.active_message_id = Some(message.id().to_string());
1762 }
1763 SessionEvent::MessageUpdated { message } => {
1764 state.message_graph.replace_message(message.clone());
1765 }
1766 SessionEvent::ApprovalDecided {
1767 decision, remember, ..
1768 } => {
1769 if *decision == ApprovalDecision::Approved
1770 && let Some(memory) = remember
1771 {
1772 match memory {
1773 ApprovalMemory::Tool(name) => {
1774 state.approved_tools.insert(name.clone());
1775 }
1776 ApprovalMemory::BashPattern(pattern) => {
1777 state.approved_bash_patterns.insert(pattern.clone());
1778 }
1779 ApprovalMemory::PendingTool => {}
1780 }
1781 }
1782 state.pending_approval = None;
1783 }
1784 SessionEvent::OperationCompleted { op_id } => {
1785 state.complete_operation(*op_id);
1786 }
1787 SessionEvent::OperationCancelled { op_id, .. } => {
1788 state.record_cancelled_op(*op_id);
1789 state.complete_operation(*op_id);
1790 }
1791 SessionEvent::McpServerStateChanged {
1792 server_name,
1793 state: mcp_state,
1794 } => {
1795 state
1796 .mcp_servers
1797 .insert(server_name.clone(), mcp_state.clone());
1798 }
1799 SessionEvent::QueueUpdated { queue } => {
1800 let normalize_text = |content: &str| {
1801 NonEmptyString::new(content.to_string())
1802 .or_else(|| NonEmptyString::new("(empty)".to_string()))
1803 };
1804
1805 state.queued_work = queue
1806 .iter()
1807 .filter_map(|item| match item.kind {
1808 Some(QueuedWorkKind::UserMessage) => {
1809 let text = normalize_text(item.content.as_str())?;
1810 Some(QueuedWorkItem::UserMessage(
1811 crate::app::domain::state::QueuedUserMessage {
1812 text,
1813 op_id: item.op_id,
1814 message_id: item.message_id.clone(),
1815 model: item.model.clone().unwrap_or_else(
1816 crate::config::model::builtin::claude_sonnet_4_5,
1817 ),
1818 queued_at: item.queued_at,
1819 },
1820 ))
1821 }
1822 Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1823 crate::app::domain::state::QueuedBashCommand {
1824 command: item.content.clone(),
1825 op_id: item.op_id,
1826 message_id: item.message_id.clone(),
1827 queued_at: item.queued_at,
1828 },
1829 )),
1830 None => {
1831 let text = normalize_text(item.content.as_str())?;
1832 Some(QueuedWorkItem::UserMessage(
1833 crate::app::domain::state::QueuedUserMessage {
1834 text,
1835 op_id: item.op_id,
1836 message_id: item.message_id.clone(),
1837 model: item.model.clone().unwrap_or_else(
1838 crate::config::model::builtin::claude_sonnet_4_5,
1839 ),
1840 queued_at: item.queued_at,
1841 },
1842 ))
1843 }
1844 })
1845 .collect();
1846 }
1847 _ => {}
1848 }
1849
1850 state.event_sequence += 1;
1851}
1852
1853struct CompactionCompleteParams {
1854 op_id: crate::app::domain::types::OpId,
1855 compaction_id: crate::app::domain::types::CompactionId,
1856 summary_message_id: crate::app::domain::types::MessageId,
1857 summary: String,
1858 compacted_head_message_id: crate::app::domain::types::MessageId,
1859 previous_active_message_id: Option<crate::app::domain::types::MessageId>,
1860 model_name: String,
1861 timestamp: u64,
1862}
1863
1864fn handle_compaction_complete(
1865 state: &mut AppState,
1866 session_id: crate::app::domain::types::SessionId,
1867 params: CompactionCompleteParams,
1868) -> Vec<Effect> {
1869 use crate::app::conversation::{AssistantContent, Message, MessageData};
1870 use crate::app::domain::types::CompactionRecord;
1871
1872 let CompactionCompleteParams {
1873 op_id,
1874 compaction_id,
1875 summary_message_id,
1876 summary,
1877 compacted_head_message_id,
1878 previous_active_message_id,
1879 model_name,
1880 timestamp,
1881 } = params;
1882
1883 let summary_message = Message {
1884 data: MessageData::Assistant {
1885 content: vec![AssistantContent::Text {
1886 text: summary.clone(),
1887 }],
1888 },
1889 id: summary_message_id.to_string(),
1890 parent_message_id: None,
1891 timestamp,
1892 };
1893
1894 state.message_graph.add_message(summary_message.clone());
1895
1896 let record = CompactionRecord::with_timestamp(
1897 compaction_id,
1898 summary_message_id,
1899 compacted_head_message_id,
1900 previous_active_message_id,
1901 model_name,
1902 timestamp,
1903 );
1904
1905 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
1906 model
1907 } else {
1908 state.complete_operation(op_id);
1909 return vec![Effect::EmitEvent {
1910 session_id,
1911 event: SessionEvent::Error {
1912 message: format!("Missing model for compaction operation {op_id}"),
1913 },
1914 }];
1915 };
1916
1917 state.complete_operation(op_id);
1918
1919 let mut effects = vec![
1920 Effect::EmitEvent {
1921 session_id,
1922 event: SessionEvent::AssistantMessageAdded {
1923 message: summary_message,
1924 model,
1925 },
1926 },
1927 Effect::EmitEvent {
1928 session_id,
1929 event: SessionEvent::CompactResult {
1930 result: crate::app::domain::event::CompactResult::Success(summary),
1931 },
1932 },
1933 Effect::EmitEvent {
1934 session_id,
1935 event: SessionEvent::ConversationCompacted { record },
1936 },
1937 Effect::EmitEvent {
1938 session_id,
1939 event: SessionEvent::OperationCompleted { op_id },
1940 },
1941 ];
1942
1943 effects.extend(maybe_start_queued_work(state, session_id));
1944
1945 effects
1946}
1947
1948fn handle_compaction_failed(
1949 state: &mut AppState,
1950 session_id: crate::app::domain::types::SessionId,
1951 op_id: crate::app::domain::types::OpId,
1952 error: String,
1953) -> Vec<Effect> {
1954 state.complete_operation(op_id);
1955
1956 let mut effects = vec![
1957 Effect::EmitEvent {
1958 session_id,
1959 event: SessionEvent::Error { message: error },
1960 },
1961 Effect::EmitEvent {
1962 session_id,
1963 event: SessionEvent::OperationCompleted { op_id },
1964 },
1965 ];
1966
1967 effects.extend(maybe_start_queued_work(state, session_id));
1968
1969 effects
1970}
1971
1972fn emit_mcp_connect_effects(
1973 state: &AppState,
1974 session_id: crate::app::domain::types::SessionId,
1975) -> Vec<Effect> {
1976 let mut effects = Vec::new();
1977
1978 let Some(ref config) = state.session_config else {
1979 return effects;
1980 };
1981
1982 for backend_config in &config.tool_config.backends {
1983 let BackendConfig::Mcp {
1984 server_name,
1985 transport,
1986 tool_filter,
1987 } = backend_config;
1988
1989 let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
1990 matches!(
1991 s,
1992 McpServerState::Connecting | McpServerState::Connected { .. }
1993 )
1994 });
1995
1996 if !already_connected {
1997 effects.push(Effect::ConnectMcpServer {
1998 session_id,
1999 config: McpServerConfig {
2000 server_name: server_name.clone(),
2001 transport: transport.clone(),
2002 tool_filter: tool_filter.clone(),
2003 },
2004 });
2005 }
2006 }
2007
2008 effects
2009}
2010
2011#[cfg(test)]
2012mod tests {
2013 use super::*;
2014 use crate::app::domain::state::{OperationState, PendingApproval};
2015 use crate::app::domain::types::{
2016 MessageId, NonEmptyString, OpId, RequestId, SessionId, ToolCallId,
2017 };
2018 use crate::config::model::builtin;
2019 use crate::primary_agents::resolve_effective_config;
2020 use crate::session::state::{
2021 ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2022 ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2023 };
2024 use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2025 use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2026 use schemars::schema_for;
2027 use serde_json::json;
2028 use std::collections::HashSet;
2029 use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2030
2031 fn test_state() -> AppState {
2032 AppState::new(SessionId::new())
2033 }
2034
2035 fn test_schema(name: &str) -> ToolSchema {
2036 ToolSchema {
2037 name: name.to_string(),
2038 display_name: name.to_string(),
2039 description: String::new(),
2040 input_schema: InputSchema::empty_object(),
2041 }
2042 }
2043
2044 fn base_session_config() -> SessionConfig {
2045 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2046 config.primary_agent_id = Some("normal".to_string());
2047 config.policy_overrides = SessionPolicyOverrides::empty();
2048 resolve_effective_config(&config)
2049 }
2050
2051 fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2052 super::reduce(state, action).expect("reduce failed")
2053 }
2054
2055 #[test]
2056 fn test_user_input_starts_operation() {
2057 let mut state = test_state();
2058 let session_id = state.session_id;
2059 let op_id = OpId::new();
2060 let message_id = MessageId::new();
2061 let model = builtin::claude_sonnet_4_5();
2062
2063 let effects = reduce(
2064 &mut state,
2065 Action::UserInput {
2066 session_id,
2067 text: NonEmptyString::new("Hello").unwrap(),
2068 op_id,
2069 message_id,
2070 model,
2071 timestamp: 1_234_567_890,
2072 },
2073 );
2074
2075 assert_eq!(state.message_graph.messages.len(), 1);
2076 assert!(state.current_operation.is_some());
2077 assert!(
2078 effects
2079 .iter()
2080 .any(|e| matches!(e, Effect::CallModel { .. }))
2081 );
2082 }
2083
2084 #[test]
2085 fn test_switch_primary_agent_updates_visibility() {
2086 let mut state = test_state();
2087 let session_id = state.session_id;
2088 let config = base_session_config();
2089 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2090
2091 let effects = reduce(
2092 &mut state,
2093 Action::SwitchPrimaryAgent {
2094 session_id,
2095 agent_id: "plan".to_string(),
2096 },
2097 );
2098
2099 let updated = state.session_config.as_ref().expect("config");
2100 match &updated.tool_config.visibility {
2101 ToolVisibility::Whitelist(allowed) => {
2102 assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2103 for name in READ_ONLY_TOOL_NAMES {
2104 assert!(allowed.contains(*name));
2105 }
2106 assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2107 }
2108 other => panic!("Unexpected tool visibility: {other:?}"),
2109 }
2110 assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2111 assert!(effects.iter().any(|e| matches!(
2112 e,
2113 Effect::EmitEvent {
2114 event: SessionEvent::SessionConfigUpdated { .. },
2115 ..
2116 }
2117 )));
2118 assert!(
2119 effects
2120 .iter()
2121 .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2122 );
2123 }
2124
2125 #[test]
2126 fn test_switch_primary_agent_yolo_auto_approves() {
2127 let mut state = test_state();
2128 let session_id = state.session_id;
2129 let config = base_session_config();
2130 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2131
2132 let _ = reduce(
2133 &mut state,
2134 Action::SwitchPrimaryAgent {
2135 session_id,
2136 agent_id: "yolo".to_string(),
2137 },
2138 );
2139
2140 let updated = state.session_config.as_ref().expect("config");
2141 assert_eq!(
2142 updated.tool_config.approval_policy.default_behavior,
2143 UnapprovedBehavior::Allow
2144 );
2145 }
2146
2147 #[test]
2148 fn test_switch_primary_agent_preserves_policy_overrides() {
2149 let mut state = test_state();
2150 let session_id = state.session_id;
2151
2152 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2153 config.primary_agent_id = Some("normal".to_string());
2154 config.policy_overrides = SessionPolicyOverrides {
2155 default_model: None,
2156 tool_visibility: None,
2157 approval_policy: ToolApprovalPolicyOverrides {
2158 default_behavior: Some(UnapprovedBehavior::Deny),
2159 preapproved: ApprovalRulesOverrides::empty(),
2160 },
2161 };
2162 let config = resolve_effective_config(&config);
2163 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2164
2165 let _ = reduce(
2166 &mut state,
2167 Action::SwitchPrimaryAgent {
2168 session_id,
2169 agent_id: "yolo".to_string(),
2170 },
2171 );
2172
2173 let updated = state.session_config.as_ref().expect("config");
2174 assert_eq!(
2175 updated.tool_config.approval_policy.default_behavior,
2176 UnapprovedBehavior::Deny
2177 );
2178 assert_eq!(
2179 updated.policy_overrides.approval_policy.default_behavior,
2180 Some(UnapprovedBehavior::Deny)
2181 );
2182 }
2183
2184 #[test]
2185 fn dispatch_agent_resume_is_auto_approved() {
2186 let mut state = test_state();
2187 let session_id = state.session_id;
2188 let config = base_session_config();
2189 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2190
2191 let tool_call = ToolCall {
2192 id: "tc_dispatch_resume".to_string(),
2193 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2194 parameters: json!({
2195 "prompt": "resume work",
2196 "target": {
2197 "session": "resume",
2198 "session_id": SessionId::new().to_string()
2199 }
2200 }),
2201 };
2202
2203 let decision = get_tool_decision(&state, &tool_call);
2204 assert_eq!(decision, ToolDecision::Allow);
2205 assert_eq!(state.session_id, session_id);
2206 }
2207
2208 #[test]
2209 fn test_switch_primary_agent_restores_base_prompt() {
2210 let mut state = test_state();
2211 let session_id = state.session_id;
2212 let mut config = base_session_config();
2213 config.system_prompt = Some("base prompt".to_string());
2214 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2215
2216 let _ = reduce(
2217 &mut state,
2218 Action::SwitchPrimaryAgent {
2219 session_id,
2220 agent_id: "plan".to_string(),
2221 },
2222 );
2223
2224 let _ = reduce(
2225 &mut state,
2226 Action::SwitchPrimaryAgent {
2227 session_id,
2228 agent_id: "normal".to_string(),
2229 },
2230 );
2231
2232 let updated = state.session_config.as_ref().expect("config");
2233 assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2234 }
2235
2236 #[test]
2237 fn test_switch_primary_agent_blocked_during_operation() {
2238 let mut state = test_state();
2239 let session_id = state.session_id;
2240 let config = base_session_config();
2241 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2242
2243 state.current_operation = Some(OperationState {
2244 op_id: OpId::new(),
2245 kind: OperationKind::AgentLoop,
2246 pending_tool_calls: HashSet::new(),
2247 });
2248
2249 let result = super::reduce(
2250 &mut state,
2251 Action::SwitchPrimaryAgent {
2252 session_id,
2253 agent_id: "plan".to_string(),
2254 },
2255 );
2256
2257 assert!(matches!(
2258 result,
2259 Err(ReduceError::InvalidAction {
2260 kind: InvalidActionKind::OperationInFlight,
2261 ..
2262 })
2263 ));
2264 assert!(state.primary_agent_id.as_deref() == Some("normal"));
2265 }
2266
2267 #[test]
2268 fn test_late_result_ignored_after_cancel() {
2269 let mut state = test_state();
2270 let session_id = state.session_id;
2271 let op_id = OpId::new();
2272 let tool_call_id = ToolCallId::from_string("tc_1");
2273
2274 state.current_operation = Some(OperationState {
2275 op_id,
2276 kind: OperationKind::AgentLoop,
2277 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2278 });
2279
2280 let _ = reduce(
2281 &mut state,
2282 Action::Cancel {
2283 session_id,
2284 op_id: None,
2285 },
2286 );
2287
2288 state.current_operation = Some(OperationState {
2289 op_id,
2290 kind: OperationKind::AgentLoop,
2291 pending_tool_calls: HashSet::new(),
2292 });
2293 state
2294 .operation_models
2295 .insert(op_id, builtin::claude_sonnet_4_5());
2296 state
2297 .operation_models
2298 .insert(op_id, builtin::claude_sonnet_4_5());
2299 state
2300 .operation_models
2301 .insert(op_id, builtin::claude_sonnet_4_5());
2302
2303 let effects = reduce(
2304 &mut state,
2305 Action::ToolResult {
2306 session_id,
2307 tool_call_id,
2308 tool_name: "test".to_string(),
2309 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2310 tool_name: "test".to_string(),
2311 payload: "done".to_string(),
2312 })),
2313 },
2314 );
2315
2316 assert!(effects.is_empty());
2317 }
2318
2319 #[test]
2320 fn test_pre_approved_tool_executes_immediately() {
2321 let mut state = test_state();
2322 let session_id = state.session_id;
2323 let op_id = OpId::new();
2324
2325 state.approved_tools.insert("test_tool".to_string());
2326 state.current_operation = Some(OperationState {
2327 op_id,
2328 kind: OperationKind::AgentLoop,
2329 pending_tool_calls: HashSet::new(),
2330 });
2331 state
2332 .operation_models
2333 .insert(op_id, builtin::claude_sonnet_4_5());
2334 state
2335 .operation_models
2336 .insert(op_id, builtin::claude_sonnet_4_5());
2337 state
2338 .operation_models
2339 .insert(op_id, builtin::claude_sonnet_4_5());
2340
2341 let tool_call = steer_tools::ToolCall {
2342 id: "tc_1".to_string(),
2343 name: "test_tool".to_string(),
2344 parameters: serde_json::json!({}),
2345 };
2346
2347 let effects = reduce(
2348 &mut state,
2349 Action::ToolApprovalRequested {
2350 session_id,
2351 request_id: RequestId::new(),
2352 tool_call,
2353 },
2354 );
2355
2356 assert!(
2357 effects
2358 .iter()
2359 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2360 );
2361 assert!(state.pending_approval.is_none());
2362 }
2363
2364 #[test]
2365 fn test_denied_tool_request_emits_failure_message() {
2366 let mut state = test_state();
2367 let session_id = state.session_id;
2368 let op_id = OpId::new();
2369
2370 state.current_operation = Some(OperationState {
2371 op_id,
2372 kind: OperationKind::AgentLoop,
2373 pending_tool_calls: HashSet::new(),
2374 });
2375
2376 state
2377 .operation_models
2378 .insert(op_id, builtin::claude_sonnet_4_5());
2379
2380 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2381 config.tool_config.approval_policy = ToolApprovalPolicy {
2382 default_behavior: UnapprovedBehavior::Deny,
2383 preapproved: ApprovalRules::default(),
2384 };
2385 state.session_config = Some(config);
2386
2387 let tool_call = steer_tools::ToolCall {
2388 id: "tc_1".to_string(),
2389 name: "test_tool".to_string(),
2390 parameters: serde_json::json!({}),
2391 };
2392
2393 let effects = reduce(
2394 &mut state,
2395 Action::ToolApprovalRequested {
2396 session_id,
2397 request_id: RequestId::new(),
2398 tool_call,
2399 },
2400 );
2401
2402 assert!(effects.iter().any(|e| matches!(
2403 e,
2404 Effect::EmitEvent {
2405 event: SessionEvent::ToolCallFailed { .. },
2406 ..
2407 }
2408 )));
2409 assert!(effects.iter().any(|e| matches!(
2410 e,
2411 Effect::EmitEvent {
2412 event: SessionEvent::ToolMessageAdded { .. },
2413 ..
2414 }
2415 )));
2416 assert!(
2417 !effects
2418 .iter()
2419 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2420 );
2421 assert!(
2422 !effects
2423 .iter()
2424 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2425 );
2426 assert!(state.pending_approval.is_none());
2427 assert!(state.approval_queue.is_empty());
2428 assert_eq!(state.message_graph.messages.len(), 1);
2429
2430 match &state.message_graph.messages[0].data {
2431 MessageData::Tool { result, .. } => match result {
2432 ToolResult::Error(error) => {
2433 assert!(
2434 matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2435 );
2436 }
2437 _ => panic!("expected denied tool error"),
2438 },
2439 _ => panic!("expected tool message"),
2440 }
2441 }
2442
2443 #[test]
2444 fn test_user_denied_tool_request_emits_failure_message() {
2445 let mut state = test_state();
2446 let session_id = state.session_id;
2447 let op_id = OpId::new();
2448
2449 state.current_operation = Some(OperationState {
2450 op_id,
2451 kind: OperationKind::AgentLoop,
2452 pending_tool_calls: HashSet::new(),
2453 });
2454 state
2455 .operation_models
2456 .insert(op_id, builtin::claude_sonnet_4_5());
2457
2458 let tool_call = steer_tools::ToolCall {
2459 id: "tc_1".to_string(),
2460 name: "test_tool".to_string(),
2461 parameters: serde_json::json!({}),
2462 };
2463 let request_id = RequestId::new();
2464 state.pending_approval = Some(PendingApproval {
2465 request_id,
2466 tool_call: tool_call.clone(),
2467 });
2468
2469 let effects = reduce(
2470 &mut state,
2471 Action::ToolApprovalDecided {
2472 session_id,
2473 request_id,
2474 decision: ApprovalDecision::Denied,
2475 remember: None,
2476 },
2477 );
2478
2479 assert!(effects.iter().any(|e| matches!(
2480 e,
2481 Effect::EmitEvent {
2482 event: SessionEvent::ToolCallFailed { .. },
2483 ..
2484 }
2485 )));
2486 assert!(effects.iter().any(|e| matches!(
2487 e,
2488 Effect::EmitEvent {
2489 event: SessionEvent::ToolMessageAdded { .. },
2490 ..
2491 }
2492 )));
2493 assert!(
2494 !effects
2495 .iter()
2496 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2497 );
2498 assert!(state.pending_approval.is_none());
2499 assert!(state.approval_queue.is_empty());
2500 assert_eq!(state.message_graph.messages.len(), 1);
2501
2502 match &state.message_graph.messages[0].data {
2503 MessageData::Tool { result, .. } => match result {
2504 ToolResult::Error(error) => {
2505 assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2506 }
2507 _ => panic!("expected denied tool error"),
2508 },
2509 _ => panic!("expected tool message"),
2510 }
2511 }
2512
2513 #[test]
2514 fn test_cancel_pops_queued_item_without_auto_start() {
2515 let mut state = test_state();
2516 let session_id = state.session_id;
2517 let op_id = OpId::new();
2518
2519 state.current_operation = Some(OperationState {
2520 op_id,
2521 kind: OperationKind::AgentLoop,
2522 pending_tool_calls: HashSet::new(),
2523 });
2524 state
2525 .operation_models
2526 .insert(op_id, builtin::claude_sonnet_4_5());
2527
2528 let queued_op = OpId::new();
2529 let queued_message_id = MessageId::from_string("queued_msg");
2530 let _ = reduce(
2531 &mut state,
2532 Action::UserInput {
2533 session_id,
2534 text: NonEmptyString::new("Queued message").expect("non-empty"),
2535 op_id: queued_op,
2536 message_id: queued_message_id.clone(),
2537 model: builtin::claude_sonnet_4_5(),
2538 timestamp: 1,
2539 },
2540 );
2541
2542 let effects = reduce(
2543 &mut state,
2544 Action::Cancel {
2545 session_id,
2546 op_id: None,
2547 },
2548 );
2549
2550 assert!(state.current_operation.is_none());
2551 assert!(state.queued_work.is_empty());
2552
2553 let cancellation_info = effects.iter().find_map(|effect| match effect {
2554 Effect::EmitEvent {
2555 event: SessionEvent::OperationCancelled { info, .. },
2556 ..
2557 } => Some(info),
2558 _ => None,
2559 });
2560 let info = cancellation_info.expect("expected OperationCancelled event");
2561 let popped = info
2562 .popped_queued_item
2563 .as_ref()
2564 .expect("expected popped queued item");
2565 assert_eq!(popped.content, "Queued message");
2566 assert_eq!(popped.op_id, queued_op);
2567 assert_eq!(popped.message_id, queued_message_id);
2568
2569 assert!(
2570 !effects.iter().any(|effect| matches!(
2571 effect,
2572 Effect::EmitEvent {
2573 event: SessionEvent::OperationStarted { .. },
2574 ..
2575 }
2576 )),
2577 "queued work should not auto-start on cancel"
2578 );
2579 }
2580
2581 #[test]
2582 fn test_cancel_injects_tool_results_for_pending_calls() {
2583 let mut state = test_state();
2584 let session_id = state.session_id;
2585 let op_id = OpId::new();
2586
2587 let tool_call = ToolCall {
2588 id: "tc_1".to_string(),
2589 name: "test_tool".to_string(),
2590 parameters: serde_json::json!({}),
2591 };
2592
2593 state.message_graph.add_message(Message {
2594 data: MessageData::Assistant {
2595 content: vec![AssistantContent::ToolCall {
2596 tool_call: tool_call.clone(),
2597 thought_signature: None,
2598 }],
2599 },
2600 timestamp: 0,
2601 id: "msg_1".to_string(),
2602 parent_message_id: None,
2603 });
2604
2605 state.current_operation = Some(OperationState {
2606 op_id,
2607 kind: OperationKind::AgentLoop,
2608 pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2609 });
2610 state
2611 .operation_models
2612 .insert(op_id, builtin::claude_sonnet_4_5());
2613
2614 let effects = reduce(
2615 &mut state,
2616 Action::Cancel {
2617 session_id,
2618 op_id: None,
2619 },
2620 );
2621
2622 assert!(effects.iter().any(|e| matches!(
2623 e,
2624 Effect::EmitEvent {
2625 event: SessionEvent::ToolMessageAdded { .. },
2626 ..
2627 }
2628 )));
2629
2630 let tool_message = state
2631 .message_graph
2632 .messages
2633 .iter()
2634 .find(|message| matches!(message.data, MessageData::Tool { .. }))
2635 .expect("tool result should be injected on cancel");
2636
2637 match &tool_message.data {
2638 MessageData::Tool { result, .. } => match result {
2639 ToolResult::Error(error) => {
2640 assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2641 }
2642 _ => panic!("expected cancelled tool error"),
2643 },
2644 _ => panic!("expected tool message"),
2645 }
2646 }
2647
2648 #[test]
2649 fn test_malformed_tool_call_auto_denies() {
2650 let mut state = test_state();
2651 let session_id = state.session_id;
2652 let op_id = OpId::new();
2653
2654 state.current_operation = Some(OperationState {
2655 op_id,
2656 kind: OperationKind::AgentLoop,
2657 pending_tool_calls: HashSet::new(),
2658 });
2659
2660 state
2661 .operation_models
2662 .insert(op_id, builtin::claude_sonnet_4_5());
2663
2664 let mut properties = serde_json::Map::new();
2665 properties.insert("command".to_string(), json!({ "type": "string" }));
2666
2667 state.tools.push(ToolSchema {
2668 name: "test_tool".to_string(),
2669 display_name: "test_tool".to_string(),
2670 description: String::new(),
2671 input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2672 });
2673
2674 let tool_call = ToolCall {
2675 id: "tc_1".to_string(),
2676 name: "test_tool".to_string(),
2677 parameters: json!({}),
2678 };
2679
2680 let effects = reduce(
2681 &mut state,
2682 Action::ToolApprovalRequested {
2683 session_id,
2684 request_id: RequestId::new(),
2685 tool_call,
2686 },
2687 );
2688
2689 assert!(effects.iter().any(|e| matches!(
2690 e,
2691 Effect::EmitEvent {
2692 event: SessionEvent::ToolCallFailed { .. },
2693 ..
2694 }
2695 )));
2696 assert!(effects.iter().any(|e| matches!(
2697 e,
2698 Effect::EmitEvent {
2699 event: SessionEvent::ToolMessageAdded { .. },
2700 ..
2701 }
2702 )));
2703 assert!(
2704 !effects
2705 .iter()
2706 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2707 );
2708 assert!(
2709 !effects
2710 .iter()
2711 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2712 );
2713 assert!(state.pending_approval.is_none());
2714 assert!(state.approval_queue.is_empty());
2715 assert_eq!(state.message_graph.messages.len(), 1);
2716
2717 match &state.message_graph.messages[0].data {
2718 MessageData::Tool { result, .. } => match result {
2719 ToolResult::Error(error) => {
2720 assert!(matches!(error, ToolError::InvalidParams { .. }));
2721 }
2722 _ => panic!("expected invalid params tool error"),
2723 },
2724 _ => panic!("expected tool message"),
2725 }
2726 }
2727
2728 #[test]
2729 fn test_approval_queuing() {
2730 let mut state = test_state();
2731 let session_id = state.session_id;
2732 let op_id = OpId::new();
2733
2734 state.current_operation = Some(OperationState {
2735 op_id,
2736 kind: OperationKind::AgentLoop,
2737 pending_tool_calls: HashSet::new(),
2738 });
2739
2740 let tool_call_1 = steer_tools::ToolCall {
2741 id: "tc_1".to_string(),
2742 name: "tool_1".to_string(),
2743 parameters: serde_json::json!({}),
2744 };
2745 let tool_call_2 = steer_tools::ToolCall {
2746 id: "tc_2".to_string(),
2747 name: "tool_2".to_string(),
2748 parameters: serde_json::json!({}),
2749 };
2750
2751 let _ = reduce(
2752 &mut state,
2753 Action::ToolApprovalRequested {
2754 session_id,
2755 request_id: RequestId::new(),
2756 tool_call: tool_call_1,
2757 },
2758 );
2759
2760 assert!(state.pending_approval.is_some());
2761
2762 let _ = reduce(
2763 &mut state,
2764 Action::ToolApprovalRequested {
2765 session_id,
2766 request_id: RequestId::new(),
2767 tool_call: tool_call_2,
2768 },
2769 );
2770
2771 assert_eq!(state.approval_queue.len(), 1);
2772 }
2773
2774 #[test]
2775 fn test_dispatch_agent_missing_target_auto_denies() {
2776 let mut state = test_state();
2777 let session_id = state.session_id;
2778 let op_id = OpId::new();
2779
2780 state.current_operation = Some(OperationState {
2781 op_id,
2782 kind: OperationKind::AgentLoop,
2783 pending_tool_calls: HashSet::new(),
2784 });
2785
2786 state
2787 .operation_models
2788 .insert(op_id, builtin::claude_sonnet_4_5());
2789
2790 let input_schema: InputSchema =
2791 schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
2792 state.tools.push(ToolSchema {
2793 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2794 display_name: "Dispatch Agent".to_string(),
2795 description: String::new(),
2796 input_schema,
2797 });
2798
2799 let tool_call = ToolCall {
2800 id: "tc_dispatch".to_string(),
2801 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2802 parameters: json!({ "prompt": "hello world" }),
2803 };
2804
2805 let effects = reduce(
2806 &mut state,
2807 Action::ToolApprovalRequested {
2808 session_id,
2809 request_id: RequestId::new(),
2810 tool_call,
2811 },
2812 );
2813
2814 assert!(effects.iter().any(|e| matches!(
2815 e,
2816 Effect::EmitEvent {
2817 event: SessionEvent::ToolCallFailed { .. },
2818 ..
2819 }
2820 )));
2821 assert!(effects.iter().any(|e| matches!(
2822 e,
2823 Effect::EmitEvent {
2824 event: SessionEvent::ToolMessageAdded { .. },
2825 ..
2826 }
2827 )));
2828 assert!(
2829 !effects
2830 .iter()
2831 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2832 );
2833 assert!(state.pending_approval.is_none());
2834 assert!(state.approval_queue.is_empty());
2835
2836 match &state.message_graph.messages[0].data {
2837 MessageData::Tool { result, .. } => match result {
2838 ToolResult::Error(error) => {
2839 assert!(matches!(error, ToolError::InvalidParams { .. }));
2840 }
2841 _ => panic!("expected invalid params tool error"),
2842 },
2843 _ => panic!("expected tool message"),
2844 }
2845 }
2846
2847 #[test]
2848 fn test_model_response_with_tool_calls_requests_approval() {
2849 let mut state = test_state();
2850 let session_id = state.session_id;
2851 let op_id = OpId::new();
2852 let message_id = MessageId::new();
2853
2854 state.current_operation = Some(OperationState {
2855 op_id,
2856 kind: OperationKind::AgentLoop,
2857 pending_tool_calls: HashSet::new(),
2858 });
2859 state
2860 .operation_models
2861 .insert(op_id, builtin::claude_sonnet_4_5());
2862
2863 let tool_call = steer_tools::ToolCall {
2864 id: "tc_1".to_string(),
2865 name: "bash".to_string(),
2866 parameters: serde_json::json!({"command": "ls"}),
2867 };
2868
2869 let content = vec![
2870 AssistantContent::Text {
2871 text: "Let me list the files.".to_string(),
2872 },
2873 AssistantContent::ToolCall {
2874 tool_call: tool_call.clone(),
2875 thought_signature: None,
2876 },
2877 ];
2878
2879 let effects = reduce(
2880 &mut state,
2881 Action::ModelResponseComplete {
2882 session_id,
2883 op_id,
2884 message_id,
2885 content,
2886 timestamp: 12345,
2887 },
2888 );
2889
2890 assert!(state.pending_approval.is_some());
2891 assert!(
2892 effects
2893 .iter()
2894 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2895 );
2896 assert!(state.current_operation.is_some());
2897 }
2898
2899 #[test]
2900 fn test_model_response_no_tools_completes_operation() {
2901 let mut state = test_state();
2902 let session_id = state.session_id;
2903 let op_id = OpId::new();
2904 let message_id = MessageId::new();
2905
2906 state.current_operation = Some(OperationState {
2907 op_id,
2908 kind: OperationKind::AgentLoop,
2909 pending_tool_calls: HashSet::new(),
2910 });
2911 state
2912 .operation_models
2913 .insert(op_id, builtin::claude_sonnet_4_5());
2914
2915 let content = vec![AssistantContent::Text {
2916 text: "Hello! How can I help?".to_string(),
2917 }];
2918
2919 let effects = reduce(
2920 &mut state,
2921 Action::ModelResponseComplete {
2922 session_id,
2923 op_id,
2924 message_id,
2925 content,
2926 timestamp: 12345,
2927 },
2928 );
2929
2930 assert!(state.current_operation.is_none());
2931 assert!(effects.iter().any(|e| matches!(
2932 e,
2933 Effect::EmitEvent {
2934 event: SessionEvent::OperationCompleted { .. },
2935 ..
2936 }
2937 )));
2938 }
2939
2940 #[test]
2941 fn test_out_of_order_completion_preserves_newer_operation() {
2942 let mut state = test_state();
2943 let session_id = state.session_id;
2944 let model = builtin::claude_sonnet_4_5();
2945
2946 let op_a = OpId::new();
2947 let op_b = OpId::new();
2948
2949 let _ = reduce(
2950 &mut state,
2951 Action::UserInput {
2952 session_id,
2953 text: NonEmptyString::new("first").unwrap(),
2954 op_id: op_a,
2955 message_id: MessageId::new(),
2956 model: model.clone(),
2957 timestamp: 1,
2958 },
2959 );
2960
2961 let _ = reduce(
2962 &mut state,
2963 Action::UserInput {
2964 session_id,
2965 text: NonEmptyString::new("second").unwrap(),
2966 op_id: op_b,
2967 message_id: MessageId::new(),
2968 model: model.clone(),
2969 timestamp: 2,
2970 },
2971 );
2972
2973 let _ = reduce(
2974 &mut state,
2975 Action::ModelResponseComplete {
2976 session_id,
2977 op_id: op_a,
2978 message_id: MessageId::new(),
2979 content: vec![AssistantContent::Text {
2980 text: "done A".to_string(),
2981 }],
2982 timestamp: 3,
2983 },
2984 );
2985
2986 assert!(
2987 state
2988 .current_operation
2989 .as_ref()
2990 .is_some_and(|op| op.op_id == op_b)
2991 );
2992 assert!(state.operation_models.contains_key(&op_b));
2993 assert!(!state.operation_models.contains_key(&op_a));
2994
2995 let effects = reduce(
2996 &mut state,
2997 Action::ModelResponseComplete {
2998 session_id,
2999 op_id: op_b,
3000 message_id: MessageId::new(),
3001 content: vec![AssistantContent::Text {
3002 text: "done B".to_string(),
3003 }],
3004 timestamp: 4,
3005 },
3006 );
3007
3008 assert!(effects.iter().any(|e| matches!(
3009 e,
3010 Effect::EmitEvent {
3011 event: SessionEvent::OperationCompleted { op_id },
3012 ..
3013 } if *op_id == op_b
3014 )));
3015 assert!(!effects.iter().any(|e| matches!(
3016 e,
3017 Effect::EmitEvent {
3018 event: SessionEvent::Error { message },
3019 ..
3020 } if message.contains("Missing model for operation")
3021 )));
3022 }
3023
3024 #[test]
3025 fn test_tool_approval_does_not_call_model_before_result() {
3026 let mut state = test_state();
3027 let session_id = state.session_id;
3028 let op_id = OpId::new();
3029
3030 state.current_operation = Some(OperationState {
3031 op_id,
3032 kind: OperationKind::AgentLoop,
3033 pending_tool_calls: HashSet::new(),
3034 });
3035 state
3036 .operation_models
3037 .insert(op_id, builtin::claude_sonnet_4_5());
3038
3039 let tool_call = steer_tools::ToolCall {
3040 id: "tc_1".to_string(),
3041 name: "bash".to_string(),
3042 parameters: serde_json::json!({"command": "ls"}),
3043 };
3044 let request_id = RequestId::new();
3045 state.pending_approval = Some(PendingApproval {
3046 request_id,
3047 tool_call: tool_call.clone(),
3048 });
3049
3050 let effects = reduce(
3051 &mut state,
3052 Action::ToolApprovalDecided {
3053 session_id,
3054 request_id,
3055 decision: ApprovalDecision::Approved,
3056 remember: None,
3057 },
3058 );
3059
3060 assert!(
3061 effects
3062 .iter()
3063 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3064 );
3065 assert!(
3066 !effects
3067 .iter()
3068 .any(|e| matches!(e, Effect::CallModel { .. }))
3069 );
3070 assert!(state.current_operation.as_ref().is_some_and(|op| {
3071 op.pending_tool_calls
3072 .contains(&ToolCallId::from_string("tc_1"))
3073 }));
3074 }
3075
3076 #[test]
3077 fn test_mcp_tool_visibility_and_disconnect_removal() {
3078 let mut state = test_state();
3079 let session_id = state.session_id;
3080
3081 let mut allowed = HashSet::new();
3082 allowed.insert("mcp__alpha__allowed".to_string());
3083
3084 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3085 config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3086 state.session_config = Some(config);
3087
3088 state.tools.push(test_schema("bash"));
3089
3090 let _ = reduce(
3091 &mut state,
3092 Action::McpServerStateChanged {
3093 session_id,
3094 server_name: "alpha".to_string(),
3095 state: McpServerState::Connected {
3096 tools: vec![
3097 test_schema("mcp__alpha__allowed"),
3098 test_schema("mcp__alpha__blocked"),
3099 ],
3100 },
3101 },
3102 );
3103
3104 assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3105 assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3106
3107 let _ = reduce(
3108 &mut state,
3109 Action::McpServerStateChanged {
3110 session_id,
3111 server_name: "alpha".to_string(),
3112 state: McpServerState::Disconnected { error: None },
3113 },
3114 );
3115
3116 assert!(
3117 !state
3118 .tools
3119 .iter()
3120 .any(|t| t.name.starts_with("mcp__alpha__"))
3121 );
3122 assert!(state.tools.iter().any(|t| t.name == "bash"));
3123 }
3124
3125 #[test]
3126 fn test_tool_result_continues_agent_loop() {
3127 let mut state = test_state();
3128 let session_id = state.session_id;
3129 let op_id = OpId::new();
3130 let tool_call_id = ToolCallId::from_string("tc_1");
3131
3132 state.current_operation = Some(OperationState {
3133 op_id,
3134 kind: OperationKind::AgentLoop,
3135 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3136 });
3137 state
3138 .operation_models
3139 .insert(op_id, builtin::claude_sonnet_4_5());
3140
3141 let effects = reduce(
3142 &mut state,
3143 Action::ToolResult {
3144 session_id,
3145 tool_call_id,
3146 tool_name: "bash".to_string(),
3147 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3148 tool_name: "bash".to_string(),
3149 payload: "file1.txt\nfile2.txt".to_string(),
3150 })),
3151 },
3152 );
3153
3154 assert!(
3155 effects
3156 .iter()
3157 .any(|e| matches!(e, Effect::CallModel { .. }))
3158 );
3159 }
3160
3161 #[test]
3162 fn test_tool_result_waits_for_pending_tools() {
3163 let mut state = test_state();
3164 let session_id = state.session_id;
3165 let op_id = OpId::new();
3166 let tool_call_id_1 = ToolCallId::from_string("tc_1");
3167 let tool_call_id_2 = ToolCallId::from_string("tc_2");
3168
3169 state.current_operation = Some(OperationState {
3170 op_id,
3171 kind: OperationKind::AgentLoop,
3172 pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3173 .into_iter()
3174 .collect(),
3175 });
3176 state
3177 .operation_models
3178 .insert(op_id, builtin::claude_sonnet_4_5());
3179
3180 let effects = reduce(
3181 &mut state,
3182 Action::ToolResult {
3183 session_id,
3184 tool_call_id: tool_call_id_1,
3185 tool_name: "bash".to_string(),
3186 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3187 tool_name: "bash".to_string(),
3188 payload: "done".to_string(),
3189 })),
3190 },
3191 );
3192
3193 assert!(
3194 !effects
3195 .iter()
3196 .any(|e| matches!(e, Effect::CallModel { .. }))
3197 );
3198
3199 let effects = reduce(
3200 &mut state,
3201 Action::ToolResult {
3202 session_id,
3203 tool_call_id: tool_call_id_2,
3204 tool_name: "bash".to_string(),
3205 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3206 tool_name: "bash".to_string(),
3207 payload: "done".to_string(),
3208 })),
3209 },
3210 );
3211
3212 assert!(
3213 effects
3214 .iter()
3215 .any(|e| matches!(e, Effect::CallModel { .. }))
3216 );
3217 }
3218}