1use crate::agents::default_agent_spec_id;
2use crate::api::provider::TokenUsage;
3use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
4
5use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
6
7use crate::app::domain::effect::{Effect, McpServerConfig};
8use crate::app::domain::event::{
9 CancellationInfo, ContextWindowUsage, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
10};
11use crate::app::domain::state::{
12 AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
13};
14use crate::primary_agents::{
15 default_primary_agent_id, primary_agent_spec, resolve_effective_config,
16};
17use crate::session::state::{BackendConfig, ToolDecision};
18
19use crate::app::domain::event::CompactTrigger;
20use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
21use serde_json::Value;
22use steer_tools::ToolError;
23use steer_tools::result::ToolResult;
24use steer_tools::tools::BASH_TOOL_NAME;
25use thiserror::Error;
26
/// Minimum message count related to compaction.
/// NOTE(review): not referenced in this chunk; presumably compaction handling refuses to
/// compact threads shorter than this — confirm against the compaction handler.
const MIN_MESSAGES_FOR_COMPACT: usize = 3;
/// Prompt text presumably sent after a compaction so the model resumes from the summary.
/// NOTE(review): not referenced in this chunk — confirm where it is used.
const COMPACTION_CONTINUE_PROMPT: &str =
    "Continue from the compaction summary and resume the conversation.";
