1use crate::agents::default_agent_spec_id;
2use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
3use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
4use crate::app::domain::effect::{Effect, McpServerConfig};
5use crate::app::domain::event::{
6 CancellationInfo, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
7};
8use crate::app::domain::state::{
9 AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
10};
11use crate::app::domain::types::NonEmptyString;
12use crate::primary_agents::{
13 default_primary_agent_id, primary_agent_spec, resolve_effective_config,
14};
15use crate::session::state::{BackendConfig, ToolDecision};
16use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
17use serde_json::Value;
18use steer_tools::ToolError;
19use steer_tools::result::ToolResult;
20use steer_tools::tools::BASH_TOOL_NAME;
21use thiserror::Error;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum InvalidActionKind {
25 OperationInFlight,
26 MissingSessionConfig,
27 UnknownPrimaryAgent,
28 QueueEmpty,
29}
30
31#[derive(Debug, Error)]
32pub enum ReduceError {
33 #[error("{message}")]
34 InvalidAction {
35 message: String,
36 kind: InvalidActionKind,
37 },
38 #[error("Invariant violated: {message}")]
39 Invariant { message: String },
40}
41
42fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
43 ReduceError::InvalidAction {
44 message: message.into(),
45 kind,
46 }
47}
48
49pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
50 match action {
51 Action::UserInput {
52 session_id,
53 text,
54 op_id,
55 message_id,
56 model,
57 timestamp,
58 } => Ok(handle_user_input(
59 state, session_id, text, op_id, message_id, model, timestamp,
60 )),
61
62 Action::UserEditedMessage {
63 session_id,
64 message_id,
65 new_content,
66 op_id,
67 new_message_id,
68 model,
69 timestamp,
70 } => handle_user_edited_message(
71 state,
72 session_id,
73 UserEditedMessageParams {
74 original_message_id: message_id,
75 new_content,
76 op_id,
77 new_message_id,
78 model,
79 timestamp,
80 },
81 ),
82
83 Action::ToolApprovalRequested {
84 session_id,
85 request_id,
86 tool_call,
87 } => Ok(handle_tool_approval_requested(
88 state, session_id, request_id, tool_call,
89 )),
90
91 Action::ToolApprovalDecided {
92 session_id,
93 request_id,
94 decision,
95 remember,
96 } => Ok(handle_tool_approval_decided(
97 state, session_id, request_id, decision, remember,
98 )),
99
100 Action::ToolExecutionStarted {
101 session_id,
102 tool_call_id,
103 tool_name,
104 tool_parameters,
105 } => Ok(handle_tool_execution_started(
106 state,
107 session_id,
108 tool_call_id,
109 tool_name,
110 tool_parameters,
111 )),
112
113 Action::ToolResult {
114 session_id,
115 tool_call_id,
116 tool_name,
117 result,
118 } => Ok(handle_tool_result(
119 state,
120 session_id,
121 tool_call_id,
122 tool_name,
123 result,
124 )),
125
126 Action::ModelResponseComplete {
127 session_id,
128 op_id,
129 message_id,
130 content,
131 timestamp,
132 } => Ok(handle_model_response_complete(
133 state, session_id, op_id, message_id, content, timestamp,
134 )),
135
136 Action::ModelResponseError {
137 session_id,
138 op_id,
139 error,
140 } => Ok(handle_model_response_error(
141 state, session_id, op_id, &error,
142 )),
143
144 Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
145
146 Action::DirectBashCommand {
147 session_id,
148 op_id,
149 message_id,
150 command,
151 timestamp,
152 } => Ok(handle_direct_bash(
153 state, session_id, op_id, message_id, command, timestamp,
154 )),
155
156 Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
157
158 Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
159
160 Action::RequestCompaction {
161 session_id,
162 op_id,
163 model,
164 } => handle_request_compaction(state, session_id, op_id, model),
165
166 Action::Hydrate {
167 session_id,
168 events,
169 starting_sequence,
170 } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
171
172 Action::WorkspaceFilesListed { files, .. } => {
173 state.workspace_files = files;
174 Ok(vec![])
175 }
176
177 Action::ToolSchemasAvailable { tools, .. } => {
178 state.tools = tools;
179 Ok(vec![])
180 }
181
182 Action::ToolSchemasUpdated { schemas, .. } => {
183 state.tools = schemas;
184 Ok(vec![])
185 }
186
187 Action::SwitchPrimaryAgent {
188 session_id,
189 agent_id,
190 } => handle_switch_primary_agent(state, session_id, agent_id),
191
192 Action::McpServerStateChanged {
193 session_id,
194 server_name,
195 state: new_state,
196 } => {
197 if let McpServerState::Connected { tools } = &new_state {
199 let tools = state.session_config.as_ref().map_or_else(
200 || tools.clone(),
201 |config| config.filter_tools_by_visibility(tools.clone()),
202 );
203
204 for tool in tools {
206 if !state.tools.iter().any(|t| t.name == tool.name) {
207 state.tools.push(tool.clone());
208 }
209 }
210 }
211
212 if matches!(
214 &new_state,
215 McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
216 ) {
217 let prefix = format!("mcp__{server_name}__");
218 state.tools.retain(|t| !t.name.starts_with(&prefix));
219 }
220
221 state
222 .mcp_servers
223 .insert(server_name.clone(), new_state.clone());
224 Ok(vec![Effect::EmitEvent {
225 session_id,
226 event: SessionEvent::McpServerStateChanged {
227 server_name,
228 state: new_state,
229 },
230 }])
231 }
232
233 Action::CompactionComplete {
234 session_id,
235 op_id,
236 compaction_id,
237 summary_message_id,
238 summary,
239 compacted_head_message_id,
240 previous_active_message_id,
241 model,
242 timestamp,
243 } => Ok(handle_compaction_complete(
244 state,
245 session_id,
246 CompactionCompleteParams {
247 op_id,
248 compaction_id,
249 summary_message_id,
250 summary,
251 compacted_head_message_id,
252 previous_active_message_id,
253 model_name: model,
254 timestamp,
255 },
256 )),
257
258 Action::CompactionFailed {
259 session_id,
260 op_id,
261 error,
262 } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
263
264 Action::Shutdown => Ok(vec![]),
265 }
266}
267
268fn handle_user_input(
269 state: &mut AppState,
270 session_id: crate::app::domain::types::SessionId,
271 text: crate::app::domain::types::NonEmptyString,
272 op_id: crate::app::domain::types::OpId,
273 message_id: crate::app::domain::types::MessageId,
274 model: crate::config::model::ModelId,
275 timestamp: u64,
276) -> Vec<Effect> {
277 let mut effects = Vec::new();
278
279 if state.has_active_operation() {
280 state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
281 text,
282 op_id,
283 message_id,
284 model,
285 queued_at: timestamp,
286 });
287 effects.push(Effect::EmitEvent {
288 session_id,
289 event: SessionEvent::QueueUpdated {
290 queue: snapshot_queue(state),
291 },
292 });
293 return effects;
294 }
295
296 let parent_id = state.message_graph.active_message_id.clone();
297
298 let message = Message {
299 data: MessageData::User {
300 content: vec![UserContent::Text {
301 text: text.as_str().to_string(),
302 }],
303 },
304 timestamp,
305 id: message_id.0.clone(),
306 parent_message_id: parent_id,
307 };
308
309 state.message_graph.add_message(message.clone());
310 state.message_graph.active_message_id = Some(message_id.0.clone());
311
312 state.start_operation(op_id, OperationKind::AgentLoop);
313 state.operation_models.insert(op_id, model.clone());
314
315 effects.push(Effect::EmitEvent {
316 session_id,
317 event: SessionEvent::UserMessageAdded {
318 message: message.clone(),
319 },
320 });
321
322 effects.push(Effect::EmitEvent {
323 session_id,
324 event: SessionEvent::OperationStarted {
325 op_id,
326 kind: OperationKind::AgentLoop,
327 },
328 });
329
330 effects.push(Effect::CallModel {
331 session_id,
332 op_id,
333 model,
334 messages: state
335 .message_graph
336 .get_thread_messages()
337 .into_iter()
338 .cloned()
339 .collect(),
340 system_context: state.cached_system_context.clone(),
341 tools: state.tools.clone(),
342 });
343
344 effects
345}
346
347struct UserEditedMessageParams {
348 original_message_id: crate::app::domain::types::MessageId,
349 new_content: String,
350 op_id: crate::app::domain::types::OpId,
351 new_message_id: crate::app::domain::types::MessageId,
352 model: crate::config::model::ModelId,
353 timestamp: u64,
354}
355
356fn handle_user_edited_message(
357 state: &mut AppState,
358 session_id: crate::app::domain::types::SessionId,
359 params: UserEditedMessageParams,
360) -> Result<Vec<Effect>, ReduceError> {
361 let UserEditedMessageParams {
362 original_message_id,
363 new_content,
364 op_id,
365 new_message_id,
366 model,
367 timestamp,
368 } = params;
369 let mut effects = Vec::new();
370
371 if state.has_active_operation() {
372 return Err(invalid_action(
373 InvalidActionKind::OperationInFlight,
374 "Cannot edit message while an operation is active.",
375 ));
376 }
377
378 let parent_id = state
379 .message_graph
380 .messages
381 .iter()
382 .find(|m| m.id() == original_message_id.0)
383 .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
384
385 let message = Message {
386 data: MessageData::User {
387 content: vec![UserContent::Text { text: new_content }],
388 },
389 timestamp,
390 id: new_message_id.0.clone(),
391 parent_message_id: parent_id,
392 };
393
394 state.message_graph.add_message(message.clone());
395 state.message_graph.active_message_id = Some(new_message_id.0.clone());
396
397 state.start_operation(op_id, OperationKind::AgentLoop);
398 state.operation_models.insert(op_id, model.clone());
399
400 effects.push(Effect::EmitEvent {
401 session_id,
402 event: SessionEvent::UserMessageAdded {
403 message: message.clone(),
404 },
405 });
406
407 effects.push(Effect::EmitEvent {
408 session_id,
409 event: SessionEvent::OperationStarted {
410 op_id,
411 kind: OperationKind::AgentLoop,
412 },
413 });
414
415 effects.push(Effect::CallModel {
416 session_id,
417 op_id,
418 model,
419 messages: state
420 .message_graph
421 .get_thread_messages()
422 .into_iter()
423 .cloned()
424 .collect(),
425 system_context: state.cached_system_context.clone(),
426 tools: state.tools.clone(),
427 });
428
429 Ok(effects)
430}
431
432fn handle_tool_approval_requested(
433 state: &mut AppState,
434 session_id: crate::app::domain::types::SessionId,
435 request_id: crate::app::domain::types::RequestId,
436 tool_call: steer_tools::ToolCall,
437) -> Vec<Effect> {
438 let mut effects = Vec::new();
439
440 if let Err(error) = validate_tool_call(state, &tool_call) {
441 let error_message = error.to_string();
442 return fail_tool_call_without_execution(
443 state,
444 session_id,
445 tool_call,
446 error,
447 error_message,
448 "invalid",
449 true,
450 );
451 }
452
453 let decision = get_tool_decision(state, &tool_call);
454
455 match decision {
456 ToolDecision::Allow => {
457 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
458 return vec![Effect::EmitEvent {
459 session_id,
460 event: SessionEvent::Error {
461 message: "Tool approval requested without active operation".to_string(),
462 },
463 }];
464 };
465 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
466 &tool_call.id,
467 ));
468
469 effects.push(Effect::ExecuteTool {
470 session_id,
471 op_id,
472 tool_call,
473 });
474 }
475 ToolDecision::Deny => {
476 let error = ToolError::DeniedByPolicy(tool_call.name.clone());
477 let tool_name = tool_call.name.clone();
478 effects.extend(fail_tool_call_without_execution(
479 state,
480 session_id,
481 tool_call,
482 error,
483 format!("Tool '{tool_name}' denied by policy"),
484 "denied",
485 true,
486 ));
487 }
488 ToolDecision::Ask => {
489 if state.pending_approval.is_some() {
490 state.approval_queue.push_back(QueuedApproval { tool_call });
491 return effects;
492 }
493
494 state.pending_approval = Some(PendingApproval {
495 request_id,
496 tool_call: tool_call.clone(),
497 });
498
499 effects.push(Effect::EmitEvent {
500 session_id,
501 event: SessionEvent::ApprovalRequested {
502 request_id,
503 tool_call: tool_call.clone(),
504 },
505 });
506
507 effects.push(Effect::RequestUserApproval {
508 session_id,
509 request_id,
510 tool_call,
511 });
512 }
513 }
514
515 effects
516}
517
518fn validate_tool_call(
519 state: &AppState,
520 tool_call: &steer_tools::ToolCall,
521) -> Result<(), ToolError> {
522 if tool_call.name.trim().is_empty() {
523 return Err(ToolError::invalid_params(
524 "unknown",
525 "Malformed tool call: missing tool name",
526 ));
527 }
528
529 if tool_call.id.trim().is_empty() {
530 return Err(ToolError::invalid_params(
531 tool_call.name.clone(),
532 "Malformed tool call: missing tool call id",
533 ));
534 }
535
536 if state.tools.is_empty() {
537 return Ok(());
538 }
539
540 let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
541 return Ok(());
542 };
543
544 validate_against_json_schema(
545 &tool_call.name,
546 schema.input_schema.as_value(),
547 &tool_call.parameters,
548 )
549}
550
551fn validate_against_json_schema(
552 tool_name: &str,
553 schema: &Value,
554 params: &Value,
555) -> Result<(), ToolError> {
556 let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
557 ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
558 })?;
559
560 if let Err(errors) = validator.validate(params) {
561 let message = errors
562 .into_iter()
563 .map(|error| error.to_string())
564 .next()
565 .unwrap_or_else(|| "Parameters do not match schema".to_string());
566 return Err(ToolError::invalid_params(tool_name.to_string(), message));
567 }
568
569 Ok(())
570}
571
572fn emit_tool_failure_message(
573 state: &mut AppState,
574 session_id: crate::app::domain::types::SessionId,
575 tool_call_id: &str,
576 tool_name: &str,
577 tool_error: ToolError,
578 event_error: String,
579 message_id_prefix: &str,
580) -> Vec<Effect> {
581 let mut effects = Vec::new();
582
583 let tool_result = ToolResult::Error(tool_error);
584 let parent_id = state.message_graph.active_message_id.clone();
585 let tool_message = Message {
586 data: MessageData::Tool {
587 tool_use_id: tool_call_id.to_string(),
588 result: tool_result,
589 },
590 timestamp: 0,
591 id: format!("{message_id_prefix}_{tool_call_id}"),
592 parent_message_id: parent_id,
593 };
594 state.message_graph.add_message(tool_message.clone());
595
596 if let Some(model) = state
597 .current_operation
598 .as_ref()
599 .and_then(|op| state.operation_models.get(&op.op_id).cloned())
600 {
601 effects.push(Effect::EmitEvent {
602 session_id,
603 event: SessionEvent::ToolCallFailed {
604 id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
605 name: tool_name.to_string(),
606 error: event_error,
607 model,
608 },
609 });
610 }
611
612 effects.push(Effect::EmitEvent {
613 session_id,
614 event: SessionEvent::ToolMessageAdded {
615 message: tool_message,
616 },
617 });
618
619 effects
620}
621
622fn fail_tool_call_without_execution(
623 state: &mut AppState,
624 session_id: crate::app::domain::types::SessionId,
625 tool_call: steer_tools::ToolCall,
626 tool_error: ToolError,
627 event_error: String,
628 message_id_prefix: &str,
629 call_model_if_ready: bool,
630) -> Vec<Effect> {
631 let mut effects = emit_tool_failure_message(
632 state,
633 session_id,
634 &tool_call.id,
635 &tool_call.name,
636 tool_error,
637 event_error,
638 message_id_prefix,
639 );
640
641 if !call_model_if_ready {
642 return effects;
643 }
644
645 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
646 effects.push(Effect::EmitEvent {
647 session_id,
648 event: SessionEvent::Error {
649 message: "Tool failure recorded without active operation".to_string(),
650 },
651 });
652 return effects;
653 };
654 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
655 model
656 } else {
657 effects.push(Effect::EmitEvent {
658 session_id,
659 event: SessionEvent::Error {
660 message: format!("Missing model for operation {op_id}"),
661 },
662 });
663 return effects;
664 };
665
666 let all_tools_complete = state
667 .current_operation
668 .as_ref()
669 .is_none_or(|op| op.pending_tool_calls.is_empty());
670 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
671
672 if all_tools_complete && no_pending_approvals {
673 effects.push(Effect::CallModel {
674 session_id,
675 op_id,
676 model,
677 messages: state
678 .message_graph
679 .get_thread_messages()
680 .into_iter()
681 .cloned()
682 .collect(),
683 system_context: state.cached_system_context.clone(),
684 tools: state.tools.clone(),
685 });
686 }
687
688 effects
689}
690
691fn handle_tool_approval_decided(
692 state: &mut AppState,
693 session_id: crate::app::domain::types::SessionId,
694 request_id: crate::app::domain::types::RequestId,
695 decision: ApprovalDecision,
696 remember: Option<ApprovalMemory>,
697) -> Vec<Effect> {
698 let mut effects = Vec::new();
699
700 let pending = match state.pending_approval.take() {
701 Some(p) if p.request_id == request_id => p,
702 other => {
703 state.pending_approval = other;
704 return effects;
705 }
706 };
707
708 let resolved_memory = if decision == ApprovalDecision::Approved {
709 match remember {
710 Some(ApprovalMemory::PendingTool) => {
711 Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
712 }
713 Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
714 Some(ApprovalMemory::BashPattern(pattern)) => {
715 Some(ApprovalMemory::BashPattern(pattern))
716 }
717 None => None,
718 }
719 } else {
720 None
721 };
722
723 effects.push(Effect::EmitEvent {
724 session_id,
725 event: SessionEvent::ApprovalDecided {
726 request_id,
727 decision,
728 remember: resolved_memory.clone(),
729 },
730 });
731
732 if decision == ApprovalDecision::Approved {
733 if let Some(ref memory) = resolved_memory {
734 match memory {
735 ApprovalMemory::Tool(name) => {
736 state.approved_tools.insert(name.clone());
737 }
738 ApprovalMemory::BashPattern(pattern) => {
739 state.approved_bash_patterns.insert(pattern.clone());
740 }
741 ApprovalMemory::PendingTool => {}
742 }
743 }
744
745 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
746 effects.push(Effect::EmitEvent {
747 session_id,
748 event: SessionEvent::Error {
749 message: "Tool approval decided without active operation".to_string(),
750 },
751 });
752 return effects;
753 };
754 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
755 &pending.tool_call.id,
756 ));
757
758 effects.push(Effect::ExecuteTool {
759 session_id,
760 op_id,
761 tool_call: pending.tool_call,
762 });
763 } else {
764 let tool_name = pending.tool_call.name.clone();
765 let error = ToolError::DeniedByUser(tool_name.clone());
766 effects.extend(fail_tool_call_without_execution(
767 state,
768 session_id,
769 pending.tool_call,
770 error,
771 format!("Tool '{tool_name}' denied by user"),
772 "denied",
773 false,
774 ));
775 }
776
777 effects.extend(process_next_queued_approval(state, session_id));
778
779 effects
780}
781
782fn process_next_queued_approval(
783 state: &mut AppState,
784 session_id: crate::app::domain::types::SessionId,
785) -> Vec<Effect> {
786 let mut effects = Vec::new();
787
788 while let Some(queued) = state.approval_queue.pop_front() {
789 let decision = get_tool_decision(state, &queued.tool_call);
790
791 match decision {
792 ToolDecision::Allow => {
793 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
794 effects.push(Effect::EmitEvent {
795 session_id,
796 event: SessionEvent::Error {
797 message: "Queued tool approval processed without active operation"
798 .to_string(),
799 },
800 });
801 state.approval_queue.push_front(queued);
802 break;
803 };
804 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
805 &queued.tool_call.id,
806 ));
807
808 effects.push(Effect::ExecuteTool {
809 session_id,
810 op_id,
811 tool_call: queued.tool_call,
812 });
813 }
814 ToolDecision::Deny => {
815 let tool_name = queued.tool_call.name.clone();
816 let error = ToolError::DeniedByPolicy(tool_name.clone());
817 effects.extend(fail_tool_call_without_execution(
818 state,
819 session_id,
820 queued.tool_call,
821 error,
822 format!("Tool '{tool_name}' denied by policy"),
823 "denied",
824 false,
825 ));
826 }
827 ToolDecision::Ask => {
828 let request_id = crate::app::domain::types::RequestId::new();
829 state.pending_approval = Some(PendingApproval {
830 request_id,
831 tool_call: queued.tool_call.clone(),
832 });
833
834 effects.push(Effect::EmitEvent {
835 session_id,
836 event: SessionEvent::ApprovalRequested {
837 request_id,
838 tool_call: queued.tool_call.clone(),
839 },
840 });
841
842 effects.push(Effect::RequestUserApproval {
843 session_id,
844 request_id,
845 tool_call: queued.tool_call,
846 });
847
848 break;
849 }
850 }
851 }
852
853 let all_tools_complete = state
854 .current_operation
855 .as_ref()
856 .is_none_or(|op| op.pending_tool_calls.is_empty());
857 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
858
859 if all_tools_complete
860 && no_pending_approvals
861 && let Some(op) = &state.current_operation
862 {
863 let op_id = op.op_id;
864 if let Some(model) = state.operation_models.get(&op_id).cloned() {
865 effects.push(Effect::CallModel {
866 session_id,
867 op_id,
868 model,
869 messages: state
870 .message_graph
871 .get_thread_messages()
872 .into_iter()
873 .cloned()
874 .collect(),
875 system_context: state.cached_system_context.clone(),
876 tools: state.tools.clone(),
877 });
878 }
879 }
880
881 effects
882}
883
884fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
885 if state.approved_tools.contains(&tool_call.name) {
886 return ToolDecision::Allow;
887 }
888
889 if tool_call.name == DISPATCH_AGENT_TOOL_NAME
890 && let Ok(params) =
891 serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
892 && let Some(config) = state.session_config.as_ref()
893 {
894 let policy = &config.tool_config.approval_policy;
895 match params.target {
896 DispatchAgentTarget::Resume { .. } => {
897 return ToolDecision::Allow;
898 }
899 DispatchAgentTarget::New { agent, .. } => {
900 let agent_id = agent
901 .as_deref()
902 .filter(|value| !value.trim().is_empty())
903 .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
904 if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
905 return ToolDecision::Allow;
906 }
907 }
908 }
909 }
910
911 if tool_call.name == BASH_TOOL_NAME
912 && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
913 tool_call.parameters.clone(),
914 )
915 && state.is_bash_pattern_approved(¶ms.command)
916 {
917 return ToolDecision::Allow;
918 }
919
920 state
921 .session_config
922 .as_ref()
923 .map_or(ToolDecision::Ask, |config| {
924 config
925 .tool_config
926 .approval_policy
927 .tool_decision(&tool_call.name)
928 })
929}
930
931fn handle_tool_execution_started(
932 state: &mut AppState,
933 session_id: crate::app::domain::types::SessionId,
934 tool_call_id: crate::app::domain::types::ToolCallId,
935 tool_name: String,
936 tool_parameters: serde_json::Value,
937) -> Vec<Effect> {
938 state.add_pending_tool_call(tool_call_id.clone());
939
940 let op_id = match state.current_operation.as_ref() {
941 Some(op) => op.op_id,
942 None => {
943 return vec![Effect::EmitEvent {
944 session_id,
945 event: SessionEvent::Error {
946 message: "Tool call started without active operation".to_string(),
947 },
948 }];
949 }
950 };
951
952 let is_direct_bash = matches!(
953 state.current_operation.as_ref().map(|op| &op.kind),
954 Some(OperationKind::DirectBash { .. })
955 );
956
957 if is_direct_bash {
958 return vec![];
959 }
960
961 let model = match state.operation_models.get(&op_id).cloned() {
962 Some(model) => model,
963 None => {
964 return vec![Effect::EmitEvent {
965 session_id,
966 event: SessionEvent::Error {
967 message: format!("Missing model for tool call on operation {op_id}"),
968 },
969 }];
970 }
971 };
972
973 vec![Effect::EmitEvent {
974 session_id,
975 event: SessionEvent::ToolCallStarted {
976 id: tool_call_id,
977 name: tool_name,
978 parameters: tool_parameters,
979 model,
980 },
981 }]
982}
983
984fn handle_tool_result(
985 state: &mut AppState,
986 session_id: crate::app::domain::types::SessionId,
987 tool_call_id: crate::app::domain::types::ToolCallId,
988 tool_name: String,
989 result: Result<ToolResult, ToolError>,
990) -> Vec<Effect> {
991 let mut effects = Vec::new();
992
993 let op = match &state.current_operation {
994 Some(op) => {
995 if state.cancelled_ops.contains(&op.op_id) {
996 tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
997 return effects;
998 }
999 op.clone()
1000 }
1001 None => return effects,
1002 };
1003 let op_id = op.op_id;
1004
1005 state.remove_pending_tool_call(&tool_call_id);
1006
1007 let tool_result = match result {
1008 Ok(r) => r,
1009 Err(e) => ToolResult::Error(e),
1010 };
1011
1012 let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1013
1014 if is_direct_bash {
1015 let command = match &op.kind {
1016 OperationKind::DirectBash { command } => command.clone(),
1017 _ => tool_name,
1018 };
1019
1020 let (stdout, stderr, exit_code) = match &tool_result {
1021 ToolResult::Bash(result) => (
1022 result.stdout.clone(),
1023 result.stderr.clone(),
1024 result.exit_code,
1025 ),
1026 ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1027 other => (format!("{other:?}"), String::new(), 0),
1028 };
1029
1030 let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1031 state
1032 .message_graph
1033 .update_command_execution(
1034 id.as_str(),
1035 command.clone(),
1036 stdout.clone(),
1037 stderr.clone(),
1038 exit_code,
1039 )
1040 .or_else(|| {
1041 let parent_id = state.message_graph.active_message_id.clone();
1042 let timestamp = Message::current_timestamp();
1043 Some(Message {
1044 data: MessageData::User {
1045 content: vec![UserContent::CommandExecution {
1046 command: command.clone(),
1047 stdout: stdout.clone(),
1048 stderr: stderr.clone(),
1049 exit_code,
1050 }],
1051 },
1052 timestamp,
1053 id: id.to_string(),
1054 parent_message_id: parent_id,
1055 })
1056 })
1057 });
1058
1059 state.complete_operation(op_id);
1060
1061 if let Some(message) = updated {
1062 effects.push(Effect::EmitEvent {
1063 session_id,
1064 event: SessionEvent::MessageUpdated { message },
1065 });
1066 }
1067
1068 effects.push(Effect::EmitEvent {
1069 session_id,
1070 event: SessionEvent::OperationCompleted { op_id },
1071 });
1072
1073 effects.extend(maybe_start_queued_work(state, session_id));
1074
1075 return effects;
1076 }
1077
1078 let model = match state.operation_models.get(&op_id).cloned() {
1079 Some(model) => model,
1080 None => {
1081 return vec![Effect::EmitEvent {
1082 session_id,
1083 event: SessionEvent::Error {
1084 message: format!("Missing model for tool result on operation {op_id}"),
1085 },
1086 }];
1087 }
1088 };
1089
1090 let event = match &tool_result {
1091 ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1092 id: tool_call_id.clone(),
1093 name: tool_name.clone(),
1094 error: e.to_string(),
1095 model: model.clone(),
1096 },
1097 _ => SessionEvent::ToolCallCompleted {
1098 id: tool_call_id.clone(),
1099 name: tool_name,
1100 result: tool_result.clone(),
1101 model: model.clone(),
1102 },
1103 };
1104
1105 effects.push(Effect::EmitEvent { session_id, event });
1106
1107 let parent_id = state.message_graph.active_message_id.clone();
1108 let tool_message = Message {
1109 data: MessageData::Tool {
1110 tool_use_id: tool_call_id.0.clone(),
1111 result: tool_result,
1112 },
1113 timestamp: 0,
1114 id: format!("tool_result_{}", tool_call_id.0),
1115 parent_message_id: parent_id,
1116 };
1117 state.message_graph.add_message(tool_message.clone());
1118
1119 effects.push(Effect::EmitEvent {
1120 session_id,
1121 event: SessionEvent::ToolMessageAdded {
1122 message: tool_message,
1123 },
1124 });
1125
1126 let all_tools_complete = state
1127 .current_operation
1128 .as_ref()
1129 .is_none_or(|op| op.pending_tool_calls.is_empty());
1130 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1131
1132 if all_tools_complete && no_pending_approvals {
1133 effects.push(Effect::CallModel {
1134 session_id,
1135 op_id,
1136 model,
1137 messages: state
1138 .message_graph
1139 .get_thread_messages()
1140 .into_iter()
1141 .cloned()
1142 .collect(),
1143 system_context: state.cached_system_context.clone(),
1144 tools: state.tools.clone(),
1145 });
1146 }
1147
1148 effects
1149}
1150
1151fn handle_model_response_complete(
1152 state: &mut AppState,
1153 session_id: crate::app::domain::types::SessionId,
1154 op_id: crate::app::domain::types::OpId,
1155 message_id: crate::app::domain::types::MessageId,
1156 content: Vec<AssistantContent>,
1157 timestamp: u64,
1158) -> Vec<Effect> {
1159 let mut effects = Vec::new();
1160
1161 if state.cancelled_ops.contains(&op_id) {
1162 tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1163 return effects;
1164 }
1165
1166 let tool_calls: Vec<_> = content
1167 .iter()
1168 .filter_map(|c| {
1169 if let AssistantContent::ToolCall { tool_call, .. } = c {
1170 Some(tool_call.clone())
1171 } else {
1172 None
1173 }
1174 })
1175 .collect();
1176
1177 let parent_id = state.message_graph.active_message_id.clone();
1178
1179 let message = Message {
1180 data: MessageData::Assistant {
1181 content: content.clone(),
1182 },
1183 timestamp,
1184 id: message_id.0.clone(),
1185 parent_message_id: parent_id,
1186 };
1187
1188 state.message_graph.add_message(message.clone());
1189 state.message_graph.active_message_id = Some(message_id.0.clone());
1190
1191 let model = match state.operation_models.get(&op_id).cloned() {
1192 Some(model) => model,
1193 None => {
1194 return vec![Effect::EmitEvent {
1195 session_id,
1196 event: SessionEvent::Error {
1197 message: format!("Missing model for operation {op_id}"),
1198 },
1199 }];
1200 }
1201 };
1202
1203 effects.push(Effect::EmitEvent {
1204 session_id,
1205 event: SessionEvent::AssistantMessageAdded { message, model },
1206 });
1207
1208 if tool_calls.is_empty() {
1209 state.complete_operation(op_id);
1210 effects.push(Effect::EmitEvent {
1211 session_id,
1212 event: SessionEvent::OperationCompleted { op_id },
1213 });
1214 effects.extend(maybe_start_queued_work(state, session_id));
1215 } else {
1216 for tool_call in tool_calls {
1217 let request_id = crate::app::domain::types::RequestId::new();
1218 effects.extend(handle_tool_approval_requested(
1219 state, session_id, request_id, tool_call,
1220 ));
1221 }
1222 }
1223
1224 effects
1225}
1226
1227fn handle_model_response_error(
1228 state: &mut AppState,
1229 session_id: crate::app::domain::types::SessionId,
1230 op_id: crate::app::domain::types::OpId,
1231 error: &str,
1232) -> Vec<Effect> {
1233 let mut effects = Vec::new();
1234
1235 if state.cancelled_ops.contains(&op_id) {
1236 return effects;
1237 }
1238
1239 state.complete_operation(op_id);
1240
1241 effects.push(Effect::EmitEvent {
1242 session_id,
1243 event: SessionEvent::Error {
1244 message: error.to_string(),
1245 },
1246 });
1247
1248 effects.push(Effect::EmitEvent {
1249 session_id,
1250 event: SessionEvent::OperationCompleted { op_id },
1251 });
1252
1253 effects.extend(maybe_start_queued_work(state, session_id));
1254
1255 effects
1256}
1257
1258fn handle_direct_bash(
1259 state: &mut AppState,
1260 session_id: crate::app::domain::types::SessionId,
1261 op_id: crate::app::domain::types::OpId,
1262 message_id: crate::app::domain::types::MessageId,
1263 command: String,
1264 timestamp: u64,
1265) -> Vec<Effect> {
1266 let mut effects = Vec::new();
1267
1268 if state.has_active_operation() {
1269 state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1270 command,
1271 op_id,
1272 message_id,
1273 queued_at: timestamp,
1274 });
1275 effects.push(Effect::EmitEvent {
1276 session_id,
1277 event: SessionEvent::QueueUpdated {
1278 queue: snapshot_queue(state),
1279 },
1280 });
1281 return effects;
1282 }
1283
1284 let parent_id = state.message_graph.active_message_id.clone();
1285 let message = Message {
1286 data: MessageData::User {
1287 content: vec![UserContent::CommandExecution {
1288 command: command.clone(),
1289 stdout: String::new(),
1290 stderr: String::new(),
1291 exit_code: 0,
1292 }],
1293 },
1294 timestamp,
1295 id: message_id.0.clone(),
1296 parent_message_id: parent_id,
1297 };
1298
1299 state.message_graph.add_message(message.clone());
1300 state.message_graph.active_message_id = Some(message_id.0.clone());
1301
1302 state.start_operation(
1303 op_id,
1304 OperationKind::DirectBash {
1305 command: command.clone(),
1306 },
1307 );
1308 state.operation_messages.insert(op_id, message_id);
1309
1310 effects.push(Effect::EmitEvent {
1311 session_id,
1312 event: SessionEvent::UserMessageAdded { message },
1313 });
1314
1315 effects.push(Effect::EmitEvent {
1316 session_id,
1317 event: SessionEvent::OperationStarted {
1318 op_id,
1319 kind: OperationKind::DirectBash {
1320 command: command.clone(),
1321 },
1322 },
1323 });
1324
1325 let tool_call = steer_tools::ToolCall {
1326 id: format!("direct_bash_{op_id}"),
1327 name: BASH_TOOL_NAME.to_string(),
1328 parameters: serde_json::json!({ "command": command }),
1329 };
1330
1331 effects.push(Effect::ExecuteTool {
1332 session_id,
1333 op_id,
1334 tool_call,
1335 });
1336
1337 effects
1338}
1339
1340fn handle_dequeue_queued_item(
1341 state: &mut AppState,
1342 session_id: crate::app::domain::types::SessionId,
1343) -> Result<Vec<Effect>, ReduceError> {
1344 if state.pop_next_queued_work().is_some() {
1345 Ok(vec![Effect::EmitEvent {
1346 session_id,
1347 event: SessionEvent::QueueUpdated {
1348 queue: snapshot_queue(state),
1349 },
1350 }])
1351 } else {
1352 Err(invalid_action(
1353 InvalidActionKind::QueueEmpty,
1354 "No queued item to remove.",
1355 ))
1356 }
1357}
1358
1359fn maybe_start_queued_work(
1360 state: &mut AppState,
1361 session_id: crate::app::domain::types::SessionId,
1362) -> Vec<Effect> {
1363 if state.has_active_operation() {
1364 return vec![];
1365 }
1366
1367 let Some(next) = state.pop_next_queued_work() else {
1368 return vec![];
1369 };
1370
1371 let mut effects = vec![Effect::EmitEvent {
1372 session_id,
1373 event: SessionEvent::QueueUpdated {
1374 queue: snapshot_queue(state),
1375 },
1376 }];
1377
1378 match next {
1379 QueuedWorkItem::UserMessage(item) => {
1380 effects.extend(handle_user_input(
1381 state,
1382 session_id,
1383 item.text,
1384 item.op_id,
1385 item.message_id,
1386 item.model,
1387 item.queued_at,
1388 ));
1389 }
1390 QueuedWorkItem::DirectBash(item) => {
1391 effects.extend(handle_direct_bash(
1392 state,
1393 session_id,
1394 item.op_id,
1395 item.message_id,
1396 item.command,
1397 item.queued_at,
1398 ));
1399 }
1400 }
1401
1402 effects
1403}
1404
1405fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1406 state
1407 .queued_work
1408 .iter()
1409 .map(snapshot_queued_work_item)
1410 .collect()
1411}
1412
1413fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1414 match item {
1415 QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1416 kind: Some(QueuedWorkKind::UserMessage),
1417 content: message.text.as_str().to_string(),
1418 queued_at: message.queued_at,
1419 model: Some(message.model.clone()),
1420 op_id: message.op_id,
1421 message_id: message.message_id.clone(),
1422 },
1423 QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1424 kind: Some(QueuedWorkKind::DirectBash),
1425 content: command.command.clone(),
1426 queued_at: command.queued_at,
1427 model: None,
1428 op_id: command.op_id,
1429 message_id: command.message_id.clone(),
1430 },
1431 }
1432}
1433
1434fn handle_request_compaction(
1435 state: &mut AppState,
1436 session_id: crate::app::domain::types::SessionId,
1437 op_id: crate::app::domain::types::OpId,
1438 model: crate::config::model::ModelId,
1439) -> Result<Vec<Effect>, ReduceError> {
1440 const MIN_MESSAGES_FOR_COMPACT: usize = 3;
1441 let message_count = state.message_graph.get_thread_messages().len();
1442
1443 if state.has_active_operation() {
1444 return Err(invalid_action(
1445 InvalidActionKind::OperationInFlight,
1446 "Cannot compact while an operation is active.",
1447 ));
1448 }
1449
1450 if message_count < MIN_MESSAGES_FOR_COMPACT {
1451 return Ok(vec![Effect::EmitEvent {
1452 session_id,
1453 event: SessionEvent::CompactResult {
1454 result: crate::app::domain::event::CompactResult::InsufficientMessages,
1455 },
1456 }]);
1457 }
1458
1459 state.start_operation(op_id, OperationKind::Compact);
1460 state.operation_models.insert(op_id, model.clone());
1461
1462 Ok(vec![
1463 Effect::EmitEvent {
1464 session_id,
1465 event: SessionEvent::OperationStarted {
1466 op_id,
1467 kind: OperationKind::Compact,
1468 },
1469 },
1470 Effect::RequestCompaction {
1471 session_id,
1472 op_id,
1473 model,
1474 },
1475 ])
1476}
1477
1478fn handle_cancel(
1479 state: &mut AppState,
1480 session_id: crate::app::domain::types::SessionId,
1481 target_op: Option<crate::app::domain::types::OpId>,
1482) -> Vec<Effect> {
1483 let mut effects = Vec::new();
1484
1485 let op = match &state.current_operation {
1486 Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
1487 _ => return effects,
1488 };
1489
1490 state.record_cancelled_op(op.op_id);
1491
1492 if matches!(op.kind, OperationKind::Compact) {
1493 effects.push(Effect::EmitEvent {
1494 session_id,
1495 event: SessionEvent::CompactResult {
1496 result: crate::app::domain::event::CompactResult::Cancelled,
1497 },
1498 });
1499 }
1500
1501 let pending_approval = state.pending_approval.take();
1502 let queued_approvals = std::mem::take(&mut state.approval_queue);
1503
1504 if matches!(op.kind, OperationKind::AgentLoop) {
1505 if let Some(pending) = pending_approval {
1506 let tool_name = pending.tool_call.name.clone();
1507 effects.extend(emit_tool_failure_message(
1508 state,
1509 session_id,
1510 &pending.tool_call.id,
1511 &tool_name,
1512 ToolError::Cancelled(tool_name.clone()),
1513 format!("Tool '{tool_name}' cancelled"),
1514 "cancelled",
1515 ));
1516 }
1517
1518 for queued in queued_approvals {
1519 let tool_name = queued.tool_call.name.clone();
1520 effects.extend(emit_tool_failure_message(
1521 state,
1522 session_id,
1523 &queued.tool_call.id,
1524 &tool_name,
1525 ToolError::Cancelled(tool_name.clone()),
1526 format!("Tool '{tool_name}' cancelled"),
1527 "cancelled",
1528 ));
1529 }
1530
1531 for tool_call_id in &op.pending_tool_calls {
1532 let tool_name = state
1533 .message_graph
1534 .find_tool_name_by_id(tool_call_id.as_str())
1535 .unwrap_or_else(|| tool_call_id.as_str().to_string());
1536 let event_error = if tool_name == tool_call_id.as_str() {
1537 format!("Tool call '{tool_call_id}' cancelled")
1538 } else {
1539 format!("Tool '{tool_name}' cancelled")
1540 };
1541 effects.extend(emit_tool_failure_message(
1542 state,
1543 session_id,
1544 tool_call_id.as_str(),
1545 &tool_name,
1546 ToolError::Cancelled(tool_name.clone()),
1547 event_error,
1548 "cancelled",
1549 ));
1550 }
1551 }
1552 state.active_streams.remove(&op.op_id);
1553
1554 let dequeued_item = state.pop_next_queued_work();
1555 let popped_queued_item = dequeued_item
1556 .as_ref()
1557 .map(snapshot_queued_work_item);
1558
1559 effects.push(Effect::EmitEvent {
1560 session_id,
1561 event: SessionEvent::OperationCancelled {
1562 op_id: op.op_id,
1563 info: CancellationInfo {
1564 pending_tool_calls: op.pending_tool_calls.len(),
1565 popped_queued_item,
1566 },
1567 },
1568 });
1569
1570 effects.push(Effect::CancelOperation {
1571 session_id,
1572 op_id: op.op_id,
1573 });
1574
1575 state.complete_operation(op.op_id);
1576
1577 if dequeued_item.is_some() {
1578 effects.push(Effect::EmitEvent {
1579 session_id,
1580 event: SessionEvent::QueueUpdated {
1581 queue: snapshot_queue(state),
1582 },
1583 });
1584 }
1585
1586 effects
1587}
1588
1589fn handle_hydrate(
1590 state: &mut AppState,
1591 session_id: crate::app::domain::types::SessionId,
1592 events: Vec<SessionEvent>,
1593 starting_sequence: u64,
1594) -> Vec<Effect> {
1595 for event in events {
1596 apply_event_to_state(state, &event);
1597 }
1598
1599 state.event_sequence = starting_sequence;
1600
1601 emit_mcp_connect_effects(state, session_id)
1602}
1603
1604fn handle_switch_primary_agent(
1605 state: &mut AppState,
1606 session_id: crate::app::domain::types::SessionId,
1607 agent_id: String,
1608) -> Result<Vec<Effect>, ReduceError> {
1609 if state.current_operation.is_some() {
1610 return Err(invalid_action(
1611 InvalidActionKind::OperationInFlight,
1612 "Cannot switch primary agent while an operation is active.",
1613 ));
1614 }
1615
1616 let Some(base_config) = state
1617 .base_session_config
1618 .as_ref()
1619 .or(state.session_config.as_ref())
1620 else {
1621 return Err(invalid_action(
1622 InvalidActionKind::MissingSessionConfig,
1623 "Cannot switch primary agent without session config.",
1624 ));
1625 };
1626
1627 let Some(_spec) = primary_agent_spec(&agent_id) else {
1628 return Err(invalid_action(
1629 InvalidActionKind::UnknownPrimaryAgent,
1630 format!("Unknown primary agent '{agent_id}'."),
1631 ));
1632 };
1633
1634 let mut updated_config = base_config.clone();
1635 updated_config.primary_agent_id = Some(agent_id.clone());
1636 let new_config = resolve_effective_config(&updated_config);
1637 let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1638
1639 apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1640
1641 let mut effects = Vec::new();
1642 effects.push(Effect::EmitEvent {
1643 session_id,
1644 event: SessionEvent::SessionConfigUpdated {
1645 config: Box::new(new_config),
1646 primary_agent_id: agent_id,
1647 },
1648 });
1649 effects.extend(backend_effects);
1650 effects.push(Effect::ReloadToolSchemas { session_id });
1651
1652 Ok(effects)
1653}
1654
1655fn apply_session_config_state(
1656 state: &mut AppState,
1657 config: &crate::session::state::SessionConfig,
1658 primary_agent_id: Option<String>,
1659 update_base: bool,
1660) {
1661 state.apply_session_config(config, primary_agent_id, update_base);
1662}
1663
1664fn mcp_backend_diff_effects(
1665 session_id: crate::app::domain::types::SessionId,
1666 old_config: &crate::session::state::SessionConfig,
1667 new_config: &crate::session::state::SessionConfig,
1668) -> Vec<Effect> {
1669 let old_map = collect_mcp_backends(old_config);
1670 let new_map = collect_mcp_backends(new_config);
1671
1672 let mut effects = Vec::new();
1673
1674 for (server_name, (old_transport, old_filter)) in &old_map {
1675 match new_map.get(server_name) {
1676 None => {
1677 effects.push(Effect::DisconnectMcpServer {
1678 session_id,
1679 server_name: server_name.clone(),
1680 });
1681 }
1682 Some((new_transport, new_filter)) => {
1683 if new_transport != old_transport || new_filter != old_filter {
1684 effects.push(Effect::DisconnectMcpServer {
1685 session_id,
1686 server_name: server_name.clone(),
1687 });
1688 effects.push(Effect::ConnectMcpServer {
1689 session_id,
1690 config: McpServerConfig {
1691 server_name: server_name.clone(),
1692 transport: new_transport.clone(),
1693 tool_filter: new_filter.clone(),
1694 },
1695 });
1696 }
1697 }
1698 }
1699 }
1700
1701 for (server_name, (new_transport, new_filter)) in &new_map {
1702 if !old_map.contains_key(server_name) {
1703 effects.push(Effect::ConnectMcpServer {
1704 session_id,
1705 config: McpServerConfig {
1706 server_name: server_name.clone(),
1707 transport: new_transport.clone(),
1708 tool_filter: new_filter.clone(),
1709 },
1710 });
1711 }
1712 }
1713
1714 effects
1715}
1716
1717fn collect_mcp_backends(
1718 config: &crate::session::state::SessionConfig,
1719) -> std::collections::HashMap<
1720 String,
1721 (
1722 crate::tools::McpTransport,
1723 crate::session::state::ToolFilter,
1724 ),
1725> {
1726 let mut map = std::collections::HashMap::new();
1727
1728 for backend_config in &config.tool_config.backends {
1729 let BackendConfig::Mcp {
1730 server_name,
1731 transport,
1732 tool_filter,
1733 } = backend_config;
1734
1735 map.insert(
1736 server_name.clone(),
1737 (transport.clone(), tool_filter.clone()),
1738 );
1739 }
1740
1741 map
1742}
1743
1744pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1745 match event {
1746 SessionEvent::SessionCreated { config, .. } => {
1747 let primary_agent_id = config
1748 .primary_agent_id
1749 .clone()
1750 .unwrap_or_else(|| default_primary_agent_id().to_string());
1751 apply_session_config_state(state, config, Some(primary_agent_id), true);
1752 }
1753 SessionEvent::SessionConfigUpdated {
1754 config,
1755 primary_agent_id,
1756 } => {
1757 apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1758 }
1759 SessionEvent::AssistantMessageAdded { message, .. }
1760 | SessionEvent::UserMessageAdded { message }
1761 | SessionEvent::ToolMessageAdded { message } => {
1762 state.message_graph.add_message(message.clone());
1763 state.message_graph.active_message_id = Some(message.id().to_string());
1764 }
1765 SessionEvent::MessageUpdated { message } => {
1766 state.message_graph.replace_message(message.clone());
1767 }
1768 SessionEvent::ApprovalDecided {
1769 decision, remember, ..
1770 } => {
1771 if *decision == ApprovalDecision::Approved
1772 && let Some(memory) = remember
1773 {
1774 match memory {
1775 ApprovalMemory::Tool(name) => {
1776 state.approved_tools.insert(name.clone());
1777 }
1778 ApprovalMemory::BashPattern(pattern) => {
1779 state.approved_bash_patterns.insert(pattern.clone());
1780 }
1781 ApprovalMemory::PendingTool => {}
1782 }
1783 }
1784 state.pending_approval = None;
1785 }
1786 SessionEvent::OperationCompleted { op_id } => {
1787 state.complete_operation(*op_id);
1788 }
1789 SessionEvent::OperationCancelled { op_id, .. } => {
1790 state.record_cancelled_op(*op_id);
1791 state.complete_operation(*op_id);
1792 }
1793 SessionEvent::McpServerStateChanged {
1794 server_name,
1795 state: mcp_state,
1796 } => {
1797 state
1798 .mcp_servers
1799 .insert(server_name.clone(), mcp_state.clone());
1800 }
1801 SessionEvent::QueueUpdated { queue } => {
1802 let normalize_text = |content: &str| {
1803 NonEmptyString::new(content.to_string())
1804 .or_else(|| NonEmptyString::new("(empty)".to_string()))
1805 };
1806
1807 state.queued_work = queue
1808 .iter()
1809 .filter_map(|item| match item.kind {
1810 Some(QueuedWorkKind::UserMessage) => {
1811 let text = normalize_text(item.content.as_str())?;
1812 Some(QueuedWorkItem::UserMessage(
1813 crate::app::domain::state::QueuedUserMessage {
1814 text,
1815 op_id: item.op_id,
1816 message_id: item.message_id.clone(),
1817 model: item.model.clone().unwrap_or_else(
1818 crate::config::model::builtin::claude_sonnet_4_5,
1819 ),
1820 queued_at: item.queued_at,
1821 },
1822 ))
1823 }
1824 Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1825 crate::app::domain::state::QueuedBashCommand {
1826 command: item.content.clone(),
1827 op_id: item.op_id,
1828 message_id: item.message_id.clone(),
1829 queued_at: item.queued_at,
1830 },
1831 )),
1832 None => {
1833 let text = normalize_text(item.content.as_str())?;
1834 Some(QueuedWorkItem::UserMessage(
1835 crate::app::domain::state::QueuedUserMessage {
1836 text,
1837 op_id: item.op_id,
1838 message_id: item.message_id.clone(),
1839 model: item.model.clone().unwrap_or_else(
1840 crate::config::model::builtin::claude_sonnet_4_5,
1841 ),
1842 queued_at: item.queued_at,
1843 },
1844 ))
1845 }
1846 })
1847 .collect();
1848 }
1849 _ => {}
1850 }
1851
1852 state.event_sequence += 1;
1853}
1854
1855struct CompactionCompleteParams {
1856 op_id: crate::app::domain::types::OpId,
1857 compaction_id: crate::app::domain::types::CompactionId,
1858 summary_message_id: crate::app::domain::types::MessageId,
1859 summary: String,
1860 compacted_head_message_id: crate::app::domain::types::MessageId,
1861 previous_active_message_id: Option<crate::app::domain::types::MessageId>,
1862 model_name: String,
1863 timestamp: u64,
1864}
1865
1866fn handle_compaction_complete(
1867 state: &mut AppState,
1868 session_id: crate::app::domain::types::SessionId,
1869 params: CompactionCompleteParams,
1870) -> Vec<Effect> {
1871 use crate::app::conversation::{AssistantContent, Message, MessageData};
1872 use crate::app::domain::types::CompactionRecord;
1873
1874 let CompactionCompleteParams {
1875 op_id,
1876 compaction_id,
1877 summary_message_id,
1878 summary,
1879 compacted_head_message_id,
1880 previous_active_message_id,
1881 model_name,
1882 timestamp,
1883 } = params;
1884
1885 let summary_message = Message {
1886 data: MessageData::Assistant {
1887 content: vec![AssistantContent::Text {
1888 text: summary.clone(),
1889 }],
1890 },
1891 id: summary_message_id.to_string(),
1892 parent_message_id: None,
1893 timestamp,
1894 };
1895
1896 state.message_graph.add_message(summary_message.clone());
1897
1898 let record = CompactionRecord::with_timestamp(
1899 compaction_id,
1900 summary_message_id,
1901 compacted_head_message_id,
1902 previous_active_message_id,
1903 model_name,
1904 timestamp,
1905 );
1906
1907 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
1908 model
1909 } else {
1910 state.complete_operation(op_id);
1911 return vec![Effect::EmitEvent {
1912 session_id,
1913 event: SessionEvent::Error {
1914 message: format!("Missing model for compaction operation {op_id}"),
1915 },
1916 }];
1917 };
1918
1919 state.complete_operation(op_id);
1920
1921 let mut effects = vec![
1922 Effect::EmitEvent {
1923 session_id,
1924 event: SessionEvent::AssistantMessageAdded {
1925 message: summary_message,
1926 model,
1927 },
1928 },
1929 Effect::EmitEvent {
1930 session_id,
1931 event: SessionEvent::CompactResult {
1932 result: crate::app::domain::event::CompactResult::Success(summary),
1933 },
1934 },
1935 Effect::EmitEvent {
1936 session_id,
1937 event: SessionEvent::ConversationCompacted { record },
1938 },
1939 Effect::EmitEvent {
1940 session_id,
1941 event: SessionEvent::OperationCompleted { op_id },
1942 },
1943 ];
1944
1945 effects.extend(maybe_start_queued_work(state, session_id));
1946
1947 effects
1948}
1949
1950fn handle_compaction_failed(
1951 state: &mut AppState,
1952 session_id: crate::app::domain::types::SessionId,
1953 op_id: crate::app::domain::types::OpId,
1954 error: String,
1955) -> Vec<Effect> {
1956 state.complete_operation(op_id);
1957
1958 let mut effects = vec![
1959 Effect::EmitEvent {
1960 session_id,
1961 event: SessionEvent::Error { message: error },
1962 },
1963 Effect::EmitEvent {
1964 session_id,
1965 event: SessionEvent::OperationCompleted { op_id },
1966 },
1967 ];
1968
1969 effects.extend(maybe_start_queued_work(state, session_id));
1970
1971 effects
1972}
1973
1974fn emit_mcp_connect_effects(
1975 state: &AppState,
1976 session_id: crate::app::domain::types::SessionId,
1977) -> Vec<Effect> {
1978 let mut effects = Vec::new();
1979
1980 let Some(ref config) = state.session_config else {
1981 return effects;
1982 };
1983
1984 for backend_config in &config.tool_config.backends {
1985 let BackendConfig::Mcp {
1986 server_name,
1987 transport,
1988 tool_filter,
1989 } = backend_config;
1990
1991 let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
1992 matches!(
1993 s,
1994 McpServerState::Connecting | McpServerState::Connected { .. }
1995 )
1996 });
1997
1998 if !already_connected {
1999 effects.push(Effect::ConnectMcpServer {
2000 session_id,
2001 config: McpServerConfig {
2002 server_name: server_name.clone(),
2003 transport: transport.clone(),
2004 tool_filter: tool_filter.clone(),
2005 },
2006 });
2007 }
2008 }
2009
2010 effects
2011}
2012
2013#[cfg(test)]
2014mod tests {
2015 use super::*;
2016 use crate::app::domain::state::{OperationState, PendingApproval};
2017 use crate::app::domain::types::{
2018 MessageId, NonEmptyString, OpId, RequestId, SessionId, ToolCallId,
2019 };
2020 use crate::config::model::builtin;
2021 use crate::primary_agents::resolve_effective_config;
2022 use crate::session::state::{
2023 ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2024 ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2025 };
2026 use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2027 use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2028 use schemars::schema_for;
2029 use serde_json::json;
2030 use std::collections::HashSet;
2031 use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2032
2033 fn test_state() -> AppState {
2034 AppState::new(SessionId::new())
2035 }
2036
2037 fn test_schema(name: &str) -> ToolSchema {
2038 ToolSchema {
2039 name: name.to_string(),
2040 display_name: name.to_string(),
2041 description: String::new(),
2042 input_schema: InputSchema::empty_object(),
2043 }
2044 }
2045
2046 fn base_session_config() -> SessionConfig {
2047 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2048 config.primary_agent_id = Some("normal".to_string());
2049 config.policy_overrides = SessionPolicyOverrides::empty();
2050 resolve_effective_config(&config)
2051 }
2052
2053 fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2054 super::reduce(state, action).expect("reduce failed")
2055 }
2056
2057 #[test]
2058 fn test_user_input_starts_operation() {
2059 let mut state = test_state();
2060 let session_id = state.session_id;
2061 let op_id = OpId::new();
2062 let message_id = MessageId::new();
2063 let model = builtin::claude_sonnet_4_5();
2064
2065 let effects = reduce(
2066 &mut state,
2067 Action::UserInput {
2068 session_id,
2069 text: NonEmptyString::new("Hello").unwrap(),
2070 op_id,
2071 message_id,
2072 model,
2073 timestamp: 1_234_567_890,
2074 },
2075 );
2076
2077 assert_eq!(state.message_graph.messages.len(), 1);
2078 assert!(state.current_operation.is_some());
2079 assert!(
2080 effects
2081 .iter()
2082 .any(|e| matches!(e, Effect::CallModel { .. }))
2083 );
2084 }
2085
2086 #[test]
2087 fn test_switch_primary_agent_updates_visibility() {
2088 let mut state = test_state();
2089 let session_id = state.session_id;
2090 let config = base_session_config();
2091 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2092
2093 let effects = reduce(
2094 &mut state,
2095 Action::SwitchPrimaryAgent {
2096 session_id,
2097 agent_id: "plan".to_string(),
2098 },
2099 );
2100
2101 let updated = state.session_config.as_ref().expect("config");
2102 match &updated.tool_config.visibility {
2103 ToolVisibility::Whitelist(allowed) => {
2104 assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2105 for name in READ_ONLY_TOOL_NAMES {
2106 assert!(allowed.contains(*name));
2107 }
2108 assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2109 }
2110 other => panic!("Unexpected tool visibility: {other:?}"),
2111 }
2112 assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2113 assert!(effects.iter().any(|e| matches!(
2114 e,
2115 Effect::EmitEvent {
2116 event: SessionEvent::SessionConfigUpdated { .. },
2117 ..
2118 }
2119 )));
2120 assert!(
2121 effects
2122 .iter()
2123 .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2124 );
2125 }
2126
2127 #[test]
2128 fn test_switch_primary_agent_yolo_auto_approves() {
2129 let mut state = test_state();
2130 let session_id = state.session_id;
2131 let config = base_session_config();
2132 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2133
2134 let _ = reduce(
2135 &mut state,
2136 Action::SwitchPrimaryAgent {
2137 session_id,
2138 agent_id: "yolo".to_string(),
2139 },
2140 );
2141
2142 let updated = state.session_config.as_ref().expect("config");
2143 assert_eq!(
2144 updated.tool_config.approval_policy.default_behavior,
2145 UnapprovedBehavior::Allow
2146 );
2147 }
2148
2149 #[test]
2150 fn test_switch_primary_agent_preserves_policy_overrides() {
2151 let mut state = test_state();
2152 let session_id = state.session_id;
2153
2154 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2155 config.primary_agent_id = Some("normal".to_string());
2156 config.policy_overrides = SessionPolicyOverrides {
2157 default_model: None,
2158 tool_visibility: None,
2159 approval_policy: ToolApprovalPolicyOverrides {
2160 default_behavior: Some(UnapprovedBehavior::Deny),
2161 preapproved: ApprovalRulesOverrides::empty(),
2162 },
2163 };
2164 let config = resolve_effective_config(&config);
2165 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2166
2167 let _ = reduce(
2168 &mut state,
2169 Action::SwitchPrimaryAgent {
2170 session_id,
2171 agent_id: "yolo".to_string(),
2172 },
2173 );
2174
2175 let updated = state.session_config.as_ref().expect("config");
2176 assert_eq!(
2177 updated.tool_config.approval_policy.default_behavior,
2178 UnapprovedBehavior::Deny
2179 );
2180 assert_eq!(
2181 updated.policy_overrides.approval_policy.default_behavior,
2182 Some(UnapprovedBehavior::Deny)
2183 );
2184 }
2185
2186 #[test]
2187 fn dispatch_agent_resume_is_auto_approved() {
2188 let mut state = test_state();
2189 let session_id = state.session_id;
2190 let config = base_session_config();
2191 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2192
2193 let tool_call = ToolCall {
2194 id: "tc_dispatch_resume".to_string(),
2195 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2196 parameters: json!({
2197 "prompt": "resume work",
2198 "target": {
2199 "session": "resume",
2200 "session_id": SessionId::new().to_string()
2201 }
2202 }),
2203 };
2204
2205 let decision = get_tool_decision(&state, &tool_call);
2206 assert_eq!(decision, ToolDecision::Allow);
2207 assert_eq!(state.session_id, session_id);
2208 }
2209
2210 #[test]
2211 fn test_switch_primary_agent_restores_base_prompt() {
2212 let mut state = test_state();
2213 let session_id = state.session_id;
2214 let mut config = base_session_config();
2215 config.system_prompt = Some("base prompt".to_string());
2216 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2217
2218 let _ = reduce(
2219 &mut state,
2220 Action::SwitchPrimaryAgent {
2221 session_id,
2222 agent_id: "plan".to_string(),
2223 },
2224 );
2225
2226 let _ = reduce(
2227 &mut state,
2228 Action::SwitchPrimaryAgent {
2229 session_id,
2230 agent_id: "normal".to_string(),
2231 },
2232 );
2233
2234 let updated = state.session_config.as_ref().expect("config");
2235 assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2236 }
2237
2238 #[test]
2239 fn test_switch_primary_agent_blocked_during_operation() {
2240 let mut state = test_state();
2241 let session_id = state.session_id;
2242 let config = base_session_config();
2243 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2244
2245 state.current_operation = Some(OperationState {
2246 op_id: OpId::new(),
2247 kind: OperationKind::AgentLoop,
2248 pending_tool_calls: HashSet::new(),
2249 });
2250
2251 let result = super::reduce(
2252 &mut state,
2253 Action::SwitchPrimaryAgent {
2254 session_id,
2255 agent_id: "plan".to_string(),
2256 },
2257 );
2258
2259 assert!(matches!(
2260 result,
2261 Err(ReduceError::InvalidAction {
2262 kind: InvalidActionKind::OperationInFlight,
2263 ..
2264 })
2265 ));
2266 assert!(state.primary_agent_id.as_deref() == Some("normal"));
2267 }
2268
2269 #[test]
2270 fn test_late_result_ignored_after_cancel() {
2271 let mut state = test_state();
2272 let session_id = state.session_id;
2273 let op_id = OpId::new();
2274 let tool_call_id = ToolCallId::from_string("tc_1");
2275
2276 state.current_operation = Some(OperationState {
2277 op_id,
2278 kind: OperationKind::AgentLoop,
2279 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2280 });
2281
2282 let _ = reduce(
2283 &mut state,
2284 Action::Cancel {
2285 session_id,
2286 op_id: None,
2287 },
2288 );
2289
2290 state.current_operation = Some(OperationState {
2291 op_id,
2292 kind: OperationKind::AgentLoop,
2293 pending_tool_calls: HashSet::new(),
2294 });
2295 state
2296 .operation_models
2297 .insert(op_id, builtin::claude_sonnet_4_5());
2298 state
2299 .operation_models
2300 .insert(op_id, builtin::claude_sonnet_4_5());
2301 state
2302 .operation_models
2303 .insert(op_id, builtin::claude_sonnet_4_5());
2304
2305 let effects = reduce(
2306 &mut state,
2307 Action::ToolResult {
2308 session_id,
2309 tool_call_id,
2310 tool_name: "test".to_string(),
2311 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2312 tool_name: "test".to_string(),
2313 payload: "done".to_string(),
2314 })),
2315 },
2316 );
2317
2318 assert!(effects.is_empty());
2319 }
2320
2321 #[test]
2322 fn test_pre_approved_tool_executes_immediately() {
2323 let mut state = test_state();
2324 let session_id = state.session_id;
2325 let op_id = OpId::new();
2326
2327 state.approved_tools.insert("test_tool".to_string());
2328 state.current_operation = Some(OperationState {
2329 op_id,
2330 kind: OperationKind::AgentLoop,
2331 pending_tool_calls: HashSet::new(),
2332 });
2333 state
2334 .operation_models
2335 .insert(op_id, builtin::claude_sonnet_4_5());
2336 state
2337 .operation_models
2338 .insert(op_id, builtin::claude_sonnet_4_5());
2339 state
2340 .operation_models
2341 .insert(op_id, builtin::claude_sonnet_4_5());
2342
2343 let tool_call = steer_tools::ToolCall {
2344 id: "tc_1".to_string(),
2345 name: "test_tool".to_string(),
2346 parameters: serde_json::json!({}),
2347 };
2348
2349 let effects = reduce(
2350 &mut state,
2351 Action::ToolApprovalRequested {
2352 session_id,
2353 request_id: RequestId::new(),
2354 tool_call,
2355 },
2356 );
2357
2358 assert!(
2359 effects
2360 .iter()
2361 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2362 );
2363 assert!(state.pending_approval.is_none());
2364 }
2365
2366 #[test]
2367 fn test_denied_tool_request_emits_failure_message() {
2368 let mut state = test_state();
2369 let session_id = state.session_id;
2370 let op_id = OpId::new();
2371
2372 state.current_operation = Some(OperationState {
2373 op_id,
2374 kind: OperationKind::AgentLoop,
2375 pending_tool_calls: HashSet::new(),
2376 });
2377
2378 state
2379 .operation_models
2380 .insert(op_id, builtin::claude_sonnet_4_5());
2381
2382 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2383 config.tool_config.approval_policy = ToolApprovalPolicy {
2384 default_behavior: UnapprovedBehavior::Deny,
2385 preapproved: ApprovalRules::default(),
2386 };
2387 state.session_config = Some(config);
2388
2389 let tool_call = steer_tools::ToolCall {
2390 id: "tc_1".to_string(),
2391 name: "test_tool".to_string(),
2392 parameters: serde_json::json!({}),
2393 };
2394
2395 let effects = reduce(
2396 &mut state,
2397 Action::ToolApprovalRequested {
2398 session_id,
2399 request_id: RequestId::new(),
2400 tool_call,
2401 },
2402 );
2403
2404 assert!(effects.iter().any(|e| matches!(
2405 e,
2406 Effect::EmitEvent {
2407 event: SessionEvent::ToolCallFailed { .. },
2408 ..
2409 }
2410 )));
2411 assert!(effects.iter().any(|e| matches!(
2412 e,
2413 Effect::EmitEvent {
2414 event: SessionEvent::ToolMessageAdded { .. },
2415 ..
2416 }
2417 )));
2418 assert!(
2419 !effects
2420 .iter()
2421 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2422 );
2423 assert!(
2424 !effects
2425 .iter()
2426 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2427 );
2428 assert!(state.pending_approval.is_none());
2429 assert!(state.approval_queue.is_empty());
2430 assert_eq!(state.message_graph.messages.len(), 1);
2431
2432 match &state.message_graph.messages[0].data {
2433 MessageData::Tool { result, .. } => match result {
2434 ToolResult::Error(error) => {
2435 assert!(
2436 matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2437 );
2438 }
2439 _ => panic!("expected denied tool error"),
2440 },
2441 _ => panic!("expected tool message"),
2442 }
2443 }
2444
2445 #[test]
2446 fn test_user_denied_tool_request_emits_failure_message() {
2447 let mut state = test_state();
2448 let session_id = state.session_id;
2449 let op_id = OpId::new();
2450
2451 state.current_operation = Some(OperationState {
2452 op_id,
2453 kind: OperationKind::AgentLoop,
2454 pending_tool_calls: HashSet::new(),
2455 });
2456 state
2457 .operation_models
2458 .insert(op_id, builtin::claude_sonnet_4_5());
2459
2460 let tool_call = steer_tools::ToolCall {
2461 id: "tc_1".to_string(),
2462 name: "test_tool".to_string(),
2463 parameters: serde_json::json!({}),
2464 };
2465 let request_id = RequestId::new();
2466 state.pending_approval = Some(PendingApproval {
2467 request_id,
2468 tool_call: tool_call.clone(),
2469 });
2470
2471 let effects = reduce(
2472 &mut state,
2473 Action::ToolApprovalDecided {
2474 session_id,
2475 request_id,
2476 decision: ApprovalDecision::Denied,
2477 remember: None,
2478 },
2479 );
2480
2481 assert!(effects.iter().any(|e| matches!(
2482 e,
2483 Effect::EmitEvent {
2484 event: SessionEvent::ToolCallFailed { .. },
2485 ..
2486 }
2487 )));
2488 assert!(effects.iter().any(|e| matches!(
2489 e,
2490 Effect::EmitEvent {
2491 event: SessionEvent::ToolMessageAdded { .. },
2492 ..
2493 }
2494 )));
2495 assert!(
2496 !effects
2497 .iter()
2498 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2499 );
2500 assert!(state.pending_approval.is_none());
2501 assert!(state.approval_queue.is_empty());
2502 assert_eq!(state.message_graph.messages.len(), 1);
2503
2504 match &state.message_graph.messages[0].data {
2505 MessageData::Tool { result, .. } => match result {
2506 ToolResult::Error(error) => {
2507 assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2508 }
2509 _ => panic!("expected denied tool error"),
2510 },
2511 _ => panic!("expected tool message"),
2512 }
2513 }
2514
2515 #[test]
2516 fn test_cancel_pops_queued_item_without_auto_start() {
2517 let mut state = test_state();
2518 let session_id = state.session_id;
2519 let op_id = OpId::new();
2520
2521 state.current_operation = Some(OperationState {
2522 op_id,
2523 kind: OperationKind::AgentLoop,
2524 pending_tool_calls: HashSet::new(),
2525 });
2526 state
2527 .operation_models
2528 .insert(op_id, builtin::claude_sonnet_4_5());
2529
2530 let queued_op = OpId::new();
2531 let queued_message_id = MessageId::from_string("queued_msg");
2532 let _ = reduce(
2533 &mut state,
2534 Action::UserInput {
2535 session_id,
2536 text: NonEmptyString::new("Queued message").expect("non-empty"),
2537 op_id: queued_op,
2538 message_id: queued_message_id.clone(),
2539 model: builtin::claude_sonnet_4_5(),
2540 timestamp: 1,
2541 },
2542 );
2543
2544 let effects = reduce(
2545 &mut state,
2546 Action::Cancel {
2547 session_id,
2548 op_id: None,
2549 },
2550 );
2551
2552 assert!(state.current_operation.is_none());
2553 assert!(state.queued_work.is_empty());
2554
2555 let cancellation_info = effects.iter().find_map(|effect| match effect {
2556 Effect::EmitEvent {
2557 event: SessionEvent::OperationCancelled { info, .. },
2558 ..
2559 } => Some(info),
2560 _ => None,
2561 });
2562 let info = cancellation_info.expect("expected OperationCancelled event");
2563 let popped = info
2564 .popped_queued_item
2565 .as_ref()
2566 .expect("expected popped queued item");
2567 assert_eq!(popped.content, "Queued message");
2568 assert_eq!(popped.op_id, queued_op);
2569 assert_eq!(popped.message_id, queued_message_id);
2570
2571 assert!(
2572 !effects.iter().any(|effect| matches!(
2573 effect,
2574 Effect::EmitEvent {
2575 event: SessionEvent::OperationStarted { .. },
2576 ..
2577 }
2578 )),
2579 "queued work should not auto-start on cancel"
2580 );
2581 }
2582
2583 #[test]
2584 fn test_cancel_injects_tool_results_for_pending_calls() {
2585 let mut state = test_state();
2586 let session_id = state.session_id;
2587 let op_id = OpId::new();
2588
2589 let tool_call = ToolCall {
2590 id: "tc_1".to_string(),
2591 name: "test_tool".to_string(),
2592 parameters: serde_json::json!({}),
2593 };
2594
2595 state.message_graph.add_message(Message {
2596 data: MessageData::Assistant {
2597 content: vec![AssistantContent::ToolCall {
2598 tool_call: tool_call.clone(),
2599 thought_signature: None,
2600 }],
2601 },
2602 timestamp: 0,
2603 id: "msg_1".to_string(),
2604 parent_message_id: None,
2605 });
2606
2607 state.current_operation = Some(OperationState {
2608 op_id,
2609 kind: OperationKind::AgentLoop,
2610 pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2611 });
2612 state
2613 .operation_models
2614 .insert(op_id, builtin::claude_sonnet_4_5());
2615
2616 let effects = reduce(
2617 &mut state,
2618 Action::Cancel {
2619 session_id,
2620 op_id: None,
2621 },
2622 );
2623
2624 assert!(effects.iter().any(|e| matches!(
2625 e,
2626 Effect::EmitEvent {
2627 event: SessionEvent::ToolMessageAdded { .. },
2628 ..
2629 }
2630 )));
2631
2632 let tool_message = state
2633 .message_graph
2634 .messages
2635 .iter()
2636 .find(|message| matches!(message.data, MessageData::Tool { .. }))
2637 .expect("tool result should be injected on cancel");
2638
2639 match &tool_message.data {
2640 MessageData::Tool { result, .. } => match result {
2641 ToolResult::Error(error) => {
2642 assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2643 }
2644 _ => panic!("expected cancelled tool error"),
2645 },
2646 _ => panic!("expected tool message"),
2647 }
2648 }
2649
2650 #[test]
2651 fn test_malformed_tool_call_auto_denies() {
2652 let mut state = test_state();
2653 let session_id = state.session_id;
2654 let op_id = OpId::new();
2655
2656 state.current_operation = Some(OperationState {
2657 op_id,
2658 kind: OperationKind::AgentLoop,
2659 pending_tool_calls: HashSet::new(),
2660 });
2661
2662 state
2663 .operation_models
2664 .insert(op_id, builtin::claude_sonnet_4_5());
2665
2666 let mut properties = serde_json::Map::new();
2667 properties.insert("command".to_string(), json!({ "type": "string" }));
2668
2669 state.tools.push(ToolSchema {
2670 name: "test_tool".to_string(),
2671 display_name: "test_tool".to_string(),
2672 description: String::new(),
2673 input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2674 });
2675
2676 let tool_call = ToolCall {
2677 id: "tc_1".to_string(),
2678 name: "test_tool".to_string(),
2679 parameters: json!({}),
2680 };
2681
2682 let effects = reduce(
2683 &mut state,
2684 Action::ToolApprovalRequested {
2685 session_id,
2686 request_id: RequestId::new(),
2687 tool_call,
2688 },
2689 );
2690
2691 assert!(effects.iter().any(|e| matches!(
2692 e,
2693 Effect::EmitEvent {
2694 event: SessionEvent::ToolCallFailed { .. },
2695 ..
2696 }
2697 )));
2698 assert!(effects.iter().any(|e| matches!(
2699 e,
2700 Effect::EmitEvent {
2701 event: SessionEvent::ToolMessageAdded { .. },
2702 ..
2703 }
2704 )));
2705 assert!(
2706 !effects
2707 .iter()
2708 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2709 );
2710 assert!(
2711 !effects
2712 .iter()
2713 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2714 );
2715 assert!(state.pending_approval.is_none());
2716 assert!(state.approval_queue.is_empty());
2717 assert_eq!(state.message_graph.messages.len(), 1);
2718
2719 match &state.message_graph.messages[0].data {
2720 MessageData::Tool { result, .. } => match result {
2721 ToolResult::Error(error) => {
2722 assert!(matches!(error, ToolError::InvalidParams { .. }));
2723 }
2724 _ => panic!("expected invalid params tool error"),
2725 },
2726 _ => panic!("expected tool message"),
2727 }
2728 }
2729
2730 #[test]
2731 fn test_approval_queuing() {
2732 let mut state = test_state();
2733 let session_id = state.session_id;
2734 let op_id = OpId::new();
2735
2736 state.current_operation = Some(OperationState {
2737 op_id,
2738 kind: OperationKind::AgentLoop,
2739 pending_tool_calls: HashSet::new(),
2740 });
2741
2742 let tool_call_1 = steer_tools::ToolCall {
2743 id: "tc_1".to_string(),
2744 name: "tool_1".to_string(),
2745 parameters: serde_json::json!({}),
2746 };
2747 let tool_call_2 = steer_tools::ToolCall {
2748 id: "tc_2".to_string(),
2749 name: "tool_2".to_string(),
2750 parameters: serde_json::json!({}),
2751 };
2752
2753 let _ = reduce(
2754 &mut state,
2755 Action::ToolApprovalRequested {
2756 session_id,
2757 request_id: RequestId::new(),
2758 tool_call: tool_call_1,
2759 },
2760 );
2761
2762 assert!(state.pending_approval.is_some());
2763
2764 let _ = reduce(
2765 &mut state,
2766 Action::ToolApprovalRequested {
2767 session_id,
2768 request_id: RequestId::new(),
2769 tool_call: tool_call_2,
2770 },
2771 );
2772
2773 assert_eq!(state.approval_queue.len(), 1);
2774 }
2775
2776 #[test]
2777 fn test_dispatch_agent_missing_target_auto_denies() {
2778 let mut state = test_state();
2779 let session_id = state.session_id;
2780 let op_id = OpId::new();
2781
2782 state.current_operation = Some(OperationState {
2783 op_id,
2784 kind: OperationKind::AgentLoop,
2785 pending_tool_calls: HashSet::new(),
2786 });
2787
2788 state
2789 .operation_models
2790 .insert(op_id, builtin::claude_sonnet_4_5());
2791
2792 let input_schema: InputSchema =
2793 schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
2794 state.tools.push(ToolSchema {
2795 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2796 display_name: "Dispatch Agent".to_string(),
2797 description: String::new(),
2798 input_schema,
2799 });
2800
2801 let tool_call = ToolCall {
2802 id: "tc_dispatch".to_string(),
2803 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2804 parameters: json!({ "prompt": "hello world" }),
2805 };
2806
2807 let effects = reduce(
2808 &mut state,
2809 Action::ToolApprovalRequested {
2810 session_id,
2811 request_id: RequestId::new(),
2812 tool_call,
2813 },
2814 );
2815
2816 assert!(effects.iter().any(|e| matches!(
2817 e,
2818 Effect::EmitEvent {
2819 event: SessionEvent::ToolCallFailed { .. },
2820 ..
2821 }
2822 )));
2823 assert!(effects.iter().any(|e| matches!(
2824 e,
2825 Effect::EmitEvent {
2826 event: SessionEvent::ToolMessageAdded { .. },
2827 ..
2828 }
2829 )));
2830 assert!(
2831 !effects
2832 .iter()
2833 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2834 );
2835 assert!(state.pending_approval.is_none());
2836 assert!(state.approval_queue.is_empty());
2837
2838 match &state.message_graph.messages[0].data {
2839 MessageData::Tool { result, .. } => match result {
2840 ToolResult::Error(error) => {
2841 assert!(matches!(error, ToolError::InvalidParams { .. }));
2842 }
2843 _ => panic!("expected invalid params tool error"),
2844 },
2845 _ => panic!("expected tool message"),
2846 }
2847 }
2848
2849 #[test]
2850 fn test_model_response_with_tool_calls_requests_approval() {
2851 let mut state = test_state();
2852 let session_id = state.session_id;
2853 let op_id = OpId::new();
2854 let message_id = MessageId::new();
2855
2856 state.current_operation = Some(OperationState {
2857 op_id,
2858 kind: OperationKind::AgentLoop,
2859 pending_tool_calls: HashSet::new(),
2860 });
2861 state
2862 .operation_models
2863 .insert(op_id, builtin::claude_sonnet_4_5());
2864
2865 let tool_call = steer_tools::ToolCall {
2866 id: "tc_1".to_string(),
2867 name: "bash".to_string(),
2868 parameters: serde_json::json!({"command": "ls"}),
2869 };
2870
2871 let content = vec![
2872 AssistantContent::Text {
2873 text: "Let me list the files.".to_string(),
2874 },
2875 AssistantContent::ToolCall {
2876 tool_call: tool_call.clone(),
2877 thought_signature: None,
2878 },
2879 ];
2880
2881 let effects = reduce(
2882 &mut state,
2883 Action::ModelResponseComplete {
2884 session_id,
2885 op_id,
2886 message_id,
2887 content,
2888 timestamp: 12345,
2889 },
2890 );
2891
2892 assert!(state.pending_approval.is_some());
2893 assert!(
2894 effects
2895 .iter()
2896 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2897 );
2898 assert!(state.current_operation.is_some());
2899 }
2900
2901 #[test]
2902 fn test_model_response_no_tools_completes_operation() {
2903 let mut state = test_state();
2904 let session_id = state.session_id;
2905 let op_id = OpId::new();
2906 let message_id = MessageId::new();
2907
2908 state.current_operation = Some(OperationState {
2909 op_id,
2910 kind: OperationKind::AgentLoop,
2911 pending_tool_calls: HashSet::new(),
2912 });
2913 state
2914 .operation_models
2915 .insert(op_id, builtin::claude_sonnet_4_5());
2916
2917 let content = vec![AssistantContent::Text {
2918 text: "Hello! How can I help?".to_string(),
2919 }];
2920
2921 let effects = reduce(
2922 &mut state,
2923 Action::ModelResponseComplete {
2924 session_id,
2925 op_id,
2926 message_id,
2927 content,
2928 timestamp: 12345,
2929 },
2930 );
2931
2932 assert!(state.current_operation.is_none());
2933 assert!(effects.iter().any(|e| matches!(
2934 e,
2935 Effect::EmitEvent {
2936 event: SessionEvent::OperationCompleted { .. },
2937 ..
2938 }
2939 )));
2940 }
2941
2942 #[test]
2943 fn test_out_of_order_completion_preserves_newer_operation() {
2944 let mut state = test_state();
2945 let session_id = state.session_id;
2946 let model = builtin::claude_sonnet_4_5();
2947
2948 let op_a = OpId::new();
2949 let op_b = OpId::new();
2950
2951 let _ = reduce(
2952 &mut state,
2953 Action::UserInput {
2954 session_id,
2955 text: NonEmptyString::new("first").unwrap(),
2956 op_id: op_a,
2957 message_id: MessageId::new(),
2958 model: model.clone(),
2959 timestamp: 1,
2960 },
2961 );
2962
2963 let _ = reduce(
2964 &mut state,
2965 Action::UserInput {
2966 session_id,
2967 text: NonEmptyString::new("second").unwrap(),
2968 op_id: op_b,
2969 message_id: MessageId::new(),
2970 model: model.clone(),
2971 timestamp: 2,
2972 },
2973 );
2974
2975 let _ = reduce(
2976 &mut state,
2977 Action::ModelResponseComplete {
2978 session_id,
2979 op_id: op_a,
2980 message_id: MessageId::new(),
2981 content: vec![AssistantContent::Text {
2982 text: "done A".to_string(),
2983 }],
2984 timestamp: 3,
2985 },
2986 );
2987
2988 assert!(
2989 state
2990 .current_operation
2991 .as_ref()
2992 .is_some_and(|op| op.op_id == op_b)
2993 );
2994 assert!(state.operation_models.contains_key(&op_b));
2995 assert!(!state.operation_models.contains_key(&op_a));
2996
2997 let effects = reduce(
2998 &mut state,
2999 Action::ModelResponseComplete {
3000 session_id,
3001 op_id: op_b,
3002 message_id: MessageId::new(),
3003 content: vec![AssistantContent::Text {
3004 text: "done B".to_string(),
3005 }],
3006 timestamp: 4,
3007 },
3008 );
3009
3010 assert!(effects.iter().any(|e| matches!(
3011 e,
3012 Effect::EmitEvent {
3013 event: SessionEvent::OperationCompleted { op_id },
3014 ..
3015 } if *op_id == op_b
3016 )));
3017 assert!(!effects.iter().any(|e| matches!(
3018 e,
3019 Effect::EmitEvent {
3020 event: SessionEvent::Error { message },
3021 ..
3022 } if message.contains("Missing model for operation")
3023 )));
3024 }
3025
3026 #[test]
3027 fn test_tool_approval_does_not_call_model_before_result() {
3028 let mut state = test_state();
3029 let session_id = state.session_id;
3030 let op_id = OpId::new();
3031
3032 state.current_operation = Some(OperationState {
3033 op_id,
3034 kind: OperationKind::AgentLoop,
3035 pending_tool_calls: HashSet::new(),
3036 });
3037 state
3038 .operation_models
3039 .insert(op_id, builtin::claude_sonnet_4_5());
3040
3041 let tool_call = steer_tools::ToolCall {
3042 id: "tc_1".to_string(),
3043 name: "bash".to_string(),
3044 parameters: serde_json::json!({"command": "ls"}),
3045 };
3046 let request_id = RequestId::new();
3047 state.pending_approval = Some(PendingApproval {
3048 request_id,
3049 tool_call: tool_call.clone(),
3050 });
3051
3052 let effects = reduce(
3053 &mut state,
3054 Action::ToolApprovalDecided {
3055 session_id,
3056 request_id,
3057 decision: ApprovalDecision::Approved,
3058 remember: None,
3059 },
3060 );
3061
3062 assert!(
3063 effects
3064 .iter()
3065 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3066 );
3067 assert!(
3068 !effects
3069 .iter()
3070 .any(|e| matches!(e, Effect::CallModel { .. }))
3071 );
3072 assert!(state.current_operation.as_ref().is_some_and(|op| {
3073 op.pending_tool_calls
3074 .contains(&ToolCallId::from_string("tc_1"))
3075 }));
3076 }
3077
3078 #[test]
3079 fn test_mcp_tool_visibility_and_disconnect_removal() {
3080 let mut state = test_state();
3081 let session_id = state.session_id;
3082
3083 let mut allowed = HashSet::new();
3084 allowed.insert("mcp__alpha__allowed".to_string());
3085
3086 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3087 config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3088 state.session_config = Some(config);
3089
3090 state.tools.push(test_schema("bash"));
3091
3092 let _ = reduce(
3093 &mut state,
3094 Action::McpServerStateChanged {
3095 session_id,
3096 server_name: "alpha".to_string(),
3097 state: McpServerState::Connected {
3098 tools: vec![
3099 test_schema("mcp__alpha__allowed"),
3100 test_schema("mcp__alpha__blocked"),
3101 ],
3102 },
3103 },
3104 );
3105
3106 assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3107 assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3108
3109 let _ = reduce(
3110 &mut state,
3111 Action::McpServerStateChanged {
3112 session_id,
3113 server_name: "alpha".to_string(),
3114 state: McpServerState::Disconnected { error: None },
3115 },
3116 );
3117
3118 assert!(
3119 !state
3120 .tools
3121 .iter()
3122 .any(|t| t.name.starts_with("mcp__alpha__"))
3123 );
3124 assert!(state.tools.iter().any(|t| t.name == "bash"));
3125 }
3126
3127 #[test]
3128 fn test_tool_result_continues_agent_loop() {
3129 let mut state = test_state();
3130 let session_id = state.session_id;
3131 let op_id = OpId::new();
3132 let tool_call_id = ToolCallId::from_string("tc_1");
3133
3134 state.current_operation = Some(OperationState {
3135 op_id,
3136 kind: OperationKind::AgentLoop,
3137 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3138 });
3139 state
3140 .operation_models
3141 .insert(op_id, builtin::claude_sonnet_4_5());
3142
3143 let effects = reduce(
3144 &mut state,
3145 Action::ToolResult {
3146 session_id,
3147 tool_call_id,
3148 tool_name: "bash".to_string(),
3149 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3150 tool_name: "bash".to_string(),
3151 payload: "file1.txt\nfile2.txt".to_string(),
3152 })),
3153 },
3154 );
3155
3156 assert!(
3157 effects
3158 .iter()
3159 .any(|e| matches!(e, Effect::CallModel { .. }))
3160 );
3161 }
3162
3163 #[test]
3164 fn test_tool_result_waits_for_pending_tools() {
3165 let mut state = test_state();
3166 let session_id = state.session_id;
3167 let op_id = OpId::new();
3168 let tool_call_id_1 = ToolCallId::from_string("tc_1");
3169 let tool_call_id_2 = ToolCallId::from_string("tc_2");
3170
3171 state.current_operation = Some(OperationState {
3172 op_id,
3173 kind: OperationKind::AgentLoop,
3174 pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3175 .into_iter()
3176 .collect(),
3177 });
3178 state
3179 .operation_models
3180 .insert(op_id, builtin::claude_sonnet_4_5());
3181
3182 let effects = reduce(
3183 &mut state,
3184 Action::ToolResult {
3185 session_id,
3186 tool_call_id: tool_call_id_1,
3187 tool_name: "bash".to_string(),
3188 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3189 tool_name: "bash".to_string(),
3190 payload: "done".to_string(),
3191 })),
3192 },
3193 );
3194
3195 assert!(
3196 !effects
3197 .iter()
3198 .any(|e| matches!(e, Effect::CallModel { .. }))
3199 );
3200
3201 let effects = reduce(
3202 &mut state,
3203 Action::ToolResult {
3204 session_id,
3205 tool_call_id: tool_call_id_2,
3206 tool_name: "bash".to_string(),
3207 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3208 tool_name: "bash".to_string(),
3209 payload: "done".to_string(),
3210 })),
3211 },
3212 );
3213
3214 assert!(
3215 effects
3216 .iter()
3217 .any(|e| matches!(e, Effect::CallModel { .. }))
3218 );
3219 }
3220}