30
/// Classifies why the reducer rejected an action; carried by `ReduceError::InvalidAction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvalidActionKind {
    /// Another operation is still active (e.g. editing a message while an operation runs).
    OperationInFlight,
    /// Presumably: the action needs a session config that is not set.
    /// NOTE(review): not produced in this chunk — confirm.
    MissingSessionConfig,
    /// Presumably: the requested primary agent id is not known.
    /// NOTE(review): not produced in this chunk — confirm.
    UnknownPrimaryAgent,
    /// Presumably: a dequeue was requested while the work queue is empty.
    /// NOTE(review): not produced in this chunk — confirm.
    QueueEmpty,
}
38
/// Error returned by `reduce` when an action cannot be applied to the state.
#[derive(Debug, Error)]
pub enum ReduceError {
    /// The action is not valid in the current state. `message` is the human-readable text
    /// (it is the whole `Display` output) and `kind` classifies the reason.
    #[error("{message}")]
    InvalidAction {
        message: String,
        kind: InvalidActionKind,
    },
    /// An internal consistency check failed; displayed as `Invariant violated: {message}`.
    #[error("Invariant violated: {message}")]
    Invariant { message: String },
}
49
50fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
51 ReduceError::InvalidAction {
52 message: message.into(),
53 kind,
54 }
55}
56
57fn derive_context_window_usage(
58 total_tokens: u32,
59 context_window_tokens: Option<u32>,
60 configured_max_output_tokens: Option<u32>,
61) -> Option<ContextWindowUsage> {
62 context_window_tokens.map(|max_context_tokens| {
63 let reserve = configured_max_output_tokens.unwrap_or(0);
64 let effective_context_tokens = max_context_tokens.saturating_sub(reserve);
65 let remaining_tokens = effective_context_tokens.saturating_sub(total_tokens);
66 let (utilization_ratio, estimated) = if effective_context_tokens > 0 {
67 let ratio =
68 (f64::from(total_tokens) / f64::from(effective_context_tokens)).clamp(0.0, 1.0);
69 (Some(ratio), false)
70 } else {
71 (None, true)
72 };
73
74 ContextWindowUsage {
75 max_context_tokens: Some(max_context_tokens),
76 remaining_tokens: Some(remaining_tokens),
77 utilization_ratio,
78 estimated,
79 }
80 })
81}
82
83pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
84 match action {
85 Action::UserInput {
86 session_id,
87 content,
88 op_id,
89 message_id,
90 model,
91 timestamp,
92 } => Ok(handle_user_input(
93 state, session_id, content, op_id, message_id, model, timestamp,
94 )),
95
96 Action::UserEditedMessage {
97 session_id,
98 message_id,
99 new_content,
100 op_id,
101 new_message_id,
102 model,
103 timestamp,
104 } => handle_user_edited_message(
105 state,
106 session_id,
107 UserEditedMessageParams {
108 original_message_id: message_id,
109 new_content,
110 op_id,
111 new_message_id,
112 model,
113 timestamp,
114 },
115 ),
116
117 Action::ToolApprovalRequested {
118 session_id,
119 request_id,
120 tool_call,
121 } => Ok(handle_tool_approval_requested(
122 state, session_id, request_id, tool_call,
123 )),
124
125 Action::ToolApprovalDecided {
126 session_id,
127 request_id,
128 decision,
129 remember,
130 } => Ok(handle_tool_approval_decided(
131 state, session_id, request_id, decision, remember,
132 )),
133
134 Action::ToolExecutionStarted {
135 session_id,
136 tool_call_id,
137 tool_name,
138 tool_parameters,
139 } => Ok(handle_tool_execution_started(
140 state,
141 session_id,
142 tool_call_id,
143 tool_name,
144 tool_parameters,
145 )),
146
147 Action::ToolResult {
148 session_id,
149 tool_call_id,
150 tool_name,
151 result,
152 } => Ok(handle_tool_result(
153 state,
154 session_id,
155 tool_call_id,
156 tool_name,
157 result,
158 )),
159
160 Action::ModelResponseComplete {
161 session_id,
162 op_id,
163 message_id,
164 content,
165 usage,
166 context_window_tokens,
167 configured_max_output_tokens,
168 timestamp,
169 } => Ok(handle_model_response_complete(
170 state,
171 session_id,
172 ModelResponseCompleteParams {
173 op_id,
174 message_id,
175 content,
176 usage,
177 context_window_tokens,
178 configured_max_output_tokens,
179 timestamp,
180 },
181 )),
182
183 Action::ModelResponseError {
184 session_id,
185 op_id,
186 error,
187 } => Ok(handle_model_response_error(
188 state, session_id, op_id, &error,
189 )),
190
191 Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
192
193 Action::DirectBashCommand {
194 session_id,
195 op_id,
196 message_id,
197 command,
198 timestamp,
199 } => Ok(handle_direct_bash(
200 state, session_id, op_id, message_id, command, timestamp,
201 )),
202
203 Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
204
205 Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
206
207 Action::RequestCompaction {
208 session_id,
209 op_id,
210 model,
211 } => handle_request_compaction(state, session_id, op_id, model),
212
213 Action::Hydrate {
214 session_id,
215 events,
216 starting_sequence,
217 } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
218
219 Action::WorkspaceFilesListed { files, .. } => {
220 state.workspace_files = files;
221 Ok(vec![])
222 }
223
224 Action::ToolSchemasAvailable { tools, .. } => {
225 state.tools = tools;
226 Ok(vec![])
227 }
228
229 Action::ToolSchemasUpdated { schemas, .. } => {
230 state.tools = schemas;
231 Ok(vec![])
232 }
233
234 Action::SwitchPrimaryAgent {
235 session_id,
236 agent_id,
237 } => handle_switch_primary_agent(state, session_id, agent_id),
238
239 Action::McpServerStateChanged {
240 session_id,
241 server_name,
242 state: new_state,
243 } => {
244 if let McpServerState::Connected { tools } = &new_state {
246 let tools = state.session_config.as_ref().map_or_else(
247 || tools.clone(),
248 |config| config.filter_tools_by_visibility(tools.clone()),
249 );
250
251 for tool in tools {
253 if !state.tools.iter().any(|t| t.name == tool.name) {
254 state.tools.push(tool.clone());
255 }
256 }
257 }
258
259 if matches!(
261 &new_state,
262 McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
263 ) {
264 let prefix = format!("mcp__{server_name}__");
265 state.tools.retain(|t| !t.name.starts_with(&prefix));
266 }
267
268 state
269 .mcp_servers
270 .insert(server_name.clone(), new_state.clone());
271 Ok(vec![Effect::EmitEvent {
272 session_id,
273 event: SessionEvent::McpServerStateChanged {
274 server_name,
275 state: new_state,
276 },
277 }])
278 }
279
280 Action::CompactionComplete {
281 session_id,
282 op_id,
283 compaction_id,
284 summary_message_id,
285 summary,
286 compacted_head_message_id,
287 previous_active_message_id,
288 model,
289 timestamp,
290 } => Ok(handle_compaction_complete(
291 state,
292 session_id,
293 CompactionCompleteParams {
294 op_id,
295 compaction_id,
296 summary_message_id,
297 summary,
298 compacted_head_message_id,
299 previous_active_message_id,
300 model_name: model,
301 timestamp,
302 },
303 )),
304
305 Action::CompactionFailed {
306 session_id,
307 op_id,
308 error,
309 } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
310
311 Action::Shutdown => Ok(vec![]),
312 }
313}
314
315fn handle_user_input(
316 state: &mut AppState,
317 session_id: crate::app::domain::types::SessionId,
318 content: Vec<UserContent>,
319 op_id: crate::app::domain::types::OpId,
320 message_id: crate::app::domain::types::MessageId,
321 model: crate::config::model::ModelId,
322 timestamp: u64,
323) -> Vec<Effect> {
324 let mut effects = Vec::new();
325
326 if state.has_active_operation() {
327 state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
328 content,
329 op_id,
330 message_id,
331 model,
332 queued_at: timestamp,
333 });
334 effects.push(Effect::EmitEvent {
335 session_id,
336 event: SessionEvent::QueueUpdated {
337 queue: snapshot_queue(state),
338 },
339 });
340 return effects;
341 }
342
343 let parent_id = state.message_graph.active_message_id.clone();
344
345 let message = Message {
346 data: MessageData::User { content },
347 timestamp,
348 id: message_id.0.clone(),
349 parent_message_id: parent_id,
350 };
351
352 state.message_graph.add_message(message.clone());
353 state.message_graph.active_message_id = Some(message_id.0.clone());
354
355 state.start_operation(op_id, OperationKind::AgentLoop);
356 state.operation_models.insert(op_id, model.clone());
357
358 effects.push(Effect::EmitEvent {
359 session_id,
360 event: SessionEvent::UserMessageAdded {
361 message: message.clone(),
362 },
363 });
364
365 effects.push(Effect::EmitEvent {
366 session_id,
367 event: SessionEvent::OperationStarted {
368 op_id,
369 kind: OperationKind::AgentLoop,
370 },
371 });
372
373 effects.push(Effect::CallModel {
374 session_id,
375 op_id,
376 model,
377 messages: state
378 .message_graph
379 .get_thread_messages()
380 .into_iter()
381 .cloned()
382 .collect(),
383 system_context: state.cached_system_context.clone(),
384 tools: state.tools.clone(),
385 });
386
387 effects
388}
389
/// Inputs for `handle_user_edited_message`, bundled to keep its signature short.
struct UserEditedMessageParams {
    /// Id of the user message being edited; the replacement is attached to its parent.
    original_message_id: crate::app::domain::types::MessageId,
    /// Content of the replacement user message.
    new_content: Vec<UserContent>,
    /// Id of the `AgentLoop` operation started for the edited message.
    op_id: crate::app::domain::types::OpId,
    /// Id assigned to the replacement message.
    new_message_id: crate::app::domain::types::MessageId,
    /// Model recorded for the operation and used for the model call.
    model: crate::config::model::ModelId,
    /// Timestamp stored on the replacement message.
    timestamp: u64,
}
398
399fn handle_user_edited_message(
400 state: &mut AppState,
401 session_id: crate::app::domain::types::SessionId,
402 params: UserEditedMessageParams,
403) -> Result<Vec<Effect>, ReduceError> {
404 let UserEditedMessageParams {
405 original_message_id,
406 new_content,
407 op_id,
408 new_message_id,
409 model,
410 timestamp,
411 } = params;
412 let mut effects = Vec::new();
413
414 if state.has_active_operation() {
415 return Err(invalid_action(
416 InvalidActionKind::OperationInFlight,
417 "Cannot edit message while an operation is active.",
418 ));
419 }
420
421 let parent_id = state
422 .message_graph
423 .messages
424 .iter()
425 .find(|m| m.id() == original_message_id.0)
426 .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
427
428 let message = Message {
429 data: MessageData::User {
430 content: new_content,
431 },
432 timestamp,
433 id: new_message_id.0.clone(),
434 parent_message_id: parent_id,
435 };
436
437 state.message_graph.add_message(message.clone());
438 state.message_graph.active_message_id = Some(new_message_id.0.clone());
439
440 state.start_operation(op_id, OperationKind::AgentLoop);
441 state.operation_models.insert(op_id, model.clone());
442
443 effects.push(Effect::EmitEvent {
444 session_id,
445 event: SessionEvent::UserMessageAdded {
446 message: message.clone(),
447 },
448 });
449
450 effects.push(Effect::EmitEvent {
451 session_id,
452 event: SessionEvent::OperationStarted {
453 op_id,
454 kind: OperationKind::AgentLoop,
455 },
456 });
457
458 effects.push(Effect::CallModel {
459 session_id,
460 op_id,
461 model,
462 messages: state
463 .message_graph
464 .get_thread_messages()
465 .into_iter()
466 .cloned()
467 .collect(),
468 system_context: state.cached_system_context.clone(),
469 tools: state.tools.clone(),
470 });
471
472 Ok(effects)
473}
474
475fn handle_tool_approval_requested(
476 state: &mut AppState,
477 session_id: crate::app::domain::types::SessionId,
478 request_id: crate::app::domain::types::RequestId,
479 tool_call: steer_tools::ToolCall,
480) -> Vec<Effect> {
481 let mut effects = Vec::new();
482
483 if let Err(error) = validate_tool_call(state, &tool_call) {
484 let error_message = error.to_string();
485 return fail_tool_call_without_execution(
486 state,
487 session_id,
488 tool_call,
489 error,
490 error_message,
491 "invalid",
492 true,
493 );
494 }
495
496 let decision = get_tool_decision(state, &tool_call);
497
498 match decision {
499 ToolDecision::Allow => {
500 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
501 return vec![Effect::EmitEvent {
502 session_id,
503 event: SessionEvent::Error {
504 message: "Tool approval requested without active operation".to_string(),
505 },
506 }];
507 };
508 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
509 &tool_call.id,
510 ));
511
512 effects.push(Effect::ExecuteTool {
513 session_id,
514 op_id,
515 tool_call,
516 });
517 }
518 ToolDecision::Deny => {
519 let error = ToolError::DeniedByPolicy(tool_call.name.clone());
520 let tool_name = tool_call.name.clone();
521 effects.extend(fail_tool_call_without_execution(
522 state,
523 session_id,
524 tool_call,
525 error,
526 format!("Tool '{tool_name}' denied by policy"),
527 "denied",
528 true,
529 ));
530 }
531 ToolDecision::Ask => {
532 if state.pending_approval.is_some() {
533 state.approval_queue.push_back(QueuedApproval { tool_call });
534 return effects;
535 }
536
537 state.pending_approval = Some(PendingApproval {
538 request_id,
539 tool_call: tool_call.clone(),
540 });
541
542 effects.push(Effect::EmitEvent {
543 session_id,
544 event: SessionEvent::ApprovalRequested {
545 request_id,
546 tool_call: tool_call.clone(),
547 },
548 });
549
550 effects.push(Effect::RequestUserApproval {
551 session_id,
552 request_id,
553 tool_call,
554 });
555 }
556 }
557
558 effects
559}
560
561fn validate_tool_call(
562 state: &AppState,
563 tool_call: &steer_tools::ToolCall,
564) -> Result<(), ToolError> {
565 if tool_call.name.trim().is_empty() {
566 return Err(ToolError::invalid_params(
567 "unknown",
568 "Malformed tool call: missing tool name",
569 ));
570 }
571
572 if tool_call.id.trim().is_empty() {
573 return Err(ToolError::invalid_params(
574 tool_call.name.clone(),
575 "Malformed tool call: missing tool call id",
576 ));
577 }
578
579 if state.tools.is_empty() {
580 return Ok(());
581 }
582
583 let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
584 return Err(ToolError::UnknownTool(tool_call.name.clone()));
585 };
586
587 validate_against_json_schema(
588 &tool_call.name,
589 schema.input_schema.as_value(),
590 &tool_call.parameters,
591 )
592}
593
594fn validate_against_json_schema(
595 tool_name: &str,
596 schema: &Value,
597 params: &Value,
598) -> Result<(), ToolError> {
599 let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
600 ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
601 })?;
602
603 if let Err(errors) = validator.validate(params) {
604 let message = errors
605 .into_iter()
606 .map(|error| error.to_string())
607 .next()
608 .unwrap_or_else(|| "Parameters do not match schema".to_string());
609 return Err(ToolError::invalid_params(tool_name.to_string(), message));
610 }
611
612 Ok(())
613}
614
615fn emit_tool_failure_message(
616 state: &mut AppState,
617 session_id: crate::app::domain::types::SessionId,
618 tool_call_id: &str,
619 tool_name: &str,
620 tool_error: ToolError,
621 event_error: String,
622 message_id_prefix: &str,
623) -> Vec<Effect> {
624 let mut effects = Vec::new();
625
626 let tool_result = ToolResult::Error(tool_error);
627 let parent_id = state.message_graph.active_message_id.clone();
628 let tool_message = Message {
629 data: MessageData::Tool {
630 tool_use_id: tool_call_id.to_string(),
631 result: tool_result,
632 },
633 timestamp: 0,
634 id: format!("{message_id_prefix}_{tool_call_id}"),
635 parent_message_id: parent_id,
636 };
637 state.message_graph.add_message(tool_message.clone());
638
639 if let Some(model) = state
640 .current_operation
641 .as_ref()
642 .and_then(|op| state.operation_models.get(&op.op_id).cloned())
643 {
644 effects.push(Effect::EmitEvent {
645 session_id,
646 event: SessionEvent::ToolCallFailed {
647 id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
648 name: tool_name.to_string(),
649 error: event_error,
650 model,
651 },
652 });
653 }
654
655 effects.push(Effect::EmitEvent {
656 session_id,
657 event: SessionEvent::ToolMessageAdded {
658 message: tool_message,
659 },
660 });
661
662 effects
663}
664
665fn fail_tool_call_without_execution(
666 state: &mut AppState,
667 session_id: crate::app::domain::types::SessionId,
668 tool_call: steer_tools::ToolCall,
669 tool_error: ToolError,
670 event_error: String,
671 message_id_prefix: &str,
672 call_model_if_ready: bool,
673) -> Vec<Effect> {
674 let mut effects = emit_tool_failure_message(
675 state,
676 session_id,
677 &tool_call.id,
678 &tool_call.name,
679 tool_error,
680 event_error,
681 message_id_prefix,
682 );
683
684 if !call_model_if_ready {
685 return effects;
686 }
687
688 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
689 effects.push(Effect::EmitEvent {
690 session_id,
691 event: SessionEvent::Error {
692 message: "Tool failure recorded without active operation".to_string(),
693 },
694 });
695 return effects;
696 };
697 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
698 model
699 } else {
700 effects.push(Effect::EmitEvent {
701 session_id,
702 event: SessionEvent::Error {
703 message: format!("Missing model for operation {op_id}"),
704 },
705 });
706 return effects;
707 };
708
709 let all_tools_complete = state
710 .current_operation
711 .as_ref()
712 .is_none_or(|op| op.pending_tool_calls.is_empty());
713 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
714
715 if all_tools_complete && no_pending_approvals {
716 effects.push(Effect::CallModel {
717 session_id,
718 op_id,
719 model,
720 messages: state
721 .message_graph
722 .get_thread_messages()
723 .into_iter()
724 .cloned()
725 .collect(),
726 system_context: state.cached_system_context.clone(),
727 tools: state.tools.clone(),
728 });
729 }
730
731 effects
732}
733
734fn handle_tool_approval_decided(
735 state: &mut AppState,
736 session_id: crate::app::domain::types::SessionId,
737 request_id: crate::app::domain::types::RequestId,
738 decision: ApprovalDecision,
739 remember: Option<ApprovalMemory>,
740) -> Vec<Effect> {
741 let mut effects = Vec::new();
742
743 let pending = match state.pending_approval.take() {
744 Some(p) if p.request_id == request_id => p,
745 other => {
746 state.pending_approval = other;
747 return effects;
748 }
749 };
750
751 let resolved_memory = if decision == ApprovalDecision::Approved {
752 match remember {
753 Some(ApprovalMemory::PendingTool) => {
754 Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
755 }
756 Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
757 Some(ApprovalMemory::BashPattern(pattern)) => {
758 Some(ApprovalMemory::BashPattern(pattern))
759 }
760 None => None,
761 }
762 } else {
763 None
764 };
765
766 effects.push(Effect::EmitEvent {
767 session_id,
768 event: SessionEvent::ApprovalDecided {
769 request_id,
770 decision,
771 remember: resolved_memory.clone(),
772 },
773 });
774
775 if decision == ApprovalDecision::Approved {
776 if let Some(ref memory) = resolved_memory {
777 match memory {
778 ApprovalMemory::Tool(name) => {
779 state.approved_tools.insert(name.clone());
780 }
781 ApprovalMemory::BashPattern(pattern) => {
782 state.approved_bash_patterns.insert(pattern.clone());
783 }
784 ApprovalMemory::PendingTool => {}
785 }
786 }
787
788 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
789 effects.push(Effect::EmitEvent {
790 session_id,
791 event: SessionEvent::Error {
792 message: "Tool approval decided without active operation".to_string(),
793 },
794 });
795 return effects;
796 };
797 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
798 &pending.tool_call.id,
799 ));
800
801 effects.push(Effect::ExecuteTool {
802 session_id,
803 op_id,
804 tool_call: pending.tool_call,
805 });
806 } else {
807 let tool_name = pending.tool_call.name.clone();
808 let error = ToolError::DeniedByUser(tool_name.clone());
809 effects.extend(fail_tool_call_without_execution(
810 state,
811 session_id,
812 pending.tool_call,
813 error,
814 format!("Tool '{tool_name}' denied by user"),
815 "denied",
816 false,
817 ));
818 }
819
820 effects.extend(process_next_queued_approval(state, session_id));
821
822 effects
823}
824
825fn process_next_queued_approval(
826 state: &mut AppState,
827 session_id: crate::app::domain::types::SessionId,
828) -> Vec<Effect> {
829 let mut effects = Vec::new();
830
831 while let Some(queued) = state.approval_queue.pop_front() {
832 let decision = get_tool_decision(state, &queued.tool_call);
833
834 match decision {
835 ToolDecision::Allow => {
836 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
837 effects.push(Effect::EmitEvent {
838 session_id,
839 event: SessionEvent::Error {
840 message: "Queued tool approval processed without active operation"
841 .to_string(),
842 },
843 });
844 state.approval_queue.push_front(queued);
845 break;
846 };
847 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
848 &queued.tool_call.id,
849 ));
850
851 effects.push(Effect::ExecuteTool {
852 session_id,
853 op_id,
854 tool_call: queued.tool_call,
855 });
856 }
857 ToolDecision::Deny => {
858 let tool_name = queued.tool_call.name.clone();
859 let error = ToolError::DeniedByPolicy(tool_name.clone());
860 effects.extend(fail_tool_call_without_execution(
861 state,
862 session_id,
863 queued.tool_call,
864 error,
865 format!("Tool '{tool_name}' denied by policy"),
866 "denied",
867 false,
868 ));
869 }
870 ToolDecision::Ask => {
871 let request_id = crate::app::domain::types::RequestId::new();
872 state.pending_approval = Some(PendingApproval {
873 request_id,
874 tool_call: queued.tool_call.clone(),
875 });
876
877 effects.push(Effect::EmitEvent {
878 session_id,
879 event: SessionEvent::ApprovalRequested {
880 request_id,
881 tool_call: queued.tool_call.clone(),
882 },
883 });
884
885 effects.push(Effect::RequestUserApproval {
886 session_id,
887 request_id,
888 tool_call: queued.tool_call,
889 });
890
891 break;
892 }
893 }
894 }
895
896 let all_tools_complete = state
897 .current_operation
898 .as_ref()
899 .is_none_or(|op| op.pending_tool_calls.is_empty());
900 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
901
902 if all_tools_complete
903 && no_pending_approvals
904 && let Some(op) = &state.current_operation
905 {
906 let op_id = op.op_id;
907 if let Some(model) = state.operation_models.get(&op_id).cloned() {
908 effects.push(Effect::CallModel {
909 session_id,
910 op_id,
911 model,
912 messages: state
913 .message_graph
914 .get_thread_messages()
915 .into_iter()
916 .cloned()
917 .collect(),
918 system_context: state.cached_system_context.clone(),
919 tools: state.tools.clone(),
920 });
921 }
922 }
923
924 effects
925}
926
927fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
928 if state.approved_tools.contains(&tool_call.name) {
929 return ToolDecision::Allow;
930 }
931
932 if tool_call.name == DISPATCH_AGENT_TOOL_NAME
933 && let Ok(params) =
934 serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
935 && let Some(config) = state.session_config.as_ref()
936 {
937 let policy = &config.tool_config.approval_policy;
938 match params.target {
939 DispatchAgentTarget::Resume { .. } => {
940 return ToolDecision::Allow;
941 }
942 DispatchAgentTarget::New { agent, .. } => {
943 let agent_id = agent
944 .as_deref()
945 .filter(|value| !value.trim().is_empty())
946 .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
947 if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
948 return ToolDecision::Allow;
949 }
950 }
951 }
952 }
953
954 if tool_call.name == BASH_TOOL_NAME
955 && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
956 tool_call.parameters.clone(),
957 )
958 && state.is_bash_pattern_approved(¶ms.command)
959 {
960 return ToolDecision::Allow;
961 }
962
963 state
964 .session_config
965 .as_ref()
966 .map_or(ToolDecision::Ask, |config| {
967 config
968 .tool_config
969 .approval_policy
970 .tool_decision(&tool_call.name)
971 })
972}
973
974fn handle_tool_execution_started(
975 state: &mut AppState,
976 session_id: crate::app::domain::types::SessionId,
977 tool_call_id: crate::app::domain::types::ToolCallId,
978 tool_name: String,
979 tool_parameters: serde_json::Value,
980) -> Vec<Effect> {
981 state.add_pending_tool_call(tool_call_id.clone());
982
983 let op_id = match state.current_operation.as_ref() {
984 Some(op) => op.op_id,
985 None => {
986 return vec![Effect::EmitEvent {
987 session_id,
988 event: SessionEvent::Error {
989 message: "Tool call started without active operation".to_string(),
990 },
991 }];
992 }
993 };
994
995 let is_direct_bash = matches!(
996 state.current_operation.as_ref().map(|op| &op.kind),
997 Some(OperationKind::DirectBash { .. })
998 );
999
1000 if is_direct_bash {
1001 return vec![];
1002 }
1003
1004 let model = match state.operation_models.get(&op_id).cloned() {
1005 Some(model) => model,
1006 None => {
1007 return vec![Effect::EmitEvent {
1008 session_id,
1009 event: SessionEvent::Error {
1010 message: format!("Missing model for tool call on operation {op_id}"),
1011 },
1012 }];
1013 }
1014 };
1015
1016 vec![Effect::EmitEvent {
1017 session_id,
1018 event: SessionEvent::ToolCallStarted {
1019 id: tool_call_id,
1020 name: tool_name,
1021 parameters: tool_parameters,
1022 model,
1023 },
1024 }]
1025}
1026
1027fn handle_tool_result(
1028 state: &mut AppState,
1029 session_id: crate::app::domain::types::SessionId,
1030 tool_call_id: crate::app::domain::types::ToolCallId,
1031 tool_name: String,
1032 result: Result<ToolResult, ToolError>,
1033) -> Vec<Effect> {
1034 let mut effects = Vec::new();
1035
1036 let op = match &state.current_operation {
1037 Some(op) => {
1038 if state.cancelled_ops.contains(&op.op_id) {
1039 tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
1040 return effects;
1041 }
1042 op.clone()
1043 }
1044 None => return effects,
1045 };
1046 let op_id = op.op_id;
1047
1048 state.remove_pending_tool_call(&tool_call_id);
1049
1050 let tool_result = match result {
1051 Ok(r) => r,
1052 Err(e) => ToolResult::Error(e),
1053 };
1054
1055 let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1056
1057 if is_direct_bash {
1058 let command = match &op.kind {
1059 OperationKind::DirectBash { command } => command.clone(),
1060 _ => tool_name,
1061 };
1062
1063 let (stdout, stderr, exit_code) = match &tool_result {
1064 ToolResult::Bash(result) => (
1065 result.stdout.clone(),
1066 result.stderr.clone(),
1067 result.exit_code,
1068 ),
1069 ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1070 other => (format!("{other:?}"), String::new(), 0),
1071 };
1072
1073 let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1074 state
1075 .message_graph
1076 .update_command_execution(
1077 id.as_str(),
1078 command.clone(),
1079 stdout.clone(),
1080 stderr.clone(),
1081 exit_code,
1082 )
1083 .or_else(|| {
1084 let parent_id = state.message_graph.active_message_id.clone();
1085 let timestamp = Message::current_timestamp();
1086 Some(Message {
1087 data: MessageData::User {
1088 content: vec![UserContent::CommandExecution {
1089 command: command.clone(),
1090 stdout: stdout.clone(),
1091 stderr: stderr.clone(),
1092 exit_code,
1093 }],
1094 },
1095 timestamp,
1096 id: id.to_string(),
1097 parent_message_id: parent_id,
1098 })
1099 })
1100 });
1101
1102 state.complete_operation(op_id);
1103
1104 if let Some(message) = updated {
1105 effects.push(Effect::EmitEvent {
1106 session_id,
1107 event: SessionEvent::MessageUpdated { message },
1108 });
1109 }
1110
1111 effects.push(Effect::EmitEvent {
1112 session_id,
1113 event: SessionEvent::OperationCompleted { op_id },
1114 });
1115
1116 effects.extend(maybe_start_queued_work(state, session_id));
1117
1118 return effects;
1119 }
1120
1121 let model = match state.operation_models.get(&op_id).cloned() {
1122 Some(model) => model,
1123 None => {
1124 return vec![Effect::EmitEvent {
1125 session_id,
1126 event: SessionEvent::Error {
1127 message: format!("Missing model for tool result on operation {op_id}"),
1128 },
1129 }];
1130 }
1131 };
1132
1133 let event = match &tool_result {
1134 ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1135 id: tool_call_id.clone(),
1136 name: tool_name.clone(),
1137 error: e.to_string(),
1138 model: model.clone(),
1139 },
1140 _ => SessionEvent::ToolCallCompleted {
1141 id: tool_call_id.clone(),
1142 name: tool_name,
1143 result: tool_result.clone(),
1144 model: model.clone(),
1145 },
1146 };
1147
1148 effects.push(Effect::EmitEvent { session_id, event });
1149
1150 let parent_id = state.message_graph.active_message_id.clone();
1151 let tool_message = Message {
1152 data: MessageData::Tool {
1153 tool_use_id: tool_call_id.0.clone(),
1154 result: tool_result,
1155 },
1156 timestamp: 0,
1157 id: format!("tool_result_{}", tool_call_id.0),
1158 parent_message_id: parent_id,
1159 };
1160 state.message_graph.add_message(tool_message.clone());
1161
1162 effects.push(Effect::EmitEvent {
1163 session_id,
1164 event: SessionEvent::ToolMessageAdded {
1165 message: tool_message,
1166 },
1167 });
1168
1169 let all_tools_complete = state
1170 .current_operation
1171 .as_ref()
1172 .is_none_or(|op| op.pending_tool_calls.is_empty());
1173 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1174
1175 if all_tools_complete && no_pending_approvals {
1176 effects.push(Effect::CallModel {
1177 session_id,
1178 op_id,
1179 model,
1180 messages: state
1181 .message_graph
1182 .get_thread_messages()
1183 .into_iter()
1184 .cloned()
1185 .collect(),
1186 system_context: state.cached_system_context.clone(),
1187 tools: state.tools.clone(),
1188 });
1189 }
1190
1191 effects
1192}
1193
/// Inputs for `handle_model_response_complete`, bundled to keep its signature short.
struct ModelResponseCompleteParams {
    /// Operation the response belongs to; responses for cancelled operations are ignored.
    op_id: crate::app::domain::types::OpId,
    /// Id assigned to the assistant message built from `content`.
    message_id: crate::app::domain::types::MessageId,
    /// Assistant output; `ToolCall` items are extracted and dispatched.
    content: Vec<AssistantContent>,
    /// Token usage reported for the request, if any.
    usage: Option<TokenUsage>,
    /// Model context window size, if known; used to derive context-window usage.
    context_window_tokens: Option<u32>,
    /// Configured max output tokens, reserved out of the context window in usage math.
    configured_max_output_tokens: Option<u32>,
    /// Timestamp stored on the assistant message.
    timestamp: u64,
}
1203
1204fn handle_model_response_complete(
1205 state: &mut AppState,
1206 session_id: crate::app::domain::types::SessionId,
1207 params: ModelResponseCompleteParams,
1208) -> Vec<Effect> {
1209 let ModelResponseCompleteParams {
1210 op_id,
1211 message_id,
1212 content,
1213 usage,
1214 context_window_tokens,
1215 configured_max_output_tokens,
1216 timestamp,
1217 } = params;
1218 let mut effects = Vec::new();
1219
1220 if state.cancelled_ops.contains(&op_id) {
1221 tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1222 return effects;
1223 }
1224
1225 let tool_calls: Vec<_> = content
1226 .iter()
1227 .filter_map(|c| {
1228 if let AssistantContent::ToolCall { tool_call, .. } = c {
1229 Some(tool_call.clone())
1230 } else {
1231 None
1232 }
1233 })
1234 .collect();
1235
1236 let parent_id = state.message_graph.active_message_id.clone();
1237
1238 let message = Message {
1239 data: MessageData::Assistant {
1240 content: content.clone(),
1241 },
1242 timestamp,
1243 id: message_id.0.clone(),
1244 parent_message_id: parent_id,
1245 };
1246
1247 state.message_graph.add_message(message.clone());
1248 state.message_graph.active_message_id = Some(message_id.0.clone());
1249
1250 let model = match state.operation_models.get(&op_id).cloned() {
1251 Some(model) => model,
1252 None => {
1253 return vec![Effect::EmitEvent {
1254 session_id,
1255 event: SessionEvent::Error {
1256 message: format!("Missing model for operation {op_id}"),
1257 },
1258 }];
1259 }
1260 };
1261
1262 effects.push(Effect::EmitEvent {
1263 session_id,
1264 event: SessionEvent::AssistantMessageAdded {
1265 message,
1266 model: model.clone(),
1267 },
1268 });
1269
1270 let outer_usage = usage;
1272
1273 if let Some(usage) = outer_usage {
1274 let context_window = derive_context_window_usage(
1275 usage.total_tokens,
1276 context_window_tokens,
1277 configured_max_output_tokens,
1278 );
1279 state.record_llm_usage(op_id, model.clone(), usage, context_window.clone());
1280 effects.push(Effect::EmitEvent {
1281 session_id,
1282 event: SessionEvent::LlmUsageUpdated {
1283 op_id,
1284 model: model.clone(),
1285 usage,
1286 context_window,
1287 },
1288 });
1289 }
1290
1291 if tool_calls.is_empty() {
1292 state.complete_operation(op_id);
1293 effects.push(Effect::EmitEvent {
1294 session_id,
1295 event: SessionEvent::OperationCompleted { op_id },
1296 });
1297 let auto = maybe_auto_compact(
1299 state,
1300 session_id,
1301 outer_usage,
1302 context_window_tokens,
1303 configured_max_output_tokens,
1304 &model,
1305 );
1306 if auto.is_empty() {
1307 effects.extend(maybe_start_queued_work(state, session_id));
1308 } else {
1309 effects.extend(auto);
1310 }
1311 } else {
1312 for tool_call in tool_calls {
1313 let request_id = crate::app::domain::types::RequestId::new();
1314 effects.extend(handle_tool_approval_requested(
1315 state, session_id, request_id, tool_call,
1316 ));
1317 }
1318 }
1319
1320 effects
1321}
1322
1323fn handle_model_response_error(
1324 state: &mut AppState,
1325 session_id: crate::app::domain::types::SessionId,
1326 op_id: crate::app::domain::types::OpId,
1327 error: &str,
1328) -> Vec<Effect> {
1329 let mut effects = Vec::new();
1330
1331 if state.cancelled_ops.contains(&op_id) {
1332 return effects;
1333 }
1334
1335 state.complete_operation(op_id);
1336
1337 effects.push(Effect::EmitEvent {
1338 session_id,
1339 event: SessionEvent::Error {
1340 message: error.to_string(),
1341 },
1342 });
1343
1344 effects.push(Effect::EmitEvent {
1345 session_id,
1346 event: SessionEvent::OperationCompleted { op_id },
1347 });
1348
1349 effects.extend(maybe_start_queued_work(state, session_id));
1350
1351 effects
1352}
1353
1354fn handle_direct_bash(
1355 state: &mut AppState,
1356 session_id: crate::app::domain::types::SessionId,
1357 op_id: crate::app::domain::types::OpId,
1358 message_id: crate::app::domain::types::MessageId,
1359 command: String,
1360 timestamp: u64,
1361) -> Vec<Effect> {
1362 let mut effects = Vec::new();
1363
1364 if state.has_active_operation() {
1365 state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1366 command,
1367 op_id,
1368 message_id,
1369 queued_at: timestamp,
1370 });
1371 effects.push(Effect::EmitEvent {
1372 session_id,
1373 event: SessionEvent::QueueUpdated {
1374 queue: snapshot_queue(state),
1375 },
1376 });
1377 return effects;
1378 }
1379
1380 let parent_id = state.message_graph.active_message_id.clone();
1381 let message = Message {
1382 data: MessageData::User {
1383 content: vec![UserContent::CommandExecution {
1384 command: command.clone(),
1385 stdout: String::new(),
1386 stderr: String::new(),
1387 exit_code: 0,
1388 }],
1389 },
1390 timestamp,
1391 id: message_id.0.clone(),
1392 parent_message_id: parent_id,
1393 };
1394
1395 state.message_graph.add_message(message.clone());
1396 state.message_graph.active_message_id = Some(message_id.0.clone());
1397
1398 state.start_operation(
1399 op_id,
1400 OperationKind::DirectBash {
1401 command: command.clone(),
1402 },
1403 );
1404 state.operation_messages.insert(op_id, message_id);
1405
1406 effects.push(Effect::EmitEvent {
1407 session_id,
1408 event: SessionEvent::UserMessageAdded { message },
1409 });
1410
1411 effects.push(Effect::EmitEvent {
1412 session_id,
1413 event: SessionEvent::OperationStarted {
1414 op_id,
1415 kind: OperationKind::DirectBash {
1416 command: command.clone(),
1417 },
1418 },
1419 });
1420
1421 let tool_call = steer_tools::ToolCall {
1422 id: format!("direct_bash_{op_id}"),
1423 name: BASH_TOOL_NAME.to_string(),
1424 parameters: serde_json::json!({ "command": command }),
1425 };
1426
1427 effects.push(Effect::ExecuteTool {
1428 session_id,
1429 op_id,
1430 tool_call,
1431 });
1432
1433 effects
1434}
1435
1436fn handle_dequeue_queued_item(
1437 state: &mut AppState,
1438 session_id: crate::app::domain::types::SessionId,
1439) -> Result<Vec<Effect>, ReduceError> {
1440 if state.pop_next_queued_work().is_some() {
1441 Ok(vec![Effect::EmitEvent {
1442 session_id,
1443 event: SessionEvent::QueueUpdated {
1444 queue: snapshot_queue(state),
1445 },
1446 }])
1447 } else {
1448 Err(invalid_action(
1449 InvalidActionKind::QueueEmpty,
1450 "No queued item to remove.",
1451 ))
1452 }
1453}
1454
1455fn maybe_auto_compact(
1456 state: &mut AppState,
1457 session_id: crate::app::domain::types::SessionId,
1458 usage: Option<TokenUsage>,
1459 context_window_tokens: Option<u32>,
1460 configured_max_output_tokens: Option<u32>,
1461 model: &crate::config::model::ModelId,
1462) -> Vec<Effect> {
1463 let config = match &state.session_config {
1465 Some(c) if c.auto_compaction.enabled => c,
1466 _ => return vec![],
1467 };
1468 let threshold = config.auto_compaction.threshold_ratio();
1469
1470 let usage = match usage {
1472 Some(u) => u,
1473 None => return vec![],
1474 };
1475
1476 let cw = match derive_context_window_usage(
1478 usage.total_tokens,
1479 context_window_tokens,
1480 configured_max_output_tokens,
1481 ) {
1482 Some(cw) if !cw.estimated => cw,
1483 _ => return vec![],
1484 };
1485 let ratio = match cw.utilization_ratio {
1486 Some(r) if r >= threshold => r,
1487 _ => return vec![],
1488 };
1489 let _ = ratio; if state.message_graph.get_thread_messages().len() < MIN_MESSAGES_FOR_COMPACT {
1493 return vec![];
1494 }
1495
1496 if !state.queued_work.is_empty() {
1498 return vec![];
1499 }
1500
1501 if state.has_active_operation() {
1503 return vec![];
1504 }
1505
1506 let op_id = crate::app::domain::types::OpId::new();
1507 let kind = OperationKind::Compact {
1508 trigger: CompactTrigger::Auto,
1509 };
1510 state.start_operation(op_id, kind.clone());
1511 state.operation_models.insert(op_id, model.clone());
1512
1513 vec![
1514 Effect::EmitEvent {
1515 session_id,
1516 event: SessionEvent::OperationStarted { op_id, kind },
1517 },
1518 Effect::RequestCompaction {
1519 session_id,
1520 op_id,
1521 model: model.clone(),
1522 },
1523 ]
1524}
1525
1526fn maybe_start_queued_work(
1527 state: &mut AppState,
1528 session_id: crate::app::domain::types::SessionId,
1529) -> Vec<Effect> {
1530 if state.has_active_operation() {
1531 return vec![];
1532 }
1533
1534 let Some(next) = state.pop_next_queued_work() else {
1535 return vec![];
1536 };
1537
1538 let mut effects = vec![Effect::EmitEvent {
1539 session_id,
1540 event: SessionEvent::QueueUpdated {
1541 queue: snapshot_queue(state),
1542 },
1543 }];
1544
1545 match next {
1546 QueuedWorkItem::UserMessage(item) => {
1547 effects.extend(handle_user_input(
1548 state,
1549 session_id,
1550 item.content,
1551 item.op_id,
1552 item.message_id,
1553 item.model,
1554 item.queued_at,
1555 ));
1556 }
1557 QueuedWorkItem::DirectBash(item) => {
1558 effects.extend(handle_direct_bash(
1559 state,
1560 session_id,
1561 item.op_id,
1562 item.message_id,
1563 item.command,
1564 item.queued_at,
1565 ));
1566 }
1567 }
1568
1569 effects
1570}
1571
1572fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1573 state
1574 .queued_work
1575 .iter()
1576 .map(snapshot_queued_work_item)
1577 .collect()
1578}
1579
1580fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1581 match item {
1582 QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1583 kind: Some(QueuedWorkKind::UserMessage),
1584 content: message
1585 .content
1586 .iter()
1587 .filter_map(|item| match item {
1588 UserContent::Text { text } => Some(text.as_str()),
1589 _ => None,
1590 })
1591 .collect::<String>(),
1592 queued_at: message.queued_at,
1593 model: Some(message.model.clone()),
1594 op_id: message.op_id,
1595 message_id: message.message_id.clone(),
1596 attachment_count: message
1597 .content
1598 .iter()
1599 .filter(|item| matches!(item, UserContent::Image { .. }))
1600 .count() as u32,
1601 },
1602 QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1603 kind: Some(QueuedWorkKind::DirectBash),
1604 content: command.command.clone(),
1605 queued_at: command.queued_at,
1606 model: None,
1607 op_id: command.op_id,
1608 message_id: command.message_id.clone(),
1609 attachment_count: 0,
1610 },
1611 }
1612}
1613
1614fn handle_request_compaction(
1615 state: &mut AppState,
1616 session_id: crate::app::domain::types::SessionId,
1617 op_id: crate::app::domain::types::OpId,
1618 model: crate::config::model::ModelId,
1619) -> Result<Vec<Effect>, ReduceError> {
1620 let message_count = state.message_graph.get_thread_messages().len();
1621
1622 if state.has_active_operation() {
1623 return Err(invalid_action(
1624 InvalidActionKind::OperationInFlight,
1625 "Cannot compact while an operation is active.",
1626 ));
1627 }
1628
1629 if message_count < MIN_MESSAGES_FOR_COMPACT {
1630 return Ok(vec![Effect::EmitEvent {
1631 session_id,
1632 event: SessionEvent::CompactResult {
1633 result: crate::app::domain::event::CompactResult::InsufficientMessages,
1634 trigger: CompactTrigger::Manual,
1635 },
1636 }]);
1637 }
1638
1639 let kind = OperationKind::Compact {
1640 trigger: CompactTrigger::Manual,
1641 };
1642 state.start_operation(op_id, kind.clone());
1643 state.operation_models.insert(op_id, model.clone());
1644
1645 Ok(vec![
1646 Effect::EmitEvent {
1647 session_id,
1648 event: SessionEvent::OperationStarted { op_id, kind },
1649 },
1650 Effect::RequestCompaction {
1651 session_id,
1652 op_id,
1653 model,
1654 },
1655 ])
1656}
1657
/// Cancels the current operation when `target_op` is `None` or equals its id.
///
/// Returns no effects if there is no current operation or `target_op` names a
/// different one. Otherwise:
/// - the op id is recorded via `record_cancelled_op`; the model-response handlers
///   check `cancelled_ops` and drop late results;
/// - pending and queued tool approvals are drained;
/// - for `AgentLoop` operations, a failure message is emitted for every drained
///   approval and every pending tool call.
///
/// The next queued work item is popped and reported in the `OperationCancelled`
/// event's `CancellationInfo`. It is NOT started here.
fn handle_cancel(
    state: &mut AppState,
    session_id: crate::app::domain::types::SessionId,
    target_op: Option<crate::app::domain::types::OpId>,
) -> Vec<Effect> {
    let mut effects = Vec::new();

    // Clone the operation so `state` can be mutated freely below.
    let op = match &state.current_operation {
        Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
        _ => return effects,
    };

    state.record_cancelled_op(op.op_id);

    // Compactions additionally report a `CompactResult::Cancelled` with their trigger.
    if let OperationKind::Compact { trigger } = op.kind {
        effects.push(Effect::EmitEvent {
            session_id,
            event: SessionEvent::CompactResult {
                result: crate::app::domain::event::CompactResult::Cancelled,
                trigger,
            },
        });
    }

    // Approvals are drained for every operation kind. Failure messages are only
    // emitted below for agent loops; for other kinds the drained approvals are dropped.
    let pending_approval = state.pending_approval.take();
    let queued_approvals = std::mem::take(&mut state.approval_queue);

    if matches!(op.kind, OperationKind::AgentLoop) {
        if let Some(pending) = pending_approval {
            let tool_name = pending.tool_call.name.clone();
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                &pending.tool_call.id,
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                format!("Tool '{tool_name}' cancelled"),
                "cancelled",
            ));
        }

        for queued in queued_approvals {
            let tool_name = queued.tool_call.name.clone();
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                &queued.tool_call.id,
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                format!("Tool '{tool_name}' cancelled"),
                "cancelled",
            ));
        }

        for tool_call_id in &op.pending_tool_calls {
            // Fall back to the raw call id when the graph has no tool name for it.
            // In that case (or whenever the name equals the id) the event text
            // names the call rather than the tool.
            let tool_name = state
                .message_graph
                .find_tool_name_by_id(tool_call_id.as_str())
                .unwrap_or_else(|| tool_call_id.as_str().to_string());
            let event_error = if tool_name == tool_call_id.as_str() {
                format!("Tool call '{tool_call_id}' cancelled")
            } else {
                format!("Tool '{tool_name}' cancelled")
            };
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                tool_call_id.as_str(),
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                event_error,
                "cancelled",
            ));
        }
    }
    state.active_streams.remove(&op.op_id);

    // NOTE(review): the popped item leaves the queue and is only surfaced through
    // `popped_queued_item`; presumably the client re-offers it to the user — confirm.
    let dequeued_item = state.pop_next_queued_work();
    let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);

    effects.push(Effect::EmitEvent {
        session_id,
        event: SessionEvent::OperationCancelled {
            op_id: op.op_id,
            info: CancellationInfo {
                pending_tool_calls: op.pending_tool_calls.len(),
                popped_queued_item,
            },
        },
    });

    effects.push(Effect::CancelOperation {
        session_id,
        op_id: op.op_id,
    });

    state.complete_operation(op.op_id);

    // Only report the queue when an item was actually removed from it.
    if dequeued_item.is_some() {
        effects.push(Effect::EmitEvent {
            session_id,
            event: SessionEvent::QueueUpdated {
                queue: snapshot_queue(state),
            },
        });
    }

    effects
}
1767
1768fn handle_hydrate(
1769 state: &mut AppState,
1770 session_id: crate::app::domain::types::SessionId,
1771 events: Vec<SessionEvent>,
1772 starting_sequence: u64,
1773) -> Vec<Effect> {
1774 for event in events {
1775 apply_event_to_state(state, &event);
1776 }
1777
1778 state.event_sequence = starting_sequence;
1779
1780 emit_mcp_connect_effects(state, session_id)
1781}
1782
1783fn handle_switch_primary_agent(
1784 state: &mut AppState,
1785 session_id: crate::app::domain::types::SessionId,
1786 agent_id: String,
1787) -> Result<Vec<Effect>, ReduceError> {
1788 if state.current_operation.is_some() {
1789 return Err(invalid_action(
1790 InvalidActionKind::OperationInFlight,
1791 "Cannot switch primary agent while an operation is active.",
1792 ));
1793 }
1794
1795 let Some(base_config) = state
1796 .base_session_config
1797 .as_ref()
1798 .or(state.session_config.as_ref())
1799 else {
1800 return Err(invalid_action(
1801 InvalidActionKind::MissingSessionConfig,
1802 "Cannot switch primary agent without session config.",
1803 ));
1804 };
1805
1806 let Some(_spec) = primary_agent_spec(&agent_id) else {
1807 return Err(invalid_action(
1808 InvalidActionKind::UnknownPrimaryAgent,
1809 format!("Unknown primary agent '{agent_id}'."),
1810 ));
1811 };
1812
1813 let mut updated_config = base_config.clone();
1814 updated_config.primary_agent_id = Some(agent_id.clone());
1815 let new_config = resolve_effective_config(&updated_config);
1816 let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1817
1818 apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1819
1820 let mut effects = Vec::new();
1821 effects.push(Effect::EmitEvent {
1822 session_id,
1823 event: SessionEvent::SessionConfigUpdated {
1824 config: Box::new(new_config),
1825 primary_agent_id: agent_id,
1826 },
1827 });
1828 effects.extend(backend_effects);
1829 effects.push(Effect::ReloadToolSchemas { session_id });
1830
1831 Ok(effects)
1832}
1833
1834fn apply_session_config_state(
1835 state: &mut AppState,
1836 config: &crate::session::state::SessionConfig,
1837 primary_agent_id: Option<String>,
1838 update_base: bool,
1839) {
1840 state.apply_session_config(config, primary_agent_id, update_base);
1841}
1842
1843fn mcp_backend_diff_effects(
1844 session_id: crate::app::domain::types::SessionId,
1845 old_config: &crate::session::state::SessionConfig,
1846 new_config: &crate::session::state::SessionConfig,
1847) -> Vec<Effect> {
1848 let old_map = collect_mcp_backends(old_config);
1849 let new_map = collect_mcp_backends(new_config);
1850
1851 let mut effects = Vec::new();
1852
1853 for (server_name, (old_transport, old_filter)) in &old_map {
1854 match new_map.get(server_name) {
1855 None => {
1856 effects.push(Effect::DisconnectMcpServer {
1857 session_id,
1858 server_name: server_name.clone(),
1859 });
1860 }
1861 Some((new_transport, new_filter)) => {
1862 if new_transport != old_transport || new_filter != old_filter {
1863 effects.push(Effect::DisconnectMcpServer {
1864 session_id,
1865 server_name: server_name.clone(),
1866 });
1867 effects.push(Effect::ConnectMcpServer {
1868 session_id,
1869 config: McpServerConfig {
1870 server_name: server_name.clone(),
1871 transport: new_transport.clone(),
1872 tool_filter: new_filter.clone(),
1873 },
1874 });
1875 }
1876 }
1877 }
1878 }
1879
1880 for (server_name, (new_transport, new_filter)) in &new_map {
1881 if !old_map.contains_key(server_name) {
1882 effects.push(Effect::ConnectMcpServer {
1883 session_id,
1884 config: McpServerConfig {
1885 server_name: server_name.clone(),
1886 transport: new_transport.clone(),
1887 tool_filter: new_filter.clone(),
1888 },
1889 });
1890 }
1891 }
1892
1893 effects
1894}
1895
1896fn collect_mcp_backends(
1897 config: &crate::session::state::SessionConfig,
1898) -> std::collections::HashMap<
1899 String,
1900 (
1901 crate::tools::McpTransport,
1902 crate::session::state::ToolFilter,
1903 ),
1904> {
1905 let mut map = std::collections::HashMap::new();
1906
1907 for backend_config in &config.tool_config.backends {
1908 let BackendConfig::Mcp {
1909 server_name,
1910 transport,
1911 tool_filter,
1912 } = backend_config;
1913
1914 map.insert(
1915 server_name.clone(),
1916 (transport.clone(), tool_filter.clone()),
1917 );
1918 }
1919
1920 map
1921}
1922
1923pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1924 match event {
1925 SessionEvent::SessionCreated { config, .. } => {
1926 let primary_agent_id = config
1927 .primary_agent_id
1928 .clone()
1929 .unwrap_or_else(|| default_primary_agent_id().to_string());
1930 apply_session_config_state(state, config, Some(primary_agent_id), true);
1931 }
1932 SessionEvent::SessionConfigUpdated {
1933 config,
1934 primary_agent_id,
1935 } => {
1936 apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1937 }
1938 SessionEvent::AssistantMessageAdded { message, .. }
1939 | SessionEvent::UserMessageAdded { message }
1940 | SessionEvent::ToolMessageAdded { message } => {
1941 state.message_graph.add_message(message.clone());
1942 state.message_graph.active_message_id = Some(message.id().to_string());
1943 }
1944 SessionEvent::MessageUpdated { message } => {
1945 state.message_graph.replace_message(message.clone());
1946 }
1947 SessionEvent::ApprovalDecided {
1948 decision, remember, ..
1949 } => {
1950 if *decision == ApprovalDecision::Approved
1951 && let Some(memory) = remember
1952 {
1953 match memory {
1954 ApprovalMemory::Tool(name) => {
1955 state.approved_tools.insert(name.clone());
1956 }
1957 ApprovalMemory::BashPattern(pattern) => {
1958 state.approved_bash_patterns.insert(pattern.clone());
1959 }
1960 ApprovalMemory::PendingTool => {}
1961 }
1962 }
1963 state.pending_approval = None;
1964 }
1965 SessionEvent::OperationCompleted { op_id } => {
1966 state.complete_operation(*op_id);
1967 }
1968 SessionEvent::OperationCancelled { op_id, .. } => {
1969 state.record_cancelled_op(*op_id);
1970 state.complete_operation(*op_id);
1971 }
1972 SessionEvent::LlmUsageUpdated {
1973 op_id,
1974 model,
1975 usage,
1976 context_window,
1977 } => {
1978 state.record_llm_usage(*op_id, model.clone(), *usage, context_window.clone());
1979 }
1980 SessionEvent::McpServerStateChanged {
1981 server_name,
1982 state: mcp_state,
1983 } => {
1984 state
1985 .mcp_servers
1986 .insert(server_name.clone(), mcp_state.clone());
1987 }
1988 SessionEvent::QueueUpdated { queue } => {
1989 let parse_content = |content: &str| {
1990 if content.trim().is_empty() {
1991 None
1992 } else {
1993 Some(vec![UserContent::Text {
1994 text: content.to_string(),
1995 }])
1996 }
1997 };
1998
1999 state.queued_work = queue
2000 .iter()
2001 .filter_map(|item| match item.kind {
2002 Some(QueuedWorkKind::UserMessage) => {
2003 let content = parse_content(item.content.as_str())?;
2004 Some(QueuedWorkItem::UserMessage(
2005 crate::app::domain::state::QueuedUserMessage {
2006 content,
2007 op_id: item.op_id,
2008 message_id: item.message_id.clone(),
2009 model: item.model.clone().unwrap_or_else(
2010 crate::config::model::builtin::claude_sonnet_4_5,
2011 ),
2012 queued_at: item.queued_at,
2013 },
2014 ))
2015 }
2016 Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
2017 crate::app::domain::state::QueuedBashCommand {
2018 command: item.content.clone(),
2019 op_id: item.op_id,
2020 message_id: item.message_id.clone(),
2021 queued_at: item.queued_at,
2022 },
2023 )),
2024 None => {
2025 let content = parse_content(item.content.as_str())?;
2026 Some(QueuedWorkItem::UserMessage(
2027 crate::app::domain::state::QueuedUserMessage {
2028 content,
2029 op_id: item.op_id,
2030 message_id: item.message_id.clone(),
2031 model: item.model.clone().unwrap_or_else(
2032 crate::config::model::builtin::claude_sonnet_4_5,
2033 ),
2034 queued_at: item.queued_at,
2035 },
2036 ))
2037 }
2038 })
2039 .collect();
2040 }
2041 SessionEvent::ConversationCompacted { record } => {
2042 state
2043 .compaction_summary_ids
2044 .insert(record.summary_message_id.to_string());
2045 state
2046 .message_graph
2047 .mark_compaction_summary(record.summary_message_id.to_string());
2048 }
2049 _ => {}
2050 }
2051
2052 state.event_sequence += 1;
2053}
2054
2055fn maybe_continue_after_compaction(
2056 state: &mut AppState,
2057 session_id: crate::app::domain::types::SessionId,
2058 trigger: CompactTrigger,
2059 model: &crate::config::model::ModelId,
2060) -> Vec<Effect> {
2061 if trigger != CompactTrigger::Auto || state.has_active_operation() {
2062 return vec![];
2063 }
2064
2065 let op_id = crate::app::domain::types::OpId::new();
2066 state.start_operation(op_id, OperationKind::AgentLoop);
2067 state.operation_models.insert(op_id, model.clone());
2068
2069 let mut messages: Vec<Message> = state
2070 .message_graph
2071 .get_thread_messages()
2072 .into_iter()
2073 .cloned()
2074 .collect();
2075 messages.push(Message {
2076 data: MessageData::User {
2077 content: vec![UserContent::Text {
2078 text: COMPACTION_CONTINUE_PROMPT.to_string(),
2079 }],
2080 },
2081 timestamp: Message::current_timestamp(),
2082 id: format!("compact_continue_{op_id}"),
2083 parent_message_id: state.message_graph.active_message_id.clone(),
2084 });
2085
2086 vec![
2087 Effect::EmitEvent {
2088 session_id,
2089 event: SessionEvent::OperationStarted {
2090 op_id,
2091 kind: OperationKind::AgentLoop,
2092 },
2093 },
2094 Effect::CallModel {
2095 session_id,
2096 op_id,
2097 model: model.clone(),
2098 messages,
2099 system_context: state.cached_system_context.clone(),
2100 tools: state.tools.clone(),
2101 },
2102 ]
2103}
2104
2105struct CompactionCompleteParams {
2106 op_id: crate::app::domain::types::OpId,
2107 compaction_id: crate::app::domain::types::CompactionId,
2108 summary_message_id: crate::app::domain::types::MessageId,
2109 summary: String,
2110 compacted_head_message_id: crate::app::domain::types::MessageId,
2111 previous_active_message_id: Option<crate::app::domain::types::MessageId>,
2112 model_name: String,
2113 timestamp: u64,
2114}
2115
2116fn handle_compaction_complete(
2117 state: &mut AppState,
2118 session_id: crate::app::domain::types::SessionId,
2119 params: CompactionCompleteParams,
2120) -> Vec<Effect> {
2121 use crate::app::conversation::{AssistantContent, Message, MessageData};
2122 use crate::app::domain::types::CompactionRecord;
2123
2124 let CompactionCompleteParams {
2125 op_id,
2126 compaction_id,
2127 summary_message_id,
2128 summary,
2129 compacted_head_message_id,
2130 previous_active_message_id,
2131 model_name,
2132 timestamp,
2133 } = params;
2134
2135 let summary_message = Message {
2136 data: MessageData::Assistant {
2137 content: vec![AssistantContent::Text {
2138 text: summary.clone(),
2139 }],
2140 },
2141 id: summary_message_id.to_string(),
2142 parent_message_id: None,
2143 timestamp,
2144 };
2145
2146 state.message_graph.add_message(summary_message.clone());
2147
2148 state
2150 .compaction_summary_ids
2151 .insert(summary_message_id.to_string());
2152 state
2153 .message_graph
2154 .mark_compaction_summary(summary_message_id.to_string());
2155
2156 let record = CompactionRecord::with_timestamp(
2157 compaction_id,
2158 summary_message_id,
2159 compacted_head_message_id,
2160 previous_active_message_id,
2161 model_name,
2162 timestamp,
2163 );
2164
2165 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
2166 model
2167 } else {
2168 state.complete_operation(op_id);
2169 return vec![Effect::EmitEvent {
2170 session_id,
2171 event: SessionEvent::Error {
2172 message: format!("Missing model for compaction operation {op_id}"),
2173 },
2174 }];
2175 };
2176
2177 let trigger = state
2178 .current_operation
2179 .as_ref()
2180 .and_then(|op| match op.kind {
2181 OperationKind::Compact { trigger } => Some(trigger),
2182 _ => None,
2183 })
2184 .unwrap_or(CompactTrigger::Manual);
2185
2186 state.complete_operation(op_id);
2187
2188 let mut effects = vec![
2189 Effect::EmitEvent {
2190 session_id,
2191 event: SessionEvent::AssistantMessageAdded {
2192 message: summary_message,
2193 model: model.clone(),
2194 },
2195 },
2196 Effect::EmitEvent {
2197 session_id,
2198 event: SessionEvent::CompactResult {
2199 result: crate::app::domain::event::CompactResult::Success(summary),
2200 trigger,
2201 },
2202 },
2203 Effect::EmitEvent {
2204 session_id,
2205 event: SessionEvent::ConversationCompacted { record },
2206 },
2207 Effect::EmitEvent {
2208 session_id,
2209 event: SessionEvent::OperationCompleted { op_id },
2210 },
2211 ];
2212
2213 effects.extend(maybe_start_queued_work(state, session_id));
2214 effects.extend(maybe_continue_after_compaction(
2215 state, session_id, trigger, &model,
2216 ));
2217
2218 effects
2219}
2220
2221fn handle_compaction_failed(
2222 state: &mut AppState,
2223 session_id: crate::app::domain::types::SessionId,
2224 op_id: crate::app::domain::types::OpId,
2225 error: String,
2226) -> Vec<Effect> {
2227 let trigger = state
2228 .current_operation
2229 .as_ref()
2230 .and_then(|op| match op.kind {
2231 OperationKind::Compact { trigger } => Some(trigger),
2232 _ => None,
2233 })
2234 .unwrap_or(CompactTrigger::Manual);
2235
2236 state.complete_operation(op_id);
2237
2238 let mut effects = vec![
2239 Effect::EmitEvent {
2240 session_id,
2241 event: SessionEvent::CompactResult {
2242 result: crate::app::domain::event::CompactResult::Failed(error),
2243 trigger,
2244 },
2245 },
2246 Effect::EmitEvent {
2247 session_id,
2248 event: SessionEvent::OperationCompleted { op_id },
2249 },
2250 ];
2251
2252 effects.extend(maybe_start_queued_work(state, session_id));
2253
2254 effects
2255}
2256
2257fn emit_mcp_connect_effects(
2258 state: &AppState,
2259 session_id: crate::app::domain::types::SessionId,
2260) -> Vec<Effect> {
2261 let mut effects = Vec::new();
2262
2263 let Some(ref config) = state.session_config else {
2264 return effects;
2265 };
2266
2267 for backend_config in &config.tool_config.backends {
2268 let BackendConfig::Mcp {
2269 server_name,
2270 transport,
2271 tool_filter,
2272 } = backend_config;
2273
2274 let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
2275 matches!(
2276 s,
2277 McpServerState::Connecting | McpServerState::Connected { .. }
2278 )
2279 });
2280
2281 if !already_connected {
2282 effects.push(Effect::ConnectMcpServer {
2283 session_id,
2284 config: McpServerConfig {
2285 server_name: server_name.clone(),
2286 transport: transport.clone(),
2287 tool_filter: tool_filter.clone(),
2288 },
2289 });
2290 }
2291 }
2292
2293 effects
2294}
2295
2296#[cfg(test)]
2297mod tests {
2298 use super::*;
2299 use crate::api::provider::TokenUsage;
2300 use crate::app::domain::event::ContextWindowUsage;
2301 use crate::app::domain::state::{OperationState, PendingApproval};
2302 use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId, ToolCallId};
2303 use crate::config::model::builtin;
2304 use crate::primary_agents::resolve_effective_config;
2305 use crate::session::state::{
2306 ApprovalRules, SessionConfig, SessionPolicyOverrides, ToolApprovalPolicy, ToolVisibility,
2307 UnapprovedBehavior,
2308 };
2309 use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2310 use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2311 use schemars::schema_for;
2312 use serde_json::json;
2313 use std::collections::HashSet;
2314 use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2315
2316 fn test_state() -> AppState {
2317 AppState::new(SessionId::new())
2318 }
2319
2320 fn test_schema(name: &str) -> ToolSchema {
2321 ToolSchema {
2322 name: name.to_string(),
2323 display_name: name.to_string(),
2324 description: String::new(),
2325 input_schema: InputSchema::empty_object(),
2326 }
2327 }
2328
2329 fn base_session_config() -> SessionConfig {
2330 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2331 config.primary_agent_id = Some("normal".to_string());
2332 config.policy_overrides = SessionPolicyOverrides::empty();
2333 resolve_effective_config(&config)
2334 }
2335
2336 fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2337 super::reduce(state, action).expect("reduce failed")
2338 }
2339
2340 #[test]
2341 fn test_user_input_starts_operation() {
2342 let mut state = test_state();
2343 let session_id = state.session_id;
2344 let op_id = OpId::new();
2345 let message_id = MessageId::new();
2346 let model = builtin::claude_sonnet_4_5();
2347
2348 let effects = reduce(
2349 &mut state,
2350 Action::UserInput {
2351 session_id,
2352 content: vec![UserContent::Text {
2353 text: "Hello".to_string(),
2354 }],
2355 op_id,
2356 message_id,
2357 model,
2358 timestamp: 1_234_567_890,
2359 },
2360 );
2361
2362 assert_eq!(state.message_graph.messages.len(), 1);
2363 assert!(state.current_operation.is_some());
2364 assert!(
2365 effects
2366 .iter()
2367 .any(|e| matches!(e, Effect::CallModel { .. }))
2368 );
2369 }
2370
2371 #[test]
2372 fn test_switch_primary_agent_updates_visibility() {
2373 let mut state = test_state();
2374 let session_id = state.session_id;
2375 let config = base_session_config();
2376 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2377
2378 let effects = reduce(
2379 &mut state,
2380 Action::SwitchPrimaryAgent {
2381 session_id,
2382 agent_id: "plan".to_string(),
2383 },
2384 );
2385
2386 let updated = state.session_config.as_ref().expect("config");
2387 match &updated.tool_config.visibility {
2388 ToolVisibility::Whitelist(allowed) => {
2389 assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2390 for name in READ_ONLY_TOOL_NAMES {
2391 assert!(allowed.contains(*name));
2392 }
2393 assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2394 }
2395 other => panic!("Unexpected tool visibility: {other:?}"),
2396 }
2397 assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2398 assert!(effects.iter().any(|e| matches!(
2399 e,
2400 Effect::EmitEvent {
2401 event: SessionEvent::SessionConfigUpdated { .. },
2402 ..
2403 }
2404 )));
2405 assert!(
2406 effects
2407 .iter()
2408 .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2409 );
2410 }
2411
2412 #[test]
2413 fn test_switch_primary_agent_yolo_auto_approves() {
2414 let mut state = test_state();
2415 let session_id = state.session_id;
2416 let config = base_session_config();
2417 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2418
2419 let _ = reduce(
2420 &mut state,
2421 Action::SwitchPrimaryAgent {
2422 session_id,
2423 agent_id: "yolo".to_string(),
2424 },
2425 );
2426
2427 let updated = state.session_config.as_ref().expect("config");
2428 assert_eq!(
2429 updated.tool_config.approval_policy.default_behavior,
2430 UnapprovedBehavior::Allow
2431 );
2432 }
2433
2434 #[test]
2435 fn dispatch_agent_resume_is_auto_approved() {
2436 let mut state = test_state();
2437 let session_id = state.session_id;
2438 let config = base_session_config();
2439 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2440
2441 let tool_call = ToolCall {
2442 id: "tc_dispatch_resume".to_string(),
2443 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2444 parameters: json!({
2445 "prompt": "resume work",
2446 "target": {
2447 "session": "resume",
2448 "session_id": SessionId::new().to_string()
2449 }
2450 }),
2451 };
2452
2453 let decision = get_tool_decision(&state, &tool_call);
2454 assert_eq!(decision, ToolDecision::Allow);
2455 assert_eq!(state.session_id, session_id);
2456 }
2457
2458 #[test]
2459 fn test_switch_primary_agent_restores_base_prompt() {
2460 let mut state = test_state();
2461 let session_id = state.session_id;
2462 let mut config = base_session_config();
2463 config.system_prompt = Some("base prompt".to_string());
2464 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2465
2466 let _ = reduce(
2467 &mut state,
2468 Action::SwitchPrimaryAgent {
2469 session_id,
2470 agent_id: "plan".to_string(),
2471 },
2472 );
2473
2474 let _ = reduce(
2475 &mut state,
2476 Action::SwitchPrimaryAgent {
2477 session_id,
2478 agent_id: "normal".to_string(),
2479 },
2480 );
2481
2482 let updated = state.session_config.as_ref().expect("config");
2483 assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2484 }
2485
2486 #[test]
2487 fn test_switch_primary_agent_blocked_during_operation() {
2488 let mut state = test_state();
2489 let session_id = state.session_id;
2490 let config = base_session_config();
2491 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2492
2493 state.current_operation = Some(OperationState {
2494 op_id: OpId::new(),
2495 kind: OperationKind::AgentLoop,
2496 pending_tool_calls: HashSet::new(),
2497 });
2498
2499 let result = super::reduce(
2500 &mut state,
2501 Action::SwitchPrimaryAgent {
2502 session_id,
2503 agent_id: "plan".to_string(),
2504 },
2505 );
2506
2507 assert!(matches!(
2508 result,
2509 Err(ReduceError::InvalidAction {
2510 kind: InvalidActionKind::OperationInFlight,
2511 ..
2512 })
2513 ));
2514 assert!(state.primary_agent_id.as_deref() == Some("normal"));
2515 }
2516
2517 #[test]
2518 fn test_late_result_ignored_after_cancel() {
2519 let mut state = test_state();
2520 let session_id = state.session_id;
2521 let op_id = OpId::new();
2522 let tool_call_id = ToolCallId::from_string("tc_1");
2523
2524 state.current_operation = Some(OperationState {
2525 op_id,
2526 kind: OperationKind::AgentLoop,
2527 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2528 });
2529
2530 let _ = reduce(
2531 &mut state,
2532 Action::Cancel {
2533 session_id,
2534 op_id: None,
2535 },
2536 );
2537
2538 state.current_operation = Some(OperationState {
2539 op_id,
2540 kind: OperationKind::AgentLoop,
2541 pending_tool_calls: HashSet::new(),
2542 });
2543 state
2544 .operation_models
2545 .insert(op_id, builtin::claude_sonnet_4_5());
2546 state
2547 .operation_models
2548 .insert(op_id, builtin::claude_sonnet_4_5());
2549 state
2550 .operation_models
2551 .insert(op_id, builtin::claude_sonnet_4_5());
2552
2553 let effects = reduce(
2554 &mut state,
2555 Action::ToolResult {
2556 session_id,
2557 tool_call_id,
2558 tool_name: "test".to_string(),
2559 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2560 tool_name: "test".to_string(),
2561 payload: "done".to_string(),
2562 })),
2563 },
2564 );
2565
2566 assert!(effects.is_empty());
2567 }
2568
2569 #[test]
2570 fn test_pre_approved_tool_executes_immediately() {
2571 let mut state = test_state();
2572 let session_id = state.session_id;
2573 let op_id = OpId::new();
2574
2575 state.approved_tools.insert("test_tool".to_string());
2576 state.current_operation = Some(OperationState {
2577 op_id,
2578 kind: OperationKind::AgentLoop,
2579 pending_tool_calls: HashSet::new(),
2580 });
2581 state
2582 .operation_models
2583 .insert(op_id, builtin::claude_sonnet_4_5());
2584 state
2585 .operation_models
2586 .insert(op_id, builtin::claude_sonnet_4_5());
2587 state
2588 .operation_models
2589 .insert(op_id, builtin::claude_sonnet_4_5());
2590
2591 let tool_call = steer_tools::ToolCall {
2592 id: "tc_1".to_string(),
2593 name: "test_tool".to_string(),
2594 parameters: serde_json::json!({}),
2595 };
2596
2597 let effects = reduce(
2598 &mut state,
2599 Action::ToolApprovalRequested {
2600 session_id,
2601 request_id: RequestId::new(),
2602 tool_call,
2603 },
2604 );
2605
2606 assert!(
2607 effects
2608 .iter()
2609 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2610 );
2611 assert!(state.pending_approval.is_none());
2612 }
2613
2614 #[test]
2615 fn test_denied_tool_request_emits_failure_message() {
2616 let mut state = test_state();
2617 let session_id = state.session_id;
2618 let op_id = OpId::new();
2619
2620 state.current_operation = Some(OperationState {
2621 op_id,
2622 kind: OperationKind::AgentLoop,
2623 pending_tool_calls: HashSet::new(),
2624 });
2625
2626 state
2627 .operation_models
2628 .insert(op_id, builtin::claude_sonnet_4_5());
2629
2630 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2631 config.tool_config.approval_policy = ToolApprovalPolicy {
2632 default_behavior: UnapprovedBehavior::Deny,
2633 preapproved: ApprovalRules::default(),
2634 };
2635 state.session_config = Some(config);
2636
2637 let tool_call = steer_tools::ToolCall {
2638 id: "tc_1".to_string(),
2639 name: "test_tool".to_string(),
2640 parameters: serde_json::json!({}),
2641 };
2642
2643 let effects = reduce(
2644 &mut state,
2645 Action::ToolApprovalRequested {
2646 session_id,
2647 request_id: RequestId::new(),
2648 tool_call,
2649 },
2650 );
2651
2652 assert!(effects.iter().any(|e| matches!(
2653 e,
2654 Effect::EmitEvent {
2655 event: SessionEvent::ToolCallFailed { .. },
2656 ..
2657 }
2658 )));
2659 assert!(effects.iter().any(|e| matches!(
2660 e,
2661 Effect::EmitEvent {
2662 event: SessionEvent::ToolMessageAdded { .. },
2663 ..
2664 }
2665 )));
2666 assert!(
2667 !effects
2668 .iter()
2669 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2670 );
2671 assert!(
2672 !effects
2673 .iter()
2674 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2675 );
2676 assert!(state.pending_approval.is_none());
2677 assert!(state.approval_queue.is_empty());
2678 assert_eq!(state.message_graph.messages.len(), 1);
2679
2680 match &state.message_graph.messages[0].data {
2681 MessageData::Tool { result, .. } => match result {
2682 ToolResult::Error(error) => {
2683 assert!(
2684 matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2685 );
2686 }
2687 _ => panic!("expected denied tool error"),
2688 },
2689 _ => panic!("expected tool message"),
2690 }
2691 }
2692
2693 #[test]
2694 fn test_user_denied_tool_request_emits_failure_message() {
2695 let mut state = test_state();
2696 let session_id = state.session_id;
2697 let op_id = OpId::new();
2698
2699 state.current_operation = Some(OperationState {
2700 op_id,
2701 kind: OperationKind::AgentLoop,
2702 pending_tool_calls: HashSet::new(),
2703 });
2704 state
2705 .operation_models
2706 .insert(op_id, builtin::claude_sonnet_4_5());
2707
2708 let tool_call = steer_tools::ToolCall {
2709 id: "tc_1".to_string(),
2710 name: "test_tool".to_string(),
2711 parameters: serde_json::json!({}),
2712 };
2713 let request_id = RequestId::new();
2714 state.pending_approval = Some(PendingApproval {
2715 request_id,
2716 tool_call: tool_call.clone(),
2717 });
2718
2719 let effects = reduce(
2720 &mut state,
2721 Action::ToolApprovalDecided {
2722 session_id,
2723 request_id,
2724 decision: ApprovalDecision::Denied,
2725 remember: None,
2726 },
2727 );
2728
2729 assert!(effects.iter().any(|e| matches!(
2730 e,
2731 Effect::EmitEvent {
2732 event: SessionEvent::ToolCallFailed { .. },
2733 ..
2734 }
2735 )));
2736 assert!(effects.iter().any(|e| matches!(
2737 e,
2738 Effect::EmitEvent {
2739 event: SessionEvent::ToolMessageAdded { .. },
2740 ..
2741 }
2742 )));
2743 assert!(
2744 !effects
2745 .iter()
2746 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2747 );
2748 assert!(state.pending_approval.is_none());
2749 assert!(state.approval_queue.is_empty());
2750 assert_eq!(state.message_graph.messages.len(), 1);
2751
2752 match &state.message_graph.messages[0].data {
2753 MessageData::Tool { result, .. } => match result {
2754 ToolResult::Error(error) => {
2755 assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2756 }
2757 _ => panic!("expected denied tool error"),
2758 },
2759 _ => panic!("expected tool message"),
2760 }
2761 }
2762
2763 #[test]
2764 fn test_cancel_pops_queued_item_without_auto_start() {
2765 let mut state = test_state();
2766 let session_id = state.session_id;
2767 let op_id = OpId::new();
2768
2769 state.current_operation = Some(OperationState {
2770 op_id,
2771 kind: OperationKind::AgentLoop,
2772 pending_tool_calls: HashSet::new(),
2773 });
2774 state
2775 .operation_models
2776 .insert(op_id, builtin::claude_sonnet_4_5());
2777
2778 let queued_op = OpId::new();
2779 let queued_message_id = MessageId::from_string("queued_msg");
2780 let _ = reduce(
2781 &mut state,
2782 Action::UserInput {
2783 session_id,
2784 content: vec![UserContent::Text {
2785 text: "Queued message".to_string(),
2786 }],
2787 op_id: queued_op,
2788 message_id: queued_message_id.clone(),
2789 model: builtin::claude_sonnet_4_5(),
2790 timestamp: 1,
2791 },
2792 );
2793
2794 let effects = reduce(
2795 &mut state,
2796 Action::Cancel {
2797 session_id,
2798 op_id: None,
2799 },
2800 );
2801
2802 assert!(state.current_operation.is_none());
2803 assert!(state.queued_work.is_empty());
2804
2805 let cancellation_info = effects.iter().find_map(|effect| match effect {
2806 Effect::EmitEvent {
2807 event: SessionEvent::OperationCancelled { info, .. },
2808 ..
2809 } => Some(info),
2810 _ => None,
2811 });
2812 let info = cancellation_info.expect("expected OperationCancelled event");
2813 let popped = info
2814 .popped_queued_item
2815 .as_ref()
2816 .expect("expected popped queued item");
2817 assert_eq!(popped.content, "Queued message");
2818 assert_eq!(popped.op_id, queued_op);
2819 assert_eq!(popped.message_id, queued_message_id);
2820
2821 assert!(
2822 !effects.iter().any(|effect| matches!(
2823 effect,
2824 Effect::EmitEvent {
2825 event: SessionEvent::OperationStarted { .. },
2826 ..
2827 }
2828 )),
2829 "queued work should not auto-start on cancel"
2830 );
2831 }
2832
2833 #[test]
2834 fn test_cancel_injects_tool_results_for_pending_calls() {
2835 let mut state = test_state();
2836 let session_id = state.session_id;
2837 let op_id = OpId::new();
2838
2839 let tool_call = ToolCall {
2840 id: "tc_1".to_string(),
2841 name: "test_tool".to_string(),
2842 parameters: serde_json::json!({}),
2843 };
2844
2845 state.message_graph.add_message(Message {
2846 data: MessageData::Assistant {
2847 content: vec![AssistantContent::ToolCall {
2848 tool_call: tool_call.clone(),
2849 thought_signature: None,
2850 }],
2851 },
2852 timestamp: 0,
2853 id: "msg_1".to_string(),
2854 parent_message_id: None,
2855 });
2856
2857 state.current_operation = Some(OperationState {
2858 op_id,
2859 kind: OperationKind::AgentLoop,
2860 pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2861 });
2862 state
2863 .operation_models
2864 .insert(op_id, builtin::claude_sonnet_4_5());
2865
2866 let effects = reduce(
2867 &mut state,
2868 Action::Cancel {
2869 session_id,
2870 op_id: None,
2871 },
2872 );
2873
2874 assert!(effects.iter().any(|e| matches!(
2875 e,
2876 Effect::EmitEvent {
2877 event: SessionEvent::ToolMessageAdded { .. },
2878 ..
2879 }
2880 )));
2881
2882 let tool_message = state
2883 .message_graph
2884 .messages
2885 .iter()
2886 .find(|message| matches!(message.data, MessageData::Tool { .. }))
2887 .expect("tool result should be injected on cancel");
2888
2889 match &tool_message.data {
2890 MessageData::Tool { result, .. } => match result {
2891 ToolResult::Error(error) => {
2892 assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2893 }
2894 _ => panic!("expected cancelled tool error"),
2895 },
2896 _ => panic!("expected tool message"),
2897 }
2898 }
2899
2900 #[test]
2901 fn test_malformed_tool_call_auto_denies() {
2902 let mut state = test_state();
2903 let session_id = state.session_id;
2904 let op_id = OpId::new();
2905
2906 state.current_operation = Some(OperationState {
2907 op_id,
2908 kind: OperationKind::AgentLoop,
2909 pending_tool_calls: HashSet::new(),
2910 });
2911
2912 state
2913 .operation_models
2914 .insert(op_id, builtin::claude_sonnet_4_5());
2915
2916 let mut properties = serde_json::Map::new();
2917 properties.insert("command".to_string(), json!({ "type": "string" }));
2918
2919 state.tools.push(ToolSchema {
2920 name: "test_tool".to_string(),
2921 display_name: "test_tool".to_string(),
2922 description: String::new(),
2923 input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2924 });
2925
2926 let tool_call = ToolCall {
2927 id: "tc_1".to_string(),
2928 name: "test_tool".to_string(),
2929 parameters: json!({}),
2930 };
2931
2932 let effects = reduce(
2933 &mut state,
2934 Action::ToolApprovalRequested {
2935 session_id,
2936 request_id: RequestId::new(),
2937 tool_call,
2938 },
2939 );
2940
2941 assert!(effects.iter().any(|e| matches!(
2942 e,
2943 Effect::EmitEvent {
2944 event: SessionEvent::ToolCallFailed { .. },
2945 ..
2946 }
2947 )));
2948 assert!(effects.iter().any(|e| matches!(
2949 e,
2950 Effect::EmitEvent {
2951 event: SessionEvent::ToolMessageAdded { .. },
2952 ..
2953 }
2954 )));
2955 assert!(
2956 !effects
2957 .iter()
2958 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2959 );
2960 assert!(
2961 !effects
2962 .iter()
2963 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2964 );
2965 assert!(state.pending_approval.is_none());
2966 assert!(state.approval_queue.is_empty());
2967 assert_eq!(state.message_graph.messages.len(), 1);
2968
2969 match &state.message_graph.messages[0].data {
2970 MessageData::Tool { result, .. } => match result {
2971 ToolResult::Error(error) => {
2972 assert!(matches!(error, ToolError::InvalidParams { .. }));
2973 }
2974 _ => panic!("expected invalid params tool error"),
2975 },
2976 _ => panic!("expected tool message"),
2977 }
2978 }
2979
2980 #[test]
2981 fn test_approval_queuing() {
2982 let mut state = test_state();
2983 let session_id = state.session_id;
2984 let op_id = OpId::new();
2985
2986 state.current_operation = Some(OperationState {
2987 op_id,
2988 kind: OperationKind::AgentLoop,
2989 pending_tool_calls: HashSet::new(),
2990 });
2991
2992 let tool_call_1 = steer_tools::ToolCall {
2993 id: "tc_1".to_string(),
2994 name: "tool_1".to_string(),
2995 parameters: serde_json::json!({}),
2996 };
2997 let tool_call_2 = steer_tools::ToolCall {
2998 id: "tc_2".to_string(),
2999 name: "tool_2".to_string(),
3000 parameters: serde_json::json!({}),
3001 };
3002
3003 let _ = reduce(
3004 &mut state,
3005 Action::ToolApprovalRequested {
3006 session_id,
3007 request_id: RequestId::new(),
3008 tool_call: tool_call_1,
3009 },
3010 );
3011
3012 assert!(state.pending_approval.is_some());
3013
3014 let _ = reduce(
3015 &mut state,
3016 Action::ToolApprovalRequested {
3017 session_id,
3018 request_id: RequestId::new(),
3019 tool_call: tool_call_2,
3020 },
3021 );
3022
3023 assert_eq!(state.approval_queue.len(), 1);
3024 }
3025
3026 #[test]
3027 fn test_unknown_tool_is_treated_as_nonexistent_and_never_prompts() {
3028 let mut state = test_state();
3029 let session_id = state.session_id;
3030 let op_id = OpId::new();
3031
3032 state.current_operation = Some(OperationState {
3033 op_id,
3034 kind: OperationKind::AgentLoop,
3035 pending_tool_calls: HashSet::new(),
3036 });
3037
3038 state
3039 .operation_models
3040 .insert(op_id, builtin::claude_sonnet_4_5());
3041
3042 state.tools.push(test_schema("view"));
3043
3044 let tool_call = ToolCall {
3045 id: "tc_unknown".to_string(),
3046 name: "bash".to_string(),
3047 parameters: json!({ "command": "echo hi" }),
3048 };
3049
3050 let effects = reduce(
3051 &mut state,
3052 Action::ToolApprovalRequested {
3053 session_id,
3054 request_id: RequestId::new(),
3055 tool_call,
3056 },
3057 );
3058
3059 assert!(effects.iter().any(|e| matches!(
3060 e,
3061 Effect::EmitEvent {
3062 event: SessionEvent::ToolCallFailed { .. },
3063 ..
3064 }
3065 )));
3066 assert!(effects.iter().any(|e| matches!(
3067 e,
3068 Effect::EmitEvent {
3069 event: SessionEvent::ToolMessageAdded { .. },
3070 ..
3071 }
3072 )));
3073 assert!(
3074 !effects
3075 .iter()
3076 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3077 );
3078 assert!(state.pending_approval.is_none());
3079 assert!(state.approval_queue.is_empty());
3080 assert_eq!(state.message_graph.messages.len(), 1);
3081
3082 match &state.message_graph.messages[0].data {
3083 MessageData::Tool { result, .. } => match result {
3084 ToolResult::Error(error) => {
3085 assert!(matches!(error, ToolError::UnknownTool(name) if name == "bash"));
3086 }
3087 _ => panic!("expected unknown tool error"),
3088 },
3089 _ => panic!("expected tool message"),
3090 }
3091 }
3092
3093 #[test]
3094 fn test_dispatch_agent_missing_target_auto_denies() {
3095 let mut state = test_state();
3096 let session_id = state.session_id;
3097 let op_id = OpId::new();
3098
3099 state.current_operation = Some(OperationState {
3100 op_id,
3101 kind: OperationKind::AgentLoop,
3102 pending_tool_calls: HashSet::new(),
3103 });
3104
3105 state
3106 .operation_models
3107 .insert(op_id, builtin::claude_sonnet_4_5());
3108
3109 let input_schema: InputSchema =
3110 schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
3111 state.tools.push(ToolSchema {
3112 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3113 display_name: "Dispatch Agent".to_string(),
3114 description: String::new(),
3115 input_schema,
3116 });
3117
3118 let tool_call = ToolCall {
3119 id: "tc_dispatch".to_string(),
3120 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3121 parameters: json!({ "prompt": "hello world" }),
3122 };
3123
3124 let effects = reduce(
3125 &mut state,
3126 Action::ToolApprovalRequested {
3127 session_id,
3128 request_id: RequestId::new(),
3129 tool_call,
3130 },
3131 );
3132
3133 assert!(effects.iter().any(|e| matches!(
3134 e,
3135 Effect::EmitEvent {
3136 event: SessionEvent::ToolCallFailed { .. },
3137 ..
3138 }
3139 )));
3140 assert!(effects.iter().any(|e| matches!(
3141 e,
3142 Effect::EmitEvent {
3143 event: SessionEvent::ToolMessageAdded { .. },
3144 ..
3145 }
3146 )));
3147 assert!(
3148 !effects
3149 .iter()
3150 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3151 );
3152 assert!(state.pending_approval.is_none());
3153 assert!(state.approval_queue.is_empty());
3154
3155 match &state.message_graph.messages[0].data {
3156 MessageData::Tool { result, .. } => match result {
3157 ToolResult::Error(error) => {
3158 assert!(matches!(error, ToolError::InvalidParams { .. }));
3159 }
3160 _ => panic!("expected invalid params tool error"),
3161 },
3162 _ => panic!("expected tool message"),
3163 }
3164 }
3165
3166 #[test]
3167 fn test_model_response_with_tool_calls_requests_approval() {
3168 let mut state = test_state();
3169 let session_id = state.session_id;
3170 let op_id = OpId::new();
3171 let message_id = MessageId::new();
3172
3173 state.current_operation = Some(OperationState {
3174 op_id,
3175 kind: OperationKind::AgentLoop,
3176 pending_tool_calls: HashSet::new(),
3177 });
3178 state
3179 .operation_models
3180 .insert(op_id, builtin::claude_sonnet_4_5());
3181
3182 let tool_call = steer_tools::ToolCall {
3183 id: "tc_1".to_string(),
3184 name: "bash".to_string(),
3185 parameters: serde_json::json!({"command": "ls"}),
3186 };
3187
3188 let content = vec![
3189 AssistantContent::Text {
3190 text: "Let me list the files.".to_string(),
3191 },
3192 AssistantContent::ToolCall {
3193 tool_call: tool_call.clone(),
3194 thought_signature: None,
3195 },
3196 ];
3197
3198 let effects = reduce(
3199 &mut state,
3200 Action::ModelResponseComplete {
3201 session_id,
3202 op_id,
3203 message_id,
3204 content,
3205 usage: None,
3206 context_window_tokens: None,
3207 configured_max_output_tokens: None,
3208 timestamp: 12345,
3209 },
3210 );
3211
3212 assert!(state.pending_approval.is_some());
3213 assert!(
3214 effects
3215 .iter()
3216 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3217 );
3218 assert!(state.current_operation.is_some());
3219 }
3220
3221 #[test]
3222 fn test_model_response_no_tools_completes_operation() {
3223 let mut state = test_state();
3224 let session_id = state.session_id;
3225 let op_id = OpId::new();
3226 let message_id = MessageId::new();
3227
3228 state.current_operation = Some(OperationState {
3229 op_id,
3230 kind: OperationKind::AgentLoop,
3231 pending_tool_calls: HashSet::new(),
3232 });
3233 state
3234 .operation_models
3235 .insert(op_id, builtin::claude_sonnet_4_5());
3236
3237 let content = vec![AssistantContent::Text {
3238 text: "Hello! How can I help?".to_string(),
3239 }];
3240
3241 let effects = reduce(
3242 &mut state,
3243 Action::ModelResponseComplete {
3244 session_id,
3245 op_id,
3246 message_id,
3247 content,
3248 usage: None,
3249 context_window_tokens: None,
3250 configured_max_output_tokens: None,
3251 timestamp: 12345,
3252 },
3253 );
3254
3255 assert!(state.current_operation.is_none());
3256 assert!(effects.iter().any(|e| matches!(
3257 e,
3258 Effect::EmitEvent {
3259 event: SessionEvent::OperationCompleted { .. },
3260 ..
3261 }
3262 )));
3263 assert!(!effects.iter().any(|e| matches!(
3264 e,
3265 Effect::EmitEvent {
3266 event: SessionEvent::LlmUsageUpdated { .. },
3267 ..
3268 }
3269 )));
3270 }
3271
    /// A model response that reports token usage but no context-window size must
    /// emit `AssistantMessageAdded` and an `LlmUsageUpdated` event for this op and
    /// model, with `context_window: None`. It must also record the usage in
    /// `llm_usage_by_op` and in `llm_usage_totals`.
    #[test]
    fn test_model_response_with_usage_emits_usage_event_and_updates_state() {
        let mut state = test_state();
        let session_id = state.session_id;
        let op_id = OpId::new();
        let message_id = MessageId::new();
        let model = builtin::claude_sonnet_4_5();
        let usage = TokenUsage::new(10, 20, 30);

        state.current_operation = Some(OperationState {
            op_id,
            kind: OperationKind::AgentLoop,
            pending_tool_calls: HashSet::new(),
        });
        state.operation_models.insert(op_id, model.clone());

        // No context window information is supplied, so no utilization can be derived.
        let effects = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id,
                message_id,
                content: vec![AssistantContent::Text {
                    text: "Done".to_string(),
                }],
                usage: Some(usage),
                context_window_tokens: None,
                configured_max_output_tokens: None,
                timestamp: 12345,
            },
        );

        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::AssistantMessageAdded { .. },
                ..
            }
        )));
        // The usage event must carry this op, this model, the exact usage, and no window.
        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::LlmUsageUpdated {
                    op_id: usage_op_id,
                    model: usage_model,
                    usage: usage_payload,
                    context_window,
                },
                ..
            } if *usage_op_id == op_id && usage_model == &model && *usage_payload == usage && context_window.is_none()
        )));

        // The per-op snapshot and the session-wide totals must mirror the event.
        let snapshot = state
            .llm_usage_by_op
            .get(&op_id)
            .expect("usage snapshot should be recorded");
        assert_eq!(snapshot.model, model);
        assert_eq!(snapshot.usage, usage);
        assert!(snapshot.context_window.is_none());
        assert_eq!(state.llm_usage_totals, usage);
    }
3333
    /// Once a 100-token context window is supplied, the usage event and the per-op
    /// snapshot must include a `ContextWindowUsage` with `remaining_tokens: 70`,
    /// `utilization_ratio` 0.3, and `estimated: false`.
    #[test]
    fn test_model_response_with_usage_and_context_window_emits_utilization() {
        let mut state = test_state();
        let session_id = state.session_id;
        let op_id = OpId::new();
        let message_id = MessageId::new();
        let model = builtin::claude_sonnet_4_5();
        // The expected figures below (70 remaining, ratio 0.3) match 30 tokens used
        // out of 100, i.e. they presumably read `TokenUsage::new`'s third argument
        // as the total — confirm against `TokenUsage`.
        let usage = TokenUsage::new(10, 20, 30);
        let context_window_tokens = Some(100u32);

        state.current_operation = Some(OperationState {
            op_id,
            kind: OperationKind::AgentLoop,
            pending_tool_calls: HashSet::new(),
        });
        state.operation_models.insert(op_id, model.clone());

        let effects = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id,
                message_id,
                content: vec![AssistantContent::Text {
                    text: "Done".to_string(),
                }],
                usage: Some(usage),
                context_window_tokens,
                configured_max_output_tokens: None,
                timestamp: 12345,
            },
        );

        // The ratio is compared with a tolerance because it is a float.
        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::LlmUsageUpdated {
                    op_id: usage_op_id,
                    model: usage_model,
                    usage: usage_payload,
                    context_window,
                },
                ..
            } if *usage_op_id == op_id
                && usage_model == &model
                && *usage_payload == usage
                && matches!(context_window, Some(ContextWindowUsage {
                    max_context_tokens: Some(100),
                    remaining_tokens: Some(70),
                    utilization_ratio: Some(ratio),
                    estimated: false,
                }) if (*ratio - 0.3).abs() < 1e-9)
        )));

        let snapshot = state
            .llm_usage_by_op
            .get(&op_id)
            .expect("usage snapshot should be recorded");
        assert_eq!(snapshot.model, model);
        assert_eq!(snapshot.usage, usage);
        // NOTE(review): exact float equality here, unlike the tolerance above. It
        // holds only if the ratio is computed as a single correctly-rounded division
        // (30.0 / 100.0 == 0.3 in f64).
        assert_eq!(
            snapshot.context_window,
            Some(ContextWindowUsage {
                max_context_tokens: Some(100),
                remaining_tokens: Some(70),
                utilization_ratio: Some(0.3),
                estimated: false,
            })
        );
        assert_eq!(state.llm_usage_totals, usage);
    }
3405
    /// When `configured_max_output_tokens` is supplied alongside the context window,
    /// utilization is expected to account for it. With a 100-token window and 40
    /// configured output tokens, the test expects:
    /// - `remaining_tokens: 30`;
    /// - `utilization_ratio` 0.5;
    /// - `max_context_tokens` still reported as 100.
    #[test]
    fn test_model_response_utilization_uses_reserved_context_window() {
        let mut state = test_state();
        let session_id = state.session_id;
        let op_id = OpId::new();
        let message_id = MessageId::new();
        let model = builtin::claude_sonnet_4_5();
        let usage = TokenUsage::new(10, 20, 30);

        state.current_operation = Some(OperationState {
            op_id,
            kind: OperationKind::AgentLoop,
            pending_tool_calls: HashSet::new(),
        });
        state.operation_models.insert(op_id, model.clone());

        // The expected numbers are consistent with 30 tokens used out of 100 - 40 = 60
        // usable tokens: remaining is 60 - 30 = 30 and the ratio is 30 / 60 = 0.5.
        let effects = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id,
                message_id,
                content: vec![AssistantContent::Text {
                    text: "Done".to_string(),
                }],
                usage: Some(usage),
                context_window_tokens: Some(100),
                configured_max_output_tokens: Some(40),
                timestamp: 12345,
            },
        );

        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::LlmUsageUpdated {
                    context_window: Some(ContextWindowUsage {
                        max_context_tokens: Some(100),
                        remaining_tokens: Some(30),
                        utilization_ratio: Some(ratio),
                        estimated: false,
                    }),
                    ..
                },
                ..
            } if (*ratio - 0.5).abs() < 1e-9
        )));

        // The stored snapshot must match the event (0.5 is exactly representable).
        let snapshot = state
            .llm_usage_by_op
            .get(&op_id)
            .expect("usage snapshot should be recorded");
        assert_eq!(
            snapshot.context_window,
            Some(ContextWindowUsage {
                max_context_tokens: Some(100),
                remaining_tokens: Some(30),
                utilization_ratio: Some(0.5),
                estimated: false,
            })
        );
    }
3468
    /// A reported context window of 0 tokens cannot yield a meaningful ratio. The
    /// test expects the usage event's `ContextWindowUsage` to have:
    /// - `max_context_tokens: Some(0)` and `remaining_tokens: Some(0)`;
    /// - no `utilization_ratio`;
    /// - `estimated: true`.
    #[test]
    fn test_model_response_with_zero_context_window_marks_estimated() {
        let mut state = test_state();
        let session_id = state.session_id;
        let op_id = OpId::new();
        let message_id = MessageId::new();
        let model = builtin::claude_sonnet_4_5();
        let usage = TokenUsage::new(10, 20, 30);

        state.current_operation = Some(OperationState {
            op_id,
            kind: OperationKind::AgentLoop,
            pending_tool_calls: HashSet::new(),
        });
        state.operation_models.insert(op_id, model);

        let effects = reduce(
            &mut state,
            Action::ModelResponseComplete {
                session_id,
                op_id,
                message_id,
                content: vec![AssistantContent::Text {
                    text: "Done".to_string(),
                }],
                usage: Some(usage),
                context_window_tokens: Some(0),
                configured_max_output_tokens: None,
                timestamp: 12345,
            },
        );

        // Only the context-window payload is checked; op/model/usage are covered elsewhere.
        assert!(effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::LlmUsageUpdated {
                    context_window,
                    ..
                },
                ..
            } if matches!(context_window, Some(ContextWindowUsage {
                max_context_tokens: Some(0),
                remaining_tokens: Some(0),
                utilization_ratio: None,
                estimated: true,
            }))
        )));
    }
3517
3518 #[test]
3519 fn test_apply_usage_event_updates_replay_state_and_totals() {
3520 let mut state = test_state();
3521 let op_a = OpId::new();
3522 let op_b = OpId::new();
3523 let model = builtin::claude_sonnet_4_5();
3524
3525 apply_event_to_state(
3526 &mut state,
3527 &SessionEvent::LlmUsageUpdated {
3528 op_id: op_a,
3529 model: model.clone(),
3530 usage: TokenUsage::new(3, 5, 8),
3531 context_window: None,
3532 },
3533 );
3534
3535 assert_eq!(state.llm_usage_totals, TokenUsage::new(3, 5, 8));
3536
3537 apply_event_to_state(
3538 &mut state,
3539 &SessionEvent::LlmUsageUpdated {
3540 op_id: op_a,
3541 model: model.clone(),
3542 usage: TokenUsage::new(7, 11, 18),
3543 context_window: None,
3544 },
3545 );
3546
3547 assert_eq!(state.llm_usage_totals, TokenUsage::new(7, 11, 18));
3548
3549 apply_event_to_state(
3550 &mut state,
3551 &SessionEvent::LlmUsageUpdated {
3552 op_id: op_b,
3553 model,
3554 usage: TokenUsage::new(2, 4, 6),
3555 context_window: None,
3556 },
3557 );
3558
3559 assert_eq!(state.llm_usage_totals, TokenUsage::new(9, 15, 24));
3560 }
3561
3562 #[test]
3563 fn test_out_of_order_completion_preserves_newer_operation() {
3564 let mut state = test_state();
3565 let session_id = state.session_id;
3566 let model = builtin::claude_sonnet_4_5();
3567
3568 let op_a = OpId::new();
3569 let op_b = OpId::new();
3570
3571 let _ = reduce(
3572 &mut state,
3573 Action::UserInput {
3574 session_id,
3575 content: vec![UserContent::Text {
3576 text: "first".to_string(),
3577 }],
3578 op_id: op_a,
3579 message_id: MessageId::new(),
3580 model: model.clone(),
3581 timestamp: 1,
3582 },
3583 );
3584
3585 let _ = reduce(
3586 &mut state,
3587 Action::UserInput {
3588 session_id,
3589 content: vec![UserContent::Text {
3590 text: "second".to_string(),
3591 }],
3592 op_id: op_b,
3593 message_id: MessageId::new(),
3594 model: model.clone(),
3595 timestamp: 2,
3596 },
3597 );
3598
3599 let _ = reduce(
3600 &mut state,
3601 Action::ModelResponseComplete {
3602 session_id,
3603 op_id: op_a,
3604 message_id: MessageId::new(),
3605 content: vec![AssistantContent::Text {
3606 text: "done A".to_string(),
3607 }],
3608 usage: None,
3609 context_window_tokens: None,
3610 configured_max_output_tokens: None,
3611 timestamp: 3,
3612 },
3613 );
3614
3615 assert!(
3616 state
3617 .current_operation
3618 .as_ref()
3619 .is_some_and(|op| op.op_id == op_b)
3620 );
3621 assert!(state.operation_models.contains_key(&op_b));
3622 assert!(!state.operation_models.contains_key(&op_a));
3623
3624 let effects = reduce(
3625 &mut state,
3626 Action::ModelResponseComplete {
3627 session_id,
3628 op_id: op_b,
3629 message_id: MessageId::new(),
3630 content: vec![AssistantContent::Text {
3631 text: "done B".to_string(),
3632 }],
3633 usage: None,
3634 context_window_tokens: None,
3635 configured_max_output_tokens: None,
3636 timestamp: 4,
3637 },
3638 );
3639
3640 assert!(effects.iter().any(|e| matches!(
3641 e,
3642 Effect::EmitEvent {
3643 event: SessionEvent::OperationCompleted { op_id },
3644 ..
3645 } if *op_id == op_b
3646 )));
3647 assert!(!effects.iter().any(|e| matches!(
3648 e,
3649 Effect::EmitEvent {
3650 event: SessionEvent::Error { message },
3651 ..
3652 } if message.contains("Missing model for operation")
3653 )));
3654 }
3655
/// Approving a pending tool call must dispatch the tool (`ExecuteTool`) but
/// must not call the model yet; the approved call is then tracked as pending
/// on the current agent-loop operation.
#[test]
fn test_tool_approval_does_not_call_model_before_result() {
    let mut state = test_state();
    let session_id = state.session_id;
    let agent_op = OpId::new();

    state.current_operation = Some(OperationState {
        op_id: agent_op,
        kind: OperationKind::AgentLoop,
        pending_tool_calls: HashSet::new(),
    });
    state
        .operation_models
        .insert(agent_op, builtin::claude_sonnet_4_5());

    let approval_request = RequestId::new();
    state.pending_approval = Some(PendingApproval {
        request_id: approval_request,
        tool_call: steer_tools::ToolCall {
            id: "tc_1".to_string(),
            name: "bash".to_string(),
            parameters: serde_json::json!({"command": "ls"}),
        },
    });

    let approve = Action::ToolApprovalDecided {
        session_id,
        request_id: approval_request,
        decision: ApprovalDecision::Approved,
        remember: None,
    };
    let effects = reduce(&mut state, approve);

    let executes_tool = effects
        .iter()
        .any(|effect| matches!(effect, Effect::ExecuteTool { .. }));
    let calls_model = effects
        .iter()
        .any(|effect| matches!(effect, Effect::CallModel { .. }));
    assert!(executes_tool);
    assert!(!calls_model);

    // The approved call should now be recorded as in flight.
    let approved_call = ToolCallId::from_string("tc_1");
    let tracked = state
        .current_operation
        .as_ref()
        .is_some_and(|op| op.pending_tool_calls.contains(&approved_call));
    assert!(tracked);
}
3707
/// When an MCP server connects, its tools are filtered through the session's
/// visibility whitelist. When it disconnects, every `mcp__alpha__*` tool is
/// removed, while unrelated tools (`bash`) are kept.
#[test]
fn test_mcp_tool_visibility_and_disconnect_removal() {
    let mut state = test_state();
    let session_id = state.session_id;

    let whitelist: HashSet<String> = ["mcp__alpha__allowed".to_string()].into_iter().collect();

    let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
    config.tool_config.visibility = ToolVisibility::Whitelist(whitelist);
    state.session_config = Some(config);

    state.tools.push(test_schema("bash"));

    let has_tool = |st: &AppState, wanted: &str| st.tools.iter().any(|t| t.name == wanted);

    let connect = Action::McpServerStateChanged {
        session_id,
        server_name: "alpha".to_string(),
        state: McpServerState::Connected {
            tools: vec![
                test_schema("mcp__alpha__allowed"),
                test_schema("mcp__alpha__blocked"),
            ],
        },
    };
    let _ = reduce(&mut state, connect);

    assert!(has_tool(&state, "mcp__alpha__allowed"));
    assert!(!has_tool(&state, "mcp__alpha__blocked"));

    let disconnect = Action::McpServerStateChanged {
        session_id,
        server_name: "alpha".to_string(),
        state: McpServerState::Disconnected { error: None },
    };
    let _ = reduce(&mut state, disconnect);

    let alpha_tools_remaining = state
        .tools
        .iter()
        .any(|t| t.name.starts_with("mcp__alpha__"));
    assert!(!alpha_tools_remaining);
    assert!(has_tool(&state, "bash"));
}
3756
/// When the only outstanding tool call of an agent-loop operation reports a
/// result, the reducer should resume the loop by emitting `CallModel`.
#[test]
fn test_tool_result_continues_agent_loop() {
    let mut state = test_state();
    let session_id = state.session_id;
    let op_id = OpId::new();
    let call_id = ToolCallId::from_string("tc_1");

    state.current_operation = Some(OperationState {
        op_id,
        kind: OperationKind::AgentLoop,
        pending_tool_calls: HashSet::from([call_id.clone()]),
    });
    state
        .operation_models
        .insert(op_id, builtin::claude_sonnet_4_5());

    let bash_output = ToolResult::External(steer_tools::result::ExternalResult {
        tool_name: "bash".to_string(),
        payload: "file1.txt\nfile2.txt".to_string(),
    });
    let finished = Action::ToolResult {
        session_id,
        tool_call_id: call_id,
        tool_name: "bash".to_string(),
        result: Ok(bash_output),
    };
    let effects = reduce(&mut state, finished);

    let resumed = effects
        .iter()
        .any(|effect| matches!(effect, Effect::CallModel { .. }));
    assert!(resumed);
}
3792
/// With two tool calls outstanding, the first result must not trigger a model
/// call. Only the second, which completes the batch, emits `CallModel`.
#[test]
fn test_tool_result_waits_for_pending_tools() {
    let mut state = test_state();
    let session_id = state.session_id;
    let op_id = OpId::new();
    let first_call = ToolCallId::from_string("tc_1");
    let second_call = ToolCallId::from_string("tc_2");

    state.current_operation = Some(OperationState {
        op_id,
        kind: OperationKind::AgentLoop,
        pending_tool_calls: HashSet::from([first_call.clone(), second_call.clone()]),
    });
    state
        .operation_models
        .insert(op_id, builtin::claude_sonnet_4_5());

    // Builds a successful `bash` result action for the given call.
    let bash_done = |tool_call_id: ToolCallId| Action::ToolResult {
        session_id,
        tool_call_id,
        tool_name: "bash".to_string(),
        result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
            tool_name: "bash".to_string(),
            payload: "done".to_string(),
        })),
    };

    let after_first = reduce(&mut state, bash_done(first_call));
    let called_early = after_first
        .iter()
        .any(|effect| matches!(effect, Effect::CallModel { .. }));
    assert!(!called_early);

    let after_second = reduce(&mut state, bash_done(second_call));
    let called_after_batch = after_second
        .iter()
        .any(|effect| matches!(effect, Effect::CallModel { .. }));
    assert!(called_after_batch);
}
3850
3851 fn setup_auto_compact_state(
3854 enabled: bool,
3855 threshold_percent: u32,
3856 message_count: usize,
3857 ) -> AppState {
3858 use crate::session::state::AutoCompactionConfig;
3859
3860 let mut state = test_state();
3861
3862 let mut config = base_session_config();
3864 config.auto_compaction = AutoCompactionConfig {
3865 enabled,
3866 threshold_percent,
3867 };
3868 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
3869
3870 let mut prev_id: Option<String> = None;
3873 for i in 0..message_count {
3874 let id = format!("msg_{}", i);
3875 state.message_graph.add_message(Message {
3876 data: MessageData::User {
3877 content: vec![UserContent::Text {
3878 text: format!("message {}", i),
3879 }],
3880 },
3881 timestamp: i as u64,
3882 id: id.clone(),
3883 parent_message_id: prev_id.clone(),
3884 });
3885 prev_id = Some(id);
3886 }
3887
3888 state
3889 }
3890
/// When auto-compaction is enabled, history is long enough, nothing is queued
/// and the reported usage clears the 90% threshold, `maybe_auto_compact` must
/// request a compaction and announce the new operation.
#[test]
fn test_maybe_auto_compact_triggers_when_all_guards_pass() {
    let mut state = setup_auto_compact_state(true, 90, 4);
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();
    let usage = TokenUsage::new(80_000, 15_000, 95_000);

    let effects = maybe_auto_compact(
        &mut state,
        session_id,
        Some(usage),
        Some(100_000),
        Some(10_000),
        &model,
    );

    assert!(!effects.is_empty(), "expected non-empty effects");

    let requests_compaction = effects
        .iter()
        .any(|effect| matches!(effect, Effect::RequestCompaction { .. }));
    assert!(requests_compaction, "expected Effect::RequestCompaction");

    let announces_start = effects.iter().any(|effect| {
        matches!(
            effect,
            Effect::EmitEvent {
                event: SessionEvent::OperationStarted { .. },
                ..
            }
        )
    });
    assert!(announces_start, "expected OperationStarted event");
}
3925
/// With auto-compaction disabled in the session config, no effects are
/// produced even though the same usage triggers compaction when enabled.
#[test]
fn test_maybe_auto_compact_disabled_config() {
    let mut state = setup_auto_compact_state(false, 90, 4);
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();
    let usage = TokenUsage::new(80_000, 15_000, 95_000);

    let effects = maybe_auto_compact(
        &mut state,
        session_id,
        Some(usage),
        Some(100_000),
        Some(10_000),
        &model,
    );

    let nothing_emitted = effects.is_empty();
    assert!(
        nothing_emitted,
        "expected empty effects when auto-compaction is disabled"
    );
}
3947
/// Usage well under the configured 90% threshold must not start a compaction.
#[test]
fn test_maybe_auto_compact_below_threshold() {
    let mut state = setup_auto_compact_state(true, 90, 4);
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();
    let light_usage = TokenUsage::new(40_000, 10_000, 50_000);

    let effects = maybe_auto_compact(
        &mut state,
        session_id,
        Some(light_usage),
        Some(100_000),
        Some(10_000),
        &model,
    );

    let nothing_emitted = effects.is_empty();
    assert!(
        nothing_emitted,
        "expected empty effects when utilization is below threshold"
    );
}
3970
/// With fewer messages than `MIN_MESSAGES_FOR_COMPACT`, auto-compaction must
/// not start. This holds even for the token usage that does trigger compaction
/// in `test_maybe_auto_compact_triggers_when_all_guards_pass`.
#[test]
fn test_maybe_auto_compact_insufficient_messages() {
    // Derive the count from the guard constant instead of hard-coding it, so
    // this test keeps exercising the "too few messages" path if the constant
    // is ever changed.
    let too_few_messages = MIN_MESSAGES_FOR_COMPACT - 1;
    let mut state = setup_auto_compact_state(true, 90, too_few_messages);
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();

    let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
    let effects = maybe_auto_compact(
        &mut state,
        session_id,
        usage,
        Some(100_000),
        Some(10_000),
        &model,
    );

    assert!(
        effects.is_empty(),
        "expected empty effects with insufficient messages"
    );
}
3993
/// A queued user message blocks auto-compaction, even when every other guard
/// (enabled, enough history, high usage) would allow it.
#[test]
fn test_maybe_auto_compact_queued_work_blocks() {
    use crate::app::domain::state::QueuedUserMessage;

    let mut state = setup_auto_compact_state(true, 90, 4);
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();

    let pending_message = QueuedUserMessage {
        op_id: OpId::new(),
        message_id: MessageId::new(),
        content: vec![UserContent::Text {
            text: "queued msg".to_string(),
        }],
        model: builtin::claude_sonnet_4_5(),
        queued_at: 0,
    };
    state.queue_user_message(pending_message);

    let usage = TokenUsage::new(80_000, 15_000, 95_000);
    let effects = maybe_auto_compact(
        &mut state,
        session_id,
        Some(usage),
        Some(100_000),
        Some(10_000),
        &model,
    );

    let nothing_emitted = effects.is_empty();
    assert!(
        nothing_emitted,
        "expected empty effects when there is queued work"
    );
}
4028
/// A failed auto compaction is reported as `CompactResult::Failed` carrying
/// the error text and the `Auto` trigger, and not as a generic
/// `SessionEvent::Error`.
#[test]
fn test_handle_compaction_failed_emits_compact_result() {
    use crate::app::domain::event::{CompactResult, CompactTrigger};

    let mut state = test_state();
    let session_id = state.session_id;
    let op_id = OpId::new();

    state.start_operation(
        op_id,
        OperationKind::Compact {
            trigger: CompactTrigger::Auto,
        },
    );
    state
        .operation_models
        .insert(op_id, builtin::claude_sonnet_4_5());

    let failure = Action::CompactionFailed {
        session_id,
        op_id,
        error: "test error".into(),
    };
    let effects = reduce(&mut state, failure);

    // Scan the emitted events once and record both conditions.
    let mut has_compact_result = false;
    let mut has_error_event = false;
    for effect in &effects {
        if let Effect::EmitEvent { event, .. } = effect {
            match event {
                SessionEvent::CompactResult {
                    result: CompactResult::Failed(msg),
                    trigger: CompactTrigger::Auto,
                } if msg == "test error" => has_compact_result = true,
                SessionEvent::Error { .. } => has_error_event = true,
                _ => {}
            }
        }
    }

    assert!(
        has_compact_result,
        "expected CompactResult::Failed event with Auto trigger"
    );
    assert!(
        !has_error_event,
        "should not emit SessionEvent::Error for compaction failure"
    );
}
4090
4091 fn extract_callmodel_messages(effects: &[Effect]) -> Option<&Vec<Message>> {
4093 effects.iter().find_map(|e| match e {
4094 Effect::CallModel { messages, .. } => Some(messages),
4095 _ => None,
4096 })
4097 }
4098
4099 fn messages_contain_text(messages: &[Message], needle: &str) -> bool {
4101 messages.iter().any(|m| match &m.data {
4102 MessageData::User { content } => content.iter().any(|c| match c {
4103 UserContent::Text { text } => text.contains(needle),
4104 _ => false,
4105 }),
4106 MessageData::Assistant { content } => content.iter().any(|c| match c {
4107 AssistantContent::Text { text } => text.contains(needle),
4108 _ => false,
4109 }),
4110 _ => false,
4111 })
4112 }
4113
/// An automatically triggered compaction that completes successfully should:
/// report an `Auto` success result, then immediately continue with a model
/// call. That call's history starts with the summary, followed by
/// `COMPACTION_CONTINUE_PROMPT`.
#[test]
fn test_auto_compaction_completion_triggers_continuation() {
    use crate::app::domain::event::{CompactResult, CompactTrigger};
    use crate::app::domain::types::CompactionId;

    let mut state = setup_auto_compact_state(true, 90, 4);
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();

    let auto_effects = maybe_auto_compact(
        &mut state,
        session_id,
        Some(TokenUsage::new(80_000, 15_000, 95_000)),
        Some(100_000),
        Some(10_000),
        &model,
    );

    let op_id = auto_effects
        .iter()
        .find_map(|effect| match effect {
            Effect::RequestCompaction { op_id, .. } => Some(*op_id),
            _ => None,
        })
        .expect("expected auto compaction request");

    let previous_active_message_id = state
        .message_graph
        .active_message_id
        .clone()
        .map(MessageId::from_string);
    let head_before = previous_active_message_id
        .clone()
        .expect("expected active message before compaction");

    let completion_effects = reduce(
        &mut state,
        Action::CompactionComplete {
            session_id,
            op_id,
            compaction_id: CompactionId::new(),
            summary_message_id: MessageId::new(),
            summary: "Auto summary".to_string(),
            compacted_head_message_id: head_before,
            previous_active_message_id,
            model: model.id.clone(),
            timestamp: 42,
        },
    );

    assert!(
        completion_effects.iter().any(|effect| matches!(
            effect,
            Effect::EmitEvent {
                event: SessionEvent::CompactResult {
                    result: CompactResult::Success(summary),
                    trigger: CompactTrigger::Auto,
                },
                ..
            } if summary == "Auto summary"
        )),
        "expected auto compaction result event"
    );

    assert!(
        completion_effects
            .iter()
            .any(|effect| matches!(effect, Effect::CallModel { .. })),
        "expected auto compaction completion to continue with a model call"
    );

    let messages = extract_callmodel_messages(&completion_effects)
        .expect("expected continuation CallModel after auto compaction");

    // Check the length before indexing, so a short continuation fails with a
    // descriptive message instead of an index-out-of-bounds panic.
    assert!(
        messages.len() >= 2,
        "continuation should contain the summary and the continue prompt, got: {:?}",
        messages.iter().map(|m| m.id()).collect::<Vec<_>>()
    );

    assert!(
        matches!(
            &messages[0].data,
            MessageData::Assistant { content }
                if content.iter().any(|c| matches!(
                    c,
                    AssistantContent::Text { text } if text == "Auto summary"
                ))
        ),
        "first continuation message should be compaction summary"
    );
    assert!(
        matches!(
            &messages[1].data,
            MessageData::User { content }
                if content.iter().any(|c| matches!(
                    c,
                    UserContent::Text { text } if text == COMPACTION_CONTINUE_PROMPT
                ))
        ),
        "second continuation message should be continuation prompt"
    );
}
4198
/// End-to-end manual compaction: two completed exchanges, a manual
/// compaction, then a fresh user input. The resulting `CallModel` must carry
/// only the compaction summary followed by the new user message. Manual
/// compaction itself must not trigger a continuation.
#[test]
fn test_compaction_full_cycle_callmodel_filters_messages() {
    use crate::app::domain::types::CompactionId;

    let mut state = test_state();
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();

    let op_id_1 = OpId::new();
    let _ = reduce(
        &mut state,
        Action::UserInput {
            session_id,
            content: vec![UserContent::Text {
                text: "hello".to_string(),
            }],
            op_id: op_id_1,
            message_id: MessageId::new(),
            model: model.clone(),
            timestamp: 1,
        },
    );
    let _ = reduce(
        &mut state,
        Action::ModelResponseComplete {
            session_id,
            op_id: op_id_1,
            message_id: MessageId::new(),
            content: vec![AssistantContent::Text {
                text: "hi there".to_string(),
            }],
            usage: None,
            context_window_tokens: None,
            configured_max_output_tokens: None,
            timestamp: 2,
        },
    );

    let op_id_1b = OpId::new();
    let _ = reduce(
        &mut state,
        Action::UserInput {
            session_id,
            content: vec![UserContent::Text {
                text: "how are you".to_string(),
            }],
            op_id: op_id_1b,
            message_id: MessageId::new(),
            model: model.clone(),
            timestamp: 3,
        },
    );
    let assistant_msg_id = MessageId::new();
    let _ = reduce(
        &mut state,
        Action::ModelResponseComplete {
            session_id,
            op_id: op_id_1b,
            message_id: assistant_msg_id.clone(),
            content: vec![AssistantContent::Text {
                text: "doing well".to_string(),
            }],
            usage: None,
            context_window_tokens: None,
            configured_max_output_tokens: None,
            timestamp: 4,
        },
    );
    assert!(
        state.current_operation.is_none(),
        "operation should be complete after text-only response"
    );
    // Two complete exchanges give exactly two user and two assistant messages.
    // This is the same per-turn accounting the multi-round test relies on
    // (six messages after three exchanges).
    assert_eq!(
        state.message_graph.messages.len(),
        4,
        "expected two user and two assistant messages before compaction"
    );

    let op_id_2 = OpId::new();
    let compact_effects = reduce(
        &mut state,
        Action::RequestCompaction {
            session_id,
            op_id: op_id_2,
            model: model.clone(),
        },
    );
    assert!(
        compact_effects
            .iter()
            .any(|e| matches!(e, Effect::RequestCompaction { .. })),
        "expected Effect::RequestCompaction"
    );

    let compaction_id = CompactionId::new();
    let summary_message_id = MessageId::new();
    let active_before = state.message_graph.active_message_id.clone();
    let completion_effects = reduce(
        &mut state,
        Action::CompactionComplete {
            session_id,
            op_id: op_id_2,
            compaction_id,
            summary_message_id: summary_message_id.clone(),
            summary: "Summary of conversation.".to_string(),
            compacted_head_message_id: assistant_msg_id,
            previous_active_message_id: active_before.map(MessageId::from_string),
            model: "claude-sonnet-4-5-20250929".to_string(),
            // The summary must not predate the last compacted message
            // (timestamp 4).
            timestamp: 5,
        },
    );
    assert!(
        state.compaction_summary_ids.contains(&summary_message_id.0),
        "compaction_summary_ids should contain the summary id"
    );
    assert!(
        !completion_effects
            .iter()
            .any(|e| matches!(e, Effect::CallModel { .. })),
        "manual compaction should not trigger continuation"
    );

    let compact_result_trigger = completion_effects.iter().find_map(|effect| match effect {
        Effect::EmitEvent {
            event: SessionEvent::CompactResult { trigger, .. },
            ..
        } => Some(*trigger),
        _ => None,
    });
    assert_eq!(compact_result_trigger, Some(CompactTrigger::Manual));

    let op_id_3 = OpId::new();
    let effects = reduce(
        &mut state,
        Action::UserInput {
            session_id,
            content: vec![UserContent::Text {
                text: "new question".to_string(),
            }],
            op_id: op_id_3,
            message_id: MessageId::new(),
            model: model.clone(),
            timestamp: 10,
        },
    );

    let messages = extract_callmodel_messages(&effects)
        .expect("expected Effect::CallModel from UserInput after compaction");

    assert_eq!(
        messages.len(),
        2,
        "CallModel should contain exactly 2 messages (summary + new user msg), got: {:?}",
        messages.iter().map(|m| m.id()).collect::<Vec<_>>()
    );

    assert!(
        matches!(
            &messages[0].data,
            MessageData::Assistant { content }
                if content.iter().any(|c| matches!(
                    c,
                    AssistantContent::Text { text } if text == "Summary of conversation."
                ))
        ),
        "messages[0] should be the compaction summary"
    );

    assert!(
        matches!(
            &messages[1].data,
            MessageData::User { content }
                if content.iter().any(|c| matches!(
                    c,
                    UserContent::Text { text } if text == "new question"
                ))
        ),
        "messages[1] should be the new user message"
    );

    assert!(
        !messages_contain_text(messages, "hello"),
        "CallModel messages must not contain pre-compaction user text 'hello'"
    );
    assert!(
        !messages_contain_text(messages, "hi there"),
        "CallModel messages must not contain pre-compaction assistant text 'hi there'"
    );
    assert!(
        !messages_contain_text(messages, "how are you"),
        "CallModel messages must not contain pre-compaction user text 'how are you'"
    );
    assert!(
        !messages_contain_text(messages, "doing well"),
        "CallModel messages must not contain pre-compaction assistant text 'doing well'"
    );
}
4388
/// A user message sent while a manual compaction is running is queued. When
/// the compaction completes, it is dequeued and sent with only the summary as
/// history: none of the pre-compaction user or assistant text may leak into
/// that `CallModel`.
#[test]
fn test_compaction_queued_message_callmodel_filters_messages() {
    use crate::app::domain::types::CompactionId;

    let mut state = test_state();
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();

    let op_id_1 = OpId::new();
    let _ = reduce(
        &mut state,
        Action::UserInput {
            session_id,
            content: vec![UserContent::Text {
                text: "msg A".to_string(),
            }],
            op_id: op_id_1,
            message_id: MessageId::new(),
            model: model.clone(),
            timestamp: 1,
        },
    );
    let _ = reduce(
        &mut state,
        Action::ModelResponseComplete {
            session_id,
            op_id: op_id_1,
            message_id: MessageId::new(),
            content: vec![AssistantContent::Text {
                text: "response A".to_string(),
            }],
            usage: None,
            context_window_tokens: None,
            configured_max_output_tokens: None,
            timestamp: 2,
        },
    );

    let op_id_1b = OpId::new();
    let _ = reduce(
        &mut state,
        Action::UserInput {
            session_id,
            content: vec![UserContent::Text {
                text: "msg A2".to_string(),
            }],
            op_id: op_id_1b,
            message_id: MessageId::new(),
            model: model.clone(),
            timestamp: 3,
        },
    );
    let assistant_msg_id = MessageId::new();
    let _ = reduce(
        &mut state,
        Action::ModelResponseComplete {
            session_id,
            op_id: op_id_1b,
            message_id: assistant_msg_id.clone(),
            content: vec![AssistantContent::Text {
                text: "response A2".to_string(),
            }],
            usage: None,
            context_window_tokens: None,
            configured_max_output_tokens: None,
            timestamp: 4,
        },
    );
    assert!(state.current_operation.is_none());

    let op_id_2 = OpId::new();
    let _ = reduce(
        &mut state,
        Action::RequestCompaction {
            session_id,
            op_id: op_id_2,
            model: model.clone(),
        },
    );
    assert!(
        state.has_active_operation(),
        "Compact operation should be active"
    );

    let op_id_3 = OpId::new();
    let queued_effects = reduce(
        &mut state,
        Action::UserInput {
            session_id,
            content: vec![UserContent::Text {
                text: "msg B".to_string(),
            }],
            op_id: op_id_3,
            message_id: MessageId::new(),
            model: model.clone(),
            timestamp: 5,
        },
    );
    assert!(
        queued_effects.iter().any(|e| matches!(
            e,
            Effect::EmitEvent {
                event: SessionEvent::QueueUpdated { .. },
                ..
            }
        )),
        "queued user message should emit QueueUpdated"
    );

    let compaction_id = CompactionId::new();
    let summary_message_id = MessageId::new();
    let active_before = state.message_graph.active_message_id.clone();
    let completion_effects = reduce(
        &mut state,
        Action::CompactionComplete {
            session_id,
            op_id: op_id_2,
            compaction_id,
            summary_message_id,
            summary: "Summary of A.".to_string(),
            compacted_head_message_id: assistant_msg_id,
            previous_active_message_id: active_before.map(MessageId::from_string),
            model: "claude-sonnet-4-5-20250929".to_string(),
            timestamp: 6,
        },
    );

    let messages = extract_callmodel_messages(&completion_effects)
        .expect("CompactionComplete should dequeue 'msg B' and produce CallModel");

    assert_eq!(
        messages.len(),
        2,
        "CallModel should contain exactly 2 messages (summary + 'msg B'), got: {:?}",
        messages.iter().map(|m| m.id()).collect::<Vec<_>>()
    );

    assert!(
        messages_contain_text(messages, "Summary of A."),
        "CallModel messages should contain the compaction summary"
    );

    assert!(
        messages_contain_text(messages, "msg B"),
        "CallModel messages should contain the dequeued 'msg B'"
    );

    // Check both the user and the assistant sides of the compacted history,
    // matching the full-cycle test. None of these strings occurs in
    // "Summary of A." or "msg B", so any hit is a leak.
    for stale in ["msg A", "msg A2", "response A", "response A2"] {
        assert!(
            !messages_contain_text(messages, stale),
            "CallModel messages must not contain pre-compaction '{stale}'"
        );
    }
}
4555
/// Three exchanges, a manual compaction, two more exchanges and a final
/// question. The final `CallModel` must contain six messages: the summary,
/// the four post-compaction messages and the new question. None of the
/// pre-compaction text may appear.
#[test]
fn test_compaction_multi_round_conversation_then_compact() {
    use crate::app::domain::types::CompactionId;

    let mut state = test_state();
    let session_id = state.session_id;
    let model = builtin::claude_sonnet_4_5();

    let pre_compaction_texts = [
        ("alpha user", "alpha assistant"),
        ("beta user", "beta assistant"),
        ("gamma user", "gamma assistant"),
    ];
    let post_compaction_texts = [
        ("delta user", "delta assistant"),
        ("epsilon user", "epsilon assistant"),
    ];

    // Runs one user/assistant exchange. The user message gets `user_ts` and
    // the reply `user_ts + 1`; returns the assistant message id.
    let run_turn = |st: &mut AppState, user_text: &str, assistant_text: &str, user_ts: u64| {
        let turn_op = OpId::new();
        let _ = reduce(
            st,
            Action::UserInput {
                session_id,
                content: vec![UserContent::Text {
                    text: user_text.to_string(),
                }],
                op_id: turn_op,
                message_id: MessageId::new(),
                model: model.clone(),
                timestamp: user_ts,
            },
        );
        let reply_id = MessageId::new();
        let _ = reduce(
            st,
            Action::ModelResponseComplete {
                session_id,
                op_id: turn_op,
                message_id: reply_id.clone(),
                content: vec![AssistantContent::Text {
                    text: assistant_text.to_string(),
                }],
                usage: None,
                context_window_tokens: None,
                configured_max_output_tokens: None,
                timestamp: user_ts + 1,
            },
        );
        reply_id
    };

    let mut compacted_head = None;
    for (turn, &(user_text, assistant_text)) in pre_compaction_texts.iter().enumerate() {
        compacted_head = Some(run_turn(
            &mut state,
            user_text,
            assistant_text,
            (turn * 2 + 1) as u64,
        ));
    }
    let compacted_head =
        compacted_head.expect("pre-compaction turns should produce an assistant message");

    assert_eq!(
        state.message_graph.messages.len(),
        2 * pre_compaction_texts.len()
    );
    assert!(state.current_operation.is_none());

    let compact_op = OpId::new();
    let _ = reduce(
        &mut state,
        Action::RequestCompaction {
            session_id,
            op_id: compact_op,
            model: model.clone(),
        },
    );

    let previous_active = state
        .message_graph
        .active_message_id
        .clone()
        .map(MessageId::from_string);
    let completion_effects = reduce(
        &mut state,
        Action::CompactionComplete {
            session_id,
            op_id: compact_op,
            compaction_id: CompactionId::new(),
            summary_message_id: MessageId::new(),
            summary: "Summary of greek turns.".to_string(),
            compacted_head_message_id: compacted_head,
            previous_active_message_id: previous_active,
            model: "claude-sonnet-4-5-20250929".to_string(),
            timestamp: 100,
        },
    );

    let continued = completion_effects
        .iter()
        .any(|effect| matches!(effect, Effect::CallModel { .. }));
    assert!(!continued, "manual compaction should not trigger continuation");

    let reported_trigger = completion_effects.iter().find_map(|effect| {
        if let Effect::EmitEvent {
            event: SessionEvent::CompactResult { trigger, .. },
            ..
        } = effect
        {
            Some(*trigger)
        } else {
            None
        }
    });
    assert_eq!(reported_trigger, Some(CompactTrigger::Manual));

    for (turn, &(user_text, assistant_text)) in post_compaction_texts.iter().enumerate() {
        run_turn(
            &mut state,
            user_text,
            assistant_text,
            (101 + turn * 2) as u64,
        );
    }

    let final_effects = reduce(
        &mut state,
        Action::UserInput {
            session_id,
            content: vec![UserContent::Text {
                text: "final question".to_string(),
            }],
            op_id: OpId::new(),
            message_id: MessageId::new(),
            model: model.clone(),
            timestamp: 200,
        },
    );

    let messages = extract_callmodel_messages(&final_effects)
        .expect("expected CallModel from final UserInput");

    assert_eq!(
        messages.len(),
        6,
        "CallModel should have 6 messages (summary + 4 post-compaction + final), got: {:?}",
        messages.iter().map(|m| m.id()).collect::<Vec<_>>()
    );

    assert!(
        messages_contain_text(&messages[..1], "Summary of greek turns."),
        "first message should be the compaction summary"
    );
    assert!(
        messages_contain_text(&messages[5..], "final question"),
        "last message should be the final user question"
    );
    assert!(
        messages_contain_text(messages, "delta user"),
        "should contain post-compaction user text"
    );
    assert!(
        messages_contain_text(messages, "epsilon assistant"),
        "should contain post-compaction assistant text"
    );

    for &(user_text, assistant_text) in &pre_compaction_texts {
        assert!(
            !messages_contain_text(messages, user_text),
            "CallModel messages must not contain pre-compaction text '{user_text}'"
        );
        assert!(
            !messages_contain_text(messages, assistant_text),
            "CallModel messages must not contain pre-compaction text '{assistant_text}'"
        );
    }
}
4750}