1use crate::agents::default_agent_spec_id;
2use crate::api::provider::TokenUsage;
3use crate::app::conversation::{AssistantContent, Message, MessageData, UserContent};
4
5use crate::app::domain::action::{Action, ApprovalDecision, ApprovalMemory, McpServerState};
6
7use crate::app::domain::effect::{Effect, McpServerConfig};
8use crate::app::domain::event::{
9 CancellationInfo, ContextWindowUsage, QueuedWorkItemSnapshot, QueuedWorkKind, SessionEvent,
10};
11use crate::app::domain::state::{
12 AppState, OperationKind, PendingApproval, QueuedApproval, QueuedWorkItem,
13};
14use crate::primary_agents::{
15 default_primary_agent_id, primary_agent_spec, resolve_effective_config,
16};
17use crate::session::state::{BackendConfig, ToolDecision};
18
19use crate::app::domain::event::CompactTrigger;
20use crate::tools::{DISPATCH_AGENT_TOOL_NAME, DispatchAgentParams, DispatchAgentTarget};
21use serde_json::Value;
22use steer_tools::ToolError;
23use steer_tools::result::ToolResult;
24use steer_tools::tools::BASH_TOOL_NAME;
25use thiserror::Error;
26
/// Minimum message count associated with conversation compaction.
// NOTE(review): not referenced in this part of the file; presumably the
// threshold below which a compaction request is rejected — confirm at the
// use site.
const MIN_MESSAGES_FOR_COMPACT: usize = 3;
28
/// Machine-readable category carried by [`ReduceError::InvalidAction`].
///
/// Lets callers branch on why an action was rejected without parsing the
/// human-readable message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvalidActionKind {
    /// An operation is running, and the action requires none to be active
    /// (e.g. editing a message).
    OperationInFlight,
    /// NOTE(review): presumably raised when the session has no config loaded
    /// — not constructed in the visible part of this file; confirm.
    MissingSessionConfig,
    /// NOTE(review): presumably raised when switching to an unknown primary
    /// agent id — confirm against the switch handler.
    UnknownPrimaryAgent,
    /// NOTE(review): presumably raised when dequeuing from an empty work
    /// queue — confirm against the dequeue handler.
    QueueEmpty,
}
36
/// Error returned by [`reduce`] when an action cannot be applied.
#[derive(Debug, Error)]
pub enum ReduceError {
    /// The action is not valid in the current state. `message` is displayed
    /// verbatim; `kind` categorises the rejection.
    #[error("{message}")]
    InvalidAction {
        message: String,
        kind: InvalidActionKind,
    },
    /// An internal consistency check failed; displayed as
    /// `Invariant violated: <message>`.
    #[error("Invariant violated: {message}")]
    Invariant { message: String },
}
47
48fn invalid_action(kind: InvalidActionKind, message: impl Into<String>) -> ReduceError {
49 ReduceError::InvalidAction {
50 message: message.into(),
51 kind,
52 }
53}
54
55fn derive_context_window_usage(
56 total_tokens: u32,
57 context_window_tokens: Option<u32>,
58) -> Option<ContextWindowUsage> {
59 context_window_tokens.map(|max_context_tokens| {
60 let remaining_tokens = max_context_tokens.saturating_sub(total_tokens);
61 let (utilization_ratio, estimated) = if max_context_tokens > 0 {
62 let ratio = (f64::from(total_tokens) / f64::from(max_context_tokens)).clamp(0.0, 1.0);
63 (Some(ratio), false)
64 } else {
65 (None, true)
66 };
67
68 ContextWindowUsage {
69 max_context_tokens: Some(max_context_tokens),
70 remaining_tokens: Some(remaining_tokens),
71 utilization_ratio,
72 estimated,
73 }
74 })
75}
76
77pub fn reduce(state: &mut AppState, action: Action) -> Result<Vec<Effect>, ReduceError> {
78 match action {
79 Action::UserInput {
80 session_id,
81 content,
82 op_id,
83 message_id,
84 model,
85 timestamp,
86 } => Ok(handle_user_input(
87 state, session_id, content, op_id, message_id, model, timestamp,
88 )),
89
90 Action::UserEditedMessage {
91 session_id,
92 message_id,
93 new_content,
94 op_id,
95 new_message_id,
96 model,
97 timestamp,
98 } => handle_user_edited_message(
99 state,
100 session_id,
101 UserEditedMessageParams {
102 original_message_id: message_id,
103 new_content,
104 op_id,
105 new_message_id,
106 model,
107 timestamp,
108 },
109 ),
110
111 Action::ToolApprovalRequested {
112 session_id,
113 request_id,
114 tool_call,
115 } => Ok(handle_tool_approval_requested(
116 state, session_id, request_id, tool_call,
117 )),
118
119 Action::ToolApprovalDecided {
120 session_id,
121 request_id,
122 decision,
123 remember,
124 } => Ok(handle_tool_approval_decided(
125 state, session_id, request_id, decision, remember,
126 )),
127
128 Action::ToolExecutionStarted {
129 session_id,
130 tool_call_id,
131 tool_name,
132 tool_parameters,
133 } => Ok(handle_tool_execution_started(
134 state,
135 session_id,
136 tool_call_id,
137 tool_name,
138 tool_parameters,
139 )),
140
141 Action::ToolResult {
142 session_id,
143 tool_call_id,
144 tool_name,
145 result,
146 } => Ok(handle_tool_result(
147 state,
148 session_id,
149 tool_call_id,
150 tool_name,
151 result,
152 )),
153
154 Action::ModelResponseComplete {
155 session_id,
156 op_id,
157 message_id,
158 content,
159 usage,
160 context_window_tokens,
161 timestamp,
162 } => Ok(handle_model_response_complete(
163 state,
164 session_id,
165 ModelResponseCompleteParams {
166 op_id,
167 message_id,
168 content,
169 usage,
170 context_window_tokens,
171 timestamp,
172 },
173 )),
174
175 Action::ModelResponseError {
176 session_id,
177 op_id,
178 error,
179 } => Ok(handle_model_response_error(
180 state, session_id, op_id, &error,
181 )),
182
183 Action::Cancel { session_id, op_id } => Ok(handle_cancel(state, session_id, op_id)),
184
185 Action::DirectBashCommand {
186 session_id,
187 op_id,
188 message_id,
189 command,
190 timestamp,
191 } => Ok(handle_direct_bash(
192 state, session_id, op_id, message_id, command, timestamp,
193 )),
194
195 Action::DequeueQueuedItem { session_id } => handle_dequeue_queued_item(state, session_id),
196
197 Action::DrainQueuedWork { session_id } => Ok(maybe_start_queued_work(state, session_id)),
198
199 Action::RequestCompaction {
200 session_id,
201 op_id,
202 model,
203 } => handle_request_compaction(state, session_id, op_id, model),
204
205 Action::Hydrate {
206 session_id,
207 events,
208 starting_sequence,
209 } => Ok(handle_hydrate(state, session_id, events, starting_sequence)),
210
211 Action::WorkspaceFilesListed { files, .. } => {
212 state.workspace_files = files;
213 Ok(vec![])
214 }
215
216 Action::ToolSchemasAvailable { tools, .. } => {
217 state.tools = tools;
218 Ok(vec![])
219 }
220
221 Action::ToolSchemasUpdated { schemas, .. } => {
222 state.tools = schemas;
223 Ok(vec![])
224 }
225
226 Action::SwitchPrimaryAgent {
227 session_id,
228 agent_id,
229 } => handle_switch_primary_agent(state, session_id, agent_id),
230
231 Action::McpServerStateChanged {
232 session_id,
233 server_name,
234 state: new_state,
235 } => {
236 if let McpServerState::Connected { tools } = &new_state {
238 let tools = state.session_config.as_ref().map_or_else(
239 || tools.clone(),
240 |config| config.filter_tools_by_visibility(tools.clone()),
241 );
242
243 for tool in tools {
245 if !state.tools.iter().any(|t| t.name == tool.name) {
246 state.tools.push(tool.clone());
247 }
248 }
249 }
250
251 if matches!(
253 &new_state,
254 McpServerState::Disconnected { .. } | McpServerState::Failed { .. }
255 ) {
256 let prefix = format!("mcp__{server_name}__");
257 state.tools.retain(|t| !t.name.starts_with(&prefix));
258 }
259
260 state
261 .mcp_servers
262 .insert(server_name.clone(), new_state.clone());
263 Ok(vec![Effect::EmitEvent {
264 session_id,
265 event: SessionEvent::McpServerStateChanged {
266 server_name,
267 state: new_state,
268 },
269 }])
270 }
271
272 Action::CompactionComplete {
273 session_id,
274 op_id,
275 compaction_id,
276 summary_message_id,
277 summary,
278 compacted_head_message_id,
279 previous_active_message_id,
280 model,
281 timestamp,
282 } => Ok(handle_compaction_complete(
283 state,
284 session_id,
285 CompactionCompleteParams {
286 op_id,
287 compaction_id,
288 summary_message_id,
289 summary,
290 compacted_head_message_id,
291 previous_active_message_id,
292 model_name: model,
293 timestamp,
294 },
295 )),
296
297 Action::CompactionFailed {
298 session_id,
299 op_id,
300 error,
301 } => Ok(handle_compaction_failed(state, session_id, op_id, error)),
302
303 Action::Shutdown => Ok(vec![]),
304 }
305}
306
307fn handle_user_input(
308 state: &mut AppState,
309 session_id: crate::app::domain::types::SessionId,
310 content: Vec<UserContent>,
311 op_id: crate::app::domain::types::OpId,
312 message_id: crate::app::domain::types::MessageId,
313 model: crate::config::model::ModelId,
314 timestamp: u64,
315) -> Vec<Effect> {
316 let mut effects = Vec::new();
317
318 if state.has_active_operation() {
319 state.queue_user_message(crate::app::domain::state::QueuedUserMessage {
320 content,
321 op_id,
322 message_id,
323 model,
324 queued_at: timestamp,
325 });
326 effects.push(Effect::EmitEvent {
327 session_id,
328 event: SessionEvent::QueueUpdated {
329 queue: snapshot_queue(state),
330 },
331 });
332 return effects;
333 }
334
335 let parent_id = state.message_graph.active_message_id.clone();
336
337 let message = Message {
338 data: MessageData::User { content },
339 timestamp,
340 id: message_id.0.clone(),
341 parent_message_id: parent_id,
342 };
343
344 state.message_graph.add_message(message.clone());
345 state.message_graph.active_message_id = Some(message_id.0.clone());
346
347 state.start_operation(op_id, OperationKind::AgentLoop);
348 state.operation_models.insert(op_id, model.clone());
349
350 effects.push(Effect::EmitEvent {
351 session_id,
352 event: SessionEvent::UserMessageAdded {
353 message: message.clone(),
354 },
355 });
356
357 effects.push(Effect::EmitEvent {
358 session_id,
359 event: SessionEvent::OperationStarted {
360 op_id,
361 kind: OperationKind::AgentLoop,
362 },
363 });
364
365 effects.push(Effect::CallModel {
366 session_id,
367 op_id,
368 model,
369 messages: state
370 .message_graph
371 .get_thread_messages()
372 .into_iter()
373 .cloned()
374 .collect(),
375 system_context: state.cached_system_context.clone(),
376 tools: state.tools.clone(),
377 });
378
379 effects
380}
381
/// Arguments for [`handle_user_edited_message`], passed as one value.
struct UserEditedMessageParams {
    /// Id of the message being edited; its parent becomes the parent of the
    /// new message.
    original_message_id: crate::app::domain::types::MessageId,
    /// Content of the replacement message.
    new_content: Vec<UserContent>,
    /// Id of the `AgentLoop` operation started for the edit.
    op_id: crate::app::domain::types::OpId,
    /// Id assigned to the newly created message.
    new_message_id: crate::app::domain::types::MessageId,
    /// Model recorded for the operation and used for the model call.
    model: crate::config::model::ModelId,
    /// Timestamp stored on the new message.
    timestamp: u64,
}
390
391fn handle_user_edited_message(
392 state: &mut AppState,
393 session_id: crate::app::domain::types::SessionId,
394 params: UserEditedMessageParams,
395) -> Result<Vec<Effect>, ReduceError> {
396 let UserEditedMessageParams {
397 original_message_id,
398 new_content,
399 op_id,
400 new_message_id,
401 model,
402 timestamp,
403 } = params;
404 let mut effects = Vec::new();
405
406 if state.has_active_operation() {
407 return Err(invalid_action(
408 InvalidActionKind::OperationInFlight,
409 "Cannot edit message while an operation is active.",
410 ));
411 }
412
413 let parent_id = state
414 .message_graph
415 .messages
416 .iter()
417 .find(|m| m.id() == original_message_id.0)
418 .and_then(|m| m.parent_message_id().map(|s| s.to_string()));
419
420 let message = Message {
421 data: MessageData::User {
422 content: new_content,
423 },
424 timestamp,
425 id: new_message_id.0.clone(),
426 parent_message_id: parent_id,
427 };
428
429 state.message_graph.add_message(message.clone());
430 state.message_graph.active_message_id = Some(new_message_id.0.clone());
431
432 state.start_operation(op_id, OperationKind::AgentLoop);
433 state.operation_models.insert(op_id, model.clone());
434
435 effects.push(Effect::EmitEvent {
436 session_id,
437 event: SessionEvent::UserMessageAdded {
438 message: message.clone(),
439 },
440 });
441
442 effects.push(Effect::EmitEvent {
443 session_id,
444 event: SessionEvent::OperationStarted {
445 op_id,
446 kind: OperationKind::AgentLoop,
447 },
448 });
449
450 effects.push(Effect::CallModel {
451 session_id,
452 op_id,
453 model,
454 messages: state
455 .message_graph
456 .get_thread_messages()
457 .into_iter()
458 .cloned()
459 .collect(),
460 system_context: state.cached_system_context.clone(),
461 tools: state.tools.clone(),
462 });
463
464 Ok(effects)
465}
466
467fn handle_tool_approval_requested(
468 state: &mut AppState,
469 session_id: crate::app::domain::types::SessionId,
470 request_id: crate::app::domain::types::RequestId,
471 tool_call: steer_tools::ToolCall,
472) -> Vec<Effect> {
473 let mut effects = Vec::new();
474
475 if let Err(error) = validate_tool_call(state, &tool_call) {
476 let error_message = error.to_string();
477 return fail_tool_call_without_execution(
478 state,
479 session_id,
480 tool_call,
481 error,
482 error_message,
483 "invalid",
484 true,
485 );
486 }
487
488 let decision = get_tool_decision(state, &tool_call);
489
490 match decision {
491 ToolDecision::Allow => {
492 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
493 return vec![Effect::EmitEvent {
494 session_id,
495 event: SessionEvent::Error {
496 message: "Tool approval requested without active operation".to_string(),
497 },
498 }];
499 };
500 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
501 &tool_call.id,
502 ));
503
504 effects.push(Effect::ExecuteTool {
505 session_id,
506 op_id,
507 tool_call,
508 });
509 }
510 ToolDecision::Deny => {
511 let error = ToolError::DeniedByPolicy(tool_call.name.clone());
512 let tool_name = tool_call.name.clone();
513 effects.extend(fail_tool_call_without_execution(
514 state,
515 session_id,
516 tool_call,
517 error,
518 format!("Tool '{tool_name}' denied by policy"),
519 "denied",
520 true,
521 ));
522 }
523 ToolDecision::Ask => {
524 if state.pending_approval.is_some() {
525 state.approval_queue.push_back(QueuedApproval { tool_call });
526 return effects;
527 }
528
529 state.pending_approval = Some(PendingApproval {
530 request_id,
531 tool_call: tool_call.clone(),
532 });
533
534 effects.push(Effect::EmitEvent {
535 session_id,
536 event: SessionEvent::ApprovalRequested {
537 request_id,
538 tool_call: tool_call.clone(),
539 },
540 });
541
542 effects.push(Effect::RequestUserApproval {
543 session_id,
544 request_id,
545 tool_call,
546 });
547 }
548 }
549
550 effects
551}
552
553fn validate_tool_call(
554 state: &AppState,
555 tool_call: &steer_tools::ToolCall,
556) -> Result<(), ToolError> {
557 if tool_call.name.trim().is_empty() {
558 return Err(ToolError::invalid_params(
559 "unknown",
560 "Malformed tool call: missing tool name",
561 ));
562 }
563
564 if tool_call.id.trim().is_empty() {
565 return Err(ToolError::invalid_params(
566 tool_call.name.clone(),
567 "Malformed tool call: missing tool call id",
568 ));
569 }
570
571 if state.tools.is_empty() {
572 return Ok(());
573 }
574
575 let Some(schema) = state.tools.iter().find(|s| s.name == tool_call.name) else {
576 return Ok(());
577 };
578
579 validate_against_json_schema(
580 &tool_call.name,
581 schema.input_schema.as_value(),
582 &tool_call.parameters,
583 )
584}
585
586fn validate_against_json_schema(
587 tool_name: &str,
588 schema: &Value,
589 params: &Value,
590) -> Result<(), ToolError> {
591 let validator = jsonschema::JSONSchema::compile(schema).map_err(|e| {
592 ToolError::InternalError(format!("Invalid schema for tool '{tool_name}': {e}"))
593 })?;
594
595 if let Err(errors) = validator.validate(params) {
596 let message = errors
597 .into_iter()
598 .map(|error| error.to_string())
599 .next()
600 .unwrap_or_else(|| "Parameters do not match schema".to_string());
601 return Err(ToolError::invalid_params(tool_name.to_string(), message));
602 }
603
604 Ok(())
605}
606
607fn emit_tool_failure_message(
608 state: &mut AppState,
609 session_id: crate::app::domain::types::SessionId,
610 tool_call_id: &str,
611 tool_name: &str,
612 tool_error: ToolError,
613 event_error: String,
614 message_id_prefix: &str,
615) -> Vec<Effect> {
616 let mut effects = Vec::new();
617
618 let tool_result = ToolResult::Error(tool_error);
619 let parent_id = state.message_graph.active_message_id.clone();
620 let tool_message = Message {
621 data: MessageData::Tool {
622 tool_use_id: tool_call_id.to_string(),
623 result: tool_result,
624 },
625 timestamp: 0,
626 id: format!("{message_id_prefix}_{tool_call_id}"),
627 parent_message_id: parent_id,
628 };
629 state.message_graph.add_message(tool_message.clone());
630
631 if let Some(model) = state
632 .current_operation
633 .as_ref()
634 .and_then(|op| state.operation_models.get(&op.op_id).cloned())
635 {
636 effects.push(Effect::EmitEvent {
637 session_id,
638 event: SessionEvent::ToolCallFailed {
639 id: crate::app::domain::types::ToolCallId::from_string(tool_call_id),
640 name: tool_name.to_string(),
641 error: event_error,
642 model,
643 },
644 });
645 }
646
647 effects.push(Effect::EmitEvent {
648 session_id,
649 event: SessionEvent::ToolMessageAdded {
650 message: tool_message,
651 },
652 });
653
654 effects
655}
656
657fn fail_tool_call_without_execution(
658 state: &mut AppState,
659 session_id: crate::app::domain::types::SessionId,
660 tool_call: steer_tools::ToolCall,
661 tool_error: ToolError,
662 event_error: String,
663 message_id_prefix: &str,
664 call_model_if_ready: bool,
665) -> Vec<Effect> {
666 let mut effects = emit_tool_failure_message(
667 state,
668 session_id,
669 &tool_call.id,
670 &tool_call.name,
671 tool_error,
672 event_error,
673 message_id_prefix,
674 );
675
676 if !call_model_if_ready {
677 return effects;
678 }
679
680 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
681 effects.push(Effect::EmitEvent {
682 session_id,
683 event: SessionEvent::Error {
684 message: "Tool failure recorded without active operation".to_string(),
685 },
686 });
687 return effects;
688 };
689 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
690 model
691 } else {
692 effects.push(Effect::EmitEvent {
693 session_id,
694 event: SessionEvent::Error {
695 message: format!("Missing model for operation {op_id}"),
696 },
697 });
698 return effects;
699 };
700
701 let all_tools_complete = state
702 .current_operation
703 .as_ref()
704 .is_none_or(|op| op.pending_tool_calls.is_empty());
705 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
706
707 if all_tools_complete && no_pending_approvals {
708 effects.push(Effect::CallModel {
709 session_id,
710 op_id,
711 model,
712 messages: state
713 .message_graph
714 .get_thread_messages()
715 .into_iter()
716 .cloned()
717 .collect(),
718 system_context: state.cached_system_context.clone(),
719 tools: state.tools.clone(),
720 });
721 }
722
723 effects
724}
725
726fn handle_tool_approval_decided(
727 state: &mut AppState,
728 session_id: crate::app::domain::types::SessionId,
729 request_id: crate::app::domain::types::RequestId,
730 decision: ApprovalDecision,
731 remember: Option<ApprovalMemory>,
732) -> Vec<Effect> {
733 let mut effects = Vec::new();
734
735 let pending = match state.pending_approval.take() {
736 Some(p) if p.request_id == request_id => p,
737 other => {
738 state.pending_approval = other;
739 return effects;
740 }
741 };
742
743 let resolved_memory = if decision == ApprovalDecision::Approved {
744 match remember {
745 Some(ApprovalMemory::PendingTool) => {
746 Some(ApprovalMemory::Tool(pending.tool_call.name.clone()))
747 }
748 Some(ApprovalMemory::Tool(name)) => Some(ApprovalMemory::Tool(name)),
749 Some(ApprovalMemory::BashPattern(pattern)) => {
750 Some(ApprovalMemory::BashPattern(pattern))
751 }
752 None => None,
753 }
754 } else {
755 None
756 };
757
758 effects.push(Effect::EmitEvent {
759 session_id,
760 event: SessionEvent::ApprovalDecided {
761 request_id,
762 decision,
763 remember: resolved_memory.clone(),
764 },
765 });
766
767 if decision == ApprovalDecision::Approved {
768 if let Some(ref memory) = resolved_memory {
769 match memory {
770 ApprovalMemory::Tool(name) => {
771 state.approved_tools.insert(name.clone());
772 }
773 ApprovalMemory::BashPattern(pattern) => {
774 state.approved_bash_patterns.insert(pattern.clone());
775 }
776 ApprovalMemory::PendingTool => {}
777 }
778 }
779
780 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
781 effects.push(Effect::EmitEvent {
782 session_id,
783 event: SessionEvent::Error {
784 message: "Tool approval decided without active operation".to_string(),
785 },
786 });
787 return effects;
788 };
789 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
790 &pending.tool_call.id,
791 ));
792
793 effects.push(Effect::ExecuteTool {
794 session_id,
795 op_id,
796 tool_call: pending.tool_call,
797 });
798 } else {
799 let tool_name = pending.tool_call.name.clone();
800 let error = ToolError::DeniedByUser(tool_name.clone());
801 effects.extend(fail_tool_call_without_execution(
802 state,
803 session_id,
804 pending.tool_call,
805 error,
806 format!("Tool '{tool_name}' denied by user"),
807 "denied",
808 false,
809 ));
810 }
811
812 effects.extend(process_next_queued_approval(state, session_id));
813
814 effects
815}
816
817fn process_next_queued_approval(
818 state: &mut AppState,
819 session_id: crate::app::domain::types::SessionId,
820) -> Vec<Effect> {
821 let mut effects = Vec::new();
822
823 while let Some(queued) = state.approval_queue.pop_front() {
824 let decision = get_tool_decision(state, &queued.tool_call);
825
826 match decision {
827 ToolDecision::Allow => {
828 let Some(op_id) = state.current_operation.as_ref().map(|o| o.op_id) else {
829 effects.push(Effect::EmitEvent {
830 session_id,
831 event: SessionEvent::Error {
832 message: "Queued tool approval processed without active operation"
833 .to_string(),
834 },
835 });
836 state.approval_queue.push_front(queued);
837 break;
838 };
839 state.add_pending_tool_call(crate::app::domain::types::ToolCallId::from_string(
840 &queued.tool_call.id,
841 ));
842
843 effects.push(Effect::ExecuteTool {
844 session_id,
845 op_id,
846 tool_call: queued.tool_call,
847 });
848 }
849 ToolDecision::Deny => {
850 let tool_name = queued.tool_call.name.clone();
851 let error = ToolError::DeniedByPolicy(tool_name.clone());
852 effects.extend(fail_tool_call_without_execution(
853 state,
854 session_id,
855 queued.tool_call,
856 error,
857 format!("Tool '{tool_name}' denied by policy"),
858 "denied",
859 false,
860 ));
861 }
862 ToolDecision::Ask => {
863 let request_id = crate::app::domain::types::RequestId::new();
864 state.pending_approval = Some(PendingApproval {
865 request_id,
866 tool_call: queued.tool_call.clone(),
867 });
868
869 effects.push(Effect::EmitEvent {
870 session_id,
871 event: SessionEvent::ApprovalRequested {
872 request_id,
873 tool_call: queued.tool_call.clone(),
874 },
875 });
876
877 effects.push(Effect::RequestUserApproval {
878 session_id,
879 request_id,
880 tool_call: queued.tool_call,
881 });
882
883 break;
884 }
885 }
886 }
887
888 let all_tools_complete = state
889 .current_operation
890 .as_ref()
891 .is_none_or(|op| op.pending_tool_calls.is_empty());
892 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
893
894 if all_tools_complete
895 && no_pending_approvals
896 && let Some(op) = &state.current_operation
897 {
898 let op_id = op.op_id;
899 if let Some(model) = state.operation_models.get(&op_id).cloned() {
900 effects.push(Effect::CallModel {
901 session_id,
902 op_id,
903 model,
904 messages: state
905 .message_graph
906 .get_thread_messages()
907 .into_iter()
908 .cloned()
909 .collect(),
910 system_context: state.cached_system_context.clone(),
911 tools: state.tools.clone(),
912 });
913 }
914 }
915
916 effects
917}
918
919fn get_tool_decision(state: &AppState, tool_call: &steer_tools::ToolCall) -> ToolDecision {
920 if state.approved_tools.contains(&tool_call.name) {
921 return ToolDecision::Allow;
922 }
923
924 if tool_call.name == DISPATCH_AGENT_TOOL_NAME
925 && let Ok(params) =
926 serde_json::from_value::<DispatchAgentParams>(tool_call.parameters.clone())
927 && let Some(config) = state.session_config.as_ref()
928 {
929 let policy = &config.tool_config.approval_policy;
930 match params.target {
931 DispatchAgentTarget::Resume { .. } => {
932 return ToolDecision::Allow;
933 }
934 DispatchAgentTarget::New { agent, .. } => {
935 let agent_id = agent
936 .as_deref()
937 .filter(|value| !value.trim().is_empty())
938 .map_or_else(|| default_agent_spec_id().to_string(), str::to_string);
939 if policy.is_dispatch_agent_pattern_preapproved(&agent_id) {
940 return ToolDecision::Allow;
941 }
942 }
943 }
944 }
945
946 if tool_call.name == BASH_TOOL_NAME
947 && let Ok(params) = serde_json::from_value::<steer_tools::tools::bash::BashParams>(
948 tool_call.parameters.clone(),
949 )
950 && state.is_bash_pattern_approved(¶ms.command)
951 {
952 return ToolDecision::Allow;
953 }
954
955 state
956 .session_config
957 .as_ref()
958 .map_or(ToolDecision::Ask, |config| {
959 config
960 .tool_config
961 .approval_policy
962 .tool_decision(&tool_call.name)
963 })
964}
965
966fn handle_tool_execution_started(
967 state: &mut AppState,
968 session_id: crate::app::domain::types::SessionId,
969 tool_call_id: crate::app::domain::types::ToolCallId,
970 tool_name: String,
971 tool_parameters: serde_json::Value,
972) -> Vec<Effect> {
973 state.add_pending_tool_call(tool_call_id.clone());
974
975 let op_id = match state.current_operation.as_ref() {
976 Some(op) => op.op_id,
977 None => {
978 return vec![Effect::EmitEvent {
979 session_id,
980 event: SessionEvent::Error {
981 message: "Tool call started without active operation".to_string(),
982 },
983 }];
984 }
985 };
986
987 let is_direct_bash = matches!(
988 state.current_operation.as_ref().map(|op| &op.kind),
989 Some(OperationKind::DirectBash { .. })
990 );
991
992 if is_direct_bash {
993 return vec![];
994 }
995
996 let model = match state.operation_models.get(&op_id).cloned() {
997 Some(model) => model,
998 None => {
999 return vec![Effect::EmitEvent {
1000 session_id,
1001 event: SessionEvent::Error {
1002 message: format!("Missing model for tool call on operation {op_id}"),
1003 },
1004 }];
1005 }
1006 };
1007
1008 vec![Effect::EmitEvent {
1009 session_id,
1010 event: SessionEvent::ToolCallStarted {
1011 id: tool_call_id,
1012 name: tool_name,
1013 parameters: tool_parameters,
1014 model,
1015 },
1016 }]
1017}
1018
1019fn handle_tool_result(
1020 state: &mut AppState,
1021 session_id: crate::app::domain::types::SessionId,
1022 tool_call_id: crate::app::domain::types::ToolCallId,
1023 tool_name: String,
1024 result: Result<ToolResult, ToolError>,
1025) -> Vec<Effect> {
1026 let mut effects = Vec::new();
1027
1028 let op = match &state.current_operation {
1029 Some(op) => {
1030 if state.cancelled_ops.contains(&op.op_id) {
1031 tracing::debug!("Ignoring late tool result for cancelled op {:?}", op.op_id);
1032 return effects;
1033 }
1034 op.clone()
1035 }
1036 None => return effects,
1037 };
1038 let op_id = op.op_id;
1039
1040 state.remove_pending_tool_call(&tool_call_id);
1041
1042 let tool_result = match result {
1043 Ok(r) => r,
1044 Err(e) => ToolResult::Error(e),
1045 };
1046
1047 let is_direct_bash = matches!(op.kind, OperationKind::DirectBash { .. });
1048
1049 if is_direct_bash {
1050 let command = match &op.kind {
1051 OperationKind::DirectBash { command } => command.clone(),
1052 _ => tool_name,
1053 };
1054
1055 let (stdout, stderr, exit_code) = match &tool_result {
1056 ToolResult::Bash(result) => (
1057 result.stdout.clone(),
1058 result.stderr.clone(),
1059 result.exit_code,
1060 ),
1061 ToolResult::Error(err) => (String::new(), err.to_string(), 1),
1062 other => (format!("{other:?}"), String::new(), 0),
1063 };
1064
1065 let updated = state.operation_messages.remove(&op_id).and_then(|id| {
1066 state
1067 .message_graph
1068 .update_command_execution(
1069 id.as_str(),
1070 command.clone(),
1071 stdout.clone(),
1072 stderr.clone(),
1073 exit_code,
1074 )
1075 .or_else(|| {
1076 let parent_id = state.message_graph.active_message_id.clone();
1077 let timestamp = Message::current_timestamp();
1078 Some(Message {
1079 data: MessageData::User {
1080 content: vec![UserContent::CommandExecution {
1081 command: command.clone(),
1082 stdout: stdout.clone(),
1083 stderr: stderr.clone(),
1084 exit_code,
1085 }],
1086 },
1087 timestamp,
1088 id: id.to_string(),
1089 parent_message_id: parent_id,
1090 })
1091 })
1092 });
1093
1094 state.complete_operation(op_id);
1095
1096 if let Some(message) = updated {
1097 effects.push(Effect::EmitEvent {
1098 session_id,
1099 event: SessionEvent::MessageUpdated { message },
1100 });
1101 }
1102
1103 effects.push(Effect::EmitEvent {
1104 session_id,
1105 event: SessionEvent::OperationCompleted { op_id },
1106 });
1107
1108 effects.extend(maybe_start_queued_work(state, session_id));
1109
1110 return effects;
1111 }
1112
1113 let model = match state.operation_models.get(&op_id).cloned() {
1114 Some(model) => model,
1115 None => {
1116 return vec![Effect::EmitEvent {
1117 session_id,
1118 event: SessionEvent::Error {
1119 message: format!("Missing model for tool result on operation {op_id}"),
1120 },
1121 }];
1122 }
1123 };
1124
1125 let event = match &tool_result {
1126 ToolResult::Error(e) => SessionEvent::ToolCallFailed {
1127 id: tool_call_id.clone(),
1128 name: tool_name.clone(),
1129 error: e.to_string(),
1130 model: model.clone(),
1131 },
1132 _ => SessionEvent::ToolCallCompleted {
1133 id: tool_call_id.clone(),
1134 name: tool_name,
1135 result: tool_result.clone(),
1136 model: model.clone(),
1137 },
1138 };
1139
1140 effects.push(Effect::EmitEvent { session_id, event });
1141
1142 let parent_id = state.message_graph.active_message_id.clone();
1143 let tool_message = Message {
1144 data: MessageData::Tool {
1145 tool_use_id: tool_call_id.0.clone(),
1146 result: tool_result,
1147 },
1148 timestamp: 0,
1149 id: format!("tool_result_{}", tool_call_id.0),
1150 parent_message_id: parent_id,
1151 };
1152 state.message_graph.add_message(tool_message.clone());
1153
1154 effects.push(Effect::EmitEvent {
1155 session_id,
1156 event: SessionEvent::ToolMessageAdded {
1157 message: tool_message,
1158 },
1159 });
1160
1161 let all_tools_complete = state
1162 .current_operation
1163 .as_ref()
1164 .is_none_or(|op| op.pending_tool_calls.is_empty());
1165 let no_pending_approvals = state.pending_approval.is_none() && state.approval_queue.is_empty();
1166
1167 if all_tools_complete && no_pending_approvals {
1168 effects.push(Effect::CallModel {
1169 session_id,
1170 op_id,
1171 model,
1172 messages: state
1173 .message_graph
1174 .get_thread_messages()
1175 .into_iter()
1176 .cloned()
1177 .collect(),
1178 system_context: state.cached_system_context.clone(),
1179 tools: state.tools.clone(),
1180 });
1181 }
1182
1183 effects
1184}
1185
/// Arguments for [`handle_model_response_complete`], passed as one value.
struct ModelResponseCompleteParams {
    /// Operation the response belongs to.
    op_id: crate::app::domain::types::OpId,
    /// Id assigned to the assistant message built from the response.
    message_id: crate::app::domain::types::MessageId,
    /// Response content; `AssistantContent::ToolCall` entries are routed as
    /// tool calls.
    content: Vec<AssistantContent>,
    /// Token usage reported for the request, if any.
    usage: Option<TokenUsage>,
    /// The model's context-window size, if known; used to derive utilization.
    context_window_tokens: Option<u32>,
    /// Timestamp stored on the assistant message.
    timestamp: u64,
}
1194
1195fn handle_model_response_complete(
1196 state: &mut AppState,
1197 session_id: crate::app::domain::types::SessionId,
1198 params: ModelResponseCompleteParams,
1199) -> Vec<Effect> {
1200 let ModelResponseCompleteParams {
1201 op_id,
1202 message_id,
1203 content,
1204 usage,
1205 context_window_tokens,
1206 timestamp,
1207 } = params;
1208 let mut effects = Vec::new();
1209
1210 if state.cancelled_ops.contains(&op_id) {
1211 tracing::debug!("Ignoring model response for cancelled op {:?}", op_id);
1212 return effects;
1213 }
1214
1215 let tool_calls: Vec<_> = content
1216 .iter()
1217 .filter_map(|c| {
1218 if let AssistantContent::ToolCall { tool_call, .. } = c {
1219 Some(tool_call.clone())
1220 } else {
1221 None
1222 }
1223 })
1224 .collect();
1225
1226 let parent_id = state.message_graph.active_message_id.clone();
1227
1228 let message = Message {
1229 data: MessageData::Assistant {
1230 content: content.clone(),
1231 },
1232 timestamp,
1233 id: message_id.0.clone(),
1234 parent_message_id: parent_id,
1235 };
1236
1237 state.message_graph.add_message(message.clone());
1238 state.message_graph.active_message_id = Some(message_id.0.clone());
1239
1240 let model = match state.operation_models.get(&op_id).cloned() {
1241 Some(model) => model,
1242 None => {
1243 return vec![Effect::EmitEvent {
1244 session_id,
1245 event: SessionEvent::Error {
1246 message: format!("Missing model for operation {op_id}"),
1247 },
1248 }];
1249 }
1250 };
1251
1252 effects.push(Effect::EmitEvent {
1253 session_id,
1254 event: SessionEvent::AssistantMessageAdded {
1255 message,
1256 model: model.clone(),
1257 },
1258 });
1259
1260 let outer_usage = usage;
1262
1263 if let Some(usage) = outer_usage {
1264 let context_window = derive_context_window_usage(usage.total_tokens, context_window_tokens);
1265 state.record_llm_usage(op_id, model.clone(), usage, context_window.clone());
1266 effects.push(Effect::EmitEvent {
1267 session_id,
1268 event: SessionEvent::LlmUsageUpdated {
1269 op_id,
1270 model: model.clone(),
1271 usage,
1272 context_window,
1273 },
1274 });
1275 }
1276
1277 if tool_calls.is_empty() {
1278 state.complete_operation(op_id);
1279 effects.push(Effect::EmitEvent {
1280 session_id,
1281 event: SessionEvent::OperationCompleted { op_id },
1282 });
1283 let auto = maybe_auto_compact(
1285 state,
1286 session_id,
1287 outer_usage,
1288 context_window_tokens,
1289 &model,
1290 );
1291 if auto.is_empty() {
1292 effects.extend(maybe_start_queued_work(state, session_id));
1293 } else {
1294 effects.extend(auto);
1295 }
1296 } else {
1297 for tool_call in tool_calls {
1298 let request_id = crate::app::domain::types::RequestId::new();
1299 effects.extend(handle_tool_approval_requested(
1300 state, session_id, request_id, tool_call,
1301 ));
1302 }
1303 }
1304
1305 effects
1306}
1307
1308fn handle_model_response_error(
1309 state: &mut AppState,
1310 session_id: crate::app::domain::types::SessionId,
1311 op_id: crate::app::domain::types::OpId,
1312 error: &str,
1313) -> Vec<Effect> {
1314 let mut effects = Vec::new();
1315
1316 if state.cancelled_ops.contains(&op_id) {
1317 return effects;
1318 }
1319
1320 state.complete_operation(op_id);
1321
1322 effects.push(Effect::EmitEvent {
1323 session_id,
1324 event: SessionEvent::Error {
1325 message: error.to_string(),
1326 },
1327 });
1328
1329 effects.push(Effect::EmitEvent {
1330 session_id,
1331 event: SessionEvent::OperationCompleted { op_id },
1332 });
1333
1334 effects.extend(maybe_start_queued_work(state, session_id));
1335
1336 effects
1337}
1338
1339fn handle_direct_bash(
1340 state: &mut AppState,
1341 session_id: crate::app::domain::types::SessionId,
1342 op_id: crate::app::domain::types::OpId,
1343 message_id: crate::app::domain::types::MessageId,
1344 command: String,
1345 timestamp: u64,
1346) -> Vec<Effect> {
1347 let mut effects = Vec::new();
1348
1349 if state.has_active_operation() {
1350 state.queue_bash_command(crate::app::domain::state::QueuedBashCommand {
1351 command,
1352 op_id,
1353 message_id,
1354 queued_at: timestamp,
1355 });
1356 effects.push(Effect::EmitEvent {
1357 session_id,
1358 event: SessionEvent::QueueUpdated {
1359 queue: snapshot_queue(state),
1360 },
1361 });
1362 return effects;
1363 }
1364
1365 let parent_id = state.message_graph.active_message_id.clone();
1366 let message = Message {
1367 data: MessageData::User {
1368 content: vec![UserContent::CommandExecution {
1369 command: command.clone(),
1370 stdout: String::new(),
1371 stderr: String::new(),
1372 exit_code: 0,
1373 }],
1374 },
1375 timestamp,
1376 id: message_id.0.clone(),
1377 parent_message_id: parent_id,
1378 };
1379
1380 state.message_graph.add_message(message.clone());
1381 state.message_graph.active_message_id = Some(message_id.0.clone());
1382
1383 state.start_operation(
1384 op_id,
1385 OperationKind::DirectBash {
1386 command: command.clone(),
1387 },
1388 );
1389 state.operation_messages.insert(op_id, message_id);
1390
1391 effects.push(Effect::EmitEvent {
1392 session_id,
1393 event: SessionEvent::UserMessageAdded { message },
1394 });
1395
1396 effects.push(Effect::EmitEvent {
1397 session_id,
1398 event: SessionEvent::OperationStarted {
1399 op_id,
1400 kind: OperationKind::DirectBash {
1401 command: command.clone(),
1402 },
1403 },
1404 });
1405
1406 let tool_call = steer_tools::ToolCall {
1407 id: format!("direct_bash_{op_id}"),
1408 name: BASH_TOOL_NAME.to_string(),
1409 parameters: serde_json::json!({ "command": command }),
1410 };
1411
1412 effects.push(Effect::ExecuteTool {
1413 session_id,
1414 op_id,
1415 tool_call,
1416 });
1417
1418 effects
1419}
1420
1421fn handle_dequeue_queued_item(
1422 state: &mut AppState,
1423 session_id: crate::app::domain::types::SessionId,
1424) -> Result<Vec<Effect>, ReduceError> {
1425 if state.pop_next_queued_work().is_some() {
1426 Ok(vec![Effect::EmitEvent {
1427 session_id,
1428 event: SessionEvent::QueueUpdated {
1429 queue: snapshot_queue(state),
1430 },
1431 }])
1432 } else {
1433 Err(invalid_action(
1434 InvalidActionKind::QueueEmpty,
1435 "No queued item to remove.",
1436 ))
1437 }
1438}
1439
1440fn maybe_auto_compact(
1441 state: &mut AppState,
1442 session_id: crate::app::domain::types::SessionId,
1443 usage: Option<TokenUsage>,
1444 context_window_tokens: Option<u32>,
1445 model: &crate::config::model::ModelId,
1446) -> Vec<Effect> {
1447 let config = match &state.session_config {
1449 Some(c) if c.auto_compaction.enabled => c,
1450 _ => return vec![],
1451 };
1452 let threshold = config.auto_compaction.threshold_ratio();
1453
1454 let usage = match usage {
1456 Some(u) => u,
1457 None => return vec![],
1458 };
1459
1460 let cw = match derive_context_window_usage(usage.total_tokens, context_window_tokens) {
1462 Some(cw) if !cw.estimated => cw,
1463 _ => return vec![],
1464 };
1465 let ratio = match cw.utilization_ratio {
1466 Some(r) if r >= threshold => r,
1467 _ => return vec![],
1468 };
1469 let _ = ratio; if state.message_graph.get_thread_messages().len() < MIN_MESSAGES_FOR_COMPACT {
1473 return vec![];
1474 }
1475
1476 if !state.queued_work.is_empty() {
1478 return vec![];
1479 }
1480
1481 if state.has_active_operation() {
1483 return vec![];
1484 }
1485
1486 let op_id = crate::app::domain::types::OpId::new();
1487 let kind = OperationKind::Compact {
1488 trigger: CompactTrigger::Auto,
1489 };
1490 state.start_operation(op_id, kind.clone());
1491 state.operation_models.insert(op_id, model.clone());
1492
1493 vec![
1494 Effect::EmitEvent {
1495 session_id,
1496 event: SessionEvent::OperationStarted { op_id, kind },
1497 },
1498 Effect::RequestCompaction {
1499 session_id,
1500 op_id,
1501 model: model.clone(),
1502 },
1503 ]
1504}
1505
1506fn maybe_start_queued_work(
1507 state: &mut AppState,
1508 session_id: crate::app::domain::types::SessionId,
1509) -> Vec<Effect> {
1510 if state.has_active_operation() {
1511 return vec![];
1512 }
1513
1514 let Some(next) = state.pop_next_queued_work() else {
1515 return vec![];
1516 };
1517
1518 let mut effects = vec![Effect::EmitEvent {
1519 session_id,
1520 event: SessionEvent::QueueUpdated {
1521 queue: snapshot_queue(state),
1522 },
1523 }];
1524
1525 match next {
1526 QueuedWorkItem::UserMessage(item) => {
1527 effects.extend(handle_user_input(
1528 state,
1529 session_id,
1530 item.content,
1531 item.op_id,
1532 item.message_id,
1533 item.model,
1534 item.queued_at,
1535 ));
1536 }
1537 QueuedWorkItem::DirectBash(item) => {
1538 effects.extend(handle_direct_bash(
1539 state,
1540 session_id,
1541 item.op_id,
1542 item.message_id,
1543 item.command,
1544 item.queued_at,
1545 ));
1546 }
1547 }
1548
1549 effects
1550}
1551
1552fn snapshot_queue(state: &AppState) -> Vec<QueuedWorkItemSnapshot> {
1553 state
1554 .queued_work
1555 .iter()
1556 .map(snapshot_queued_work_item)
1557 .collect()
1558}
1559
1560fn snapshot_queued_work_item(item: &QueuedWorkItem) -> QueuedWorkItemSnapshot {
1561 match item {
1562 QueuedWorkItem::UserMessage(message) => QueuedWorkItemSnapshot {
1563 kind: Some(QueuedWorkKind::UserMessage),
1564 content: message
1565 .content
1566 .iter()
1567 .filter_map(|item| match item {
1568 UserContent::Text { text } => Some(text.as_str()),
1569 _ => None,
1570 })
1571 .collect::<String>(),
1572 queued_at: message.queued_at,
1573 model: Some(message.model.clone()),
1574 op_id: message.op_id,
1575 message_id: message.message_id.clone(),
1576 attachment_count: message
1577 .content
1578 .iter()
1579 .filter(|item| matches!(item, UserContent::Image { .. }))
1580 .count() as u32,
1581 },
1582 QueuedWorkItem::DirectBash(command) => QueuedWorkItemSnapshot {
1583 kind: Some(QueuedWorkKind::DirectBash),
1584 content: command.command.clone(),
1585 queued_at: command.queued_at,
1586 model: None,
1587 op_id: command.op_id,
1588 message_id: command.message_id.clone(),
1589 attachment_count: 0,
1590 },
1591 }
1592}
1593
1594fn handle_request_compaction(
1595 state: &mut AppState,
1596 session_id: crate::app::domain::types::SessionId,
1597 op_id: crate::app::domain::types::OpId,
1598 model: crate::config::model::ModelId,
1599) -> Result<Vec<Effect>, ReduceError> {
1600 let message_count = state.message_graph.get_thread_messages().len();
1601
1602 if state.has_active_operation() {
1603 return Err(invalid_action(
1604 InvalidActionKind::OperationInFlight,
1605 "Cannot compact while an operation is active.",
1606 ));
1607 }
1608
1609 if message_count < MIN_MESSAGES_FOR_COMPACT {
1610 return Ok(vec![Effect::EmitEvent {
1611 session_id,
1612 event: SessionEvent::CompactResult {
1613 result: crate::app::domain::event::CompactResult::InsufficientMessages,
1614 trigger: CompactTrigger::Manual,
1615 },
1616 }]);
1617 }
1618
1619 let kind = OperationKind::Compact {
1620 trigger: CompactTrigger::Manual,
1621 };
1622 state.start_operation(op_id, kind.clone());
1623 state.operation_models.insert(op_id, model.clone());
1624
1625 Ok(vec![
1626 Effect::EmitEvent {
1627 session_id,
1628 event: SessionEvent::OperationStarted { op_id, kind },
1629 },
1630 Effect::RequestCompaction {
1631 session_id,
1632 op_id,
1633 model,
1634 },
1635 ])
1636}
1637
/// Cancels the current operation.
///
/// When `target_op` is `Some`, only an operation with that id is cancelled;
/// when it is `None`, whatever operation is current is cancelled. If there is
/// no matching operation this is a no-op and returns no effects.
///
/// Cancelling:
/// - records the op in `cancelled_ops`, so handlers that check it (such as the
///   model-response handlers) drop results that arrive afterwards;
/// - emits `CompactResult::Cancelled` for compaction operations;
/// - takes the pending approval and the approval queue; only for `AgentLoop`
///   operations does it emit a cancelled tool-failure message for the pending
///   approval, each queued approval and each in-flight tool call (for other
///   kinds they are dropped without messages);
/// - pops the next queued work item and reports it in the
///   `OperationCancelled` event rather than starting it;
/// - emits `OperationCancelled` and a `CancelOperation` effect, completes the
///   operation, and emits `QueueUpdated` if an item was popped.
fn handle_cancel(
    state: &mut AppState,
    session_id: crate::app::domain::types::SessionId,
    target_op: Option<crate::app::domain::types::OpId>,
) -> Vec<Effect> {
    let mut effects = Vec::new();

    // Clone the operation so `state` can be mutated freely below.
    let op = match &state.current_operation {
        Some(op) if target_op.is_none_or(|t| t == op.op_id) => op.clone(),
        _ => return effects,
    };

    state.record_cancelled_op(op.op_id);

    if let OperationKind::Compact { trigger } = op.kind {
        effects.push(Effect::EmitEvent {
            session_id,
            event: SessionEvent::CompactResult {
                result: crate::app::domain::event::CompactResult::Cancelled,
                trigger,
            },
        });
    }

    // Approvals are cleared for every operation kind; failure messages for
    // them are only produced for agent loops below.
    let pending_approval = state.pending_approval.take();
    let queued_approvals = std::mem::take(&mut state.approval_queue);

    if matches!(op.kind, OperationKind::AgentLoop) {
        if let Some(pending) = pending_approval {
            let tool_name = pending.tool_call.name.clone();
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                &pending.tool_call.id,
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                format!("Tool '{tool_name}' cancelled"),
                "cancelled",
            ));
        }

        for queued in queued_approvals {
            let tool_name = queued.tool_call.name.clone();
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                &queued.tool_call.id,
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                format!("Tool '{tool_name}' cancelled"),
                "cancelled",
            ));
        }

        for tool_call_id in &op.pending_tool_calls {
            // Fall back to the raw call id when the tool name can't be found
            // in the message graph, and word the event error accordingly.
            let tool_name = state
                .message_graph
                .find_tool_name_by_id(tool_call_id.as_str())
                .unwrap_or_else(|| tool_call_id.as_str().to_string());
            let event_error = if tool_name == tool_call_id.as_str() {
                format!("Tool call '{tool_call_id}' cancelled")
            } else {
                format!("Tool '{tool_name}' cancelled")
            };
            effects.extend(emit_tool_failure_message(
                state,
                session_id,
                tool_call_id.as_str(),
                &tool_name,
                ToolError::Cancelled(tool_name.clone()),
                event_error,
                "cancelled",
            ));
        }
    }
    state.active_streams.remove(&op.op_id);

    // The next queued item is removed and reported, not started here.
    // NOTE(review): presumably so the client can hand it back to the user —
    // confirm against the UI handling of `popped_queued_item`.
    let dequeued_item = state.pop_next_queued_work();
    let popped_queued_item = dequeued_item.as_ref().map(snapshot_queued_work_item);

    effects.push(Effect::EmitEvent {
        session_id,
        event: SessionEvent::OperationCancelled {
            op_id: op.op_id,
            info: CancellationInfo {
                pending_tool_calls: op.pending_tool_calls.len(),
                popped_queued_item,
            },
        },
    });

    effects.push(Effect::CancelOperation {
        session_id,
        op_id: op.op_id,
    });

    state.complete_operation(op.op_id);

    if dequeued_item.is_some() {
        effects.push(Effect::EmitEvent {
            session_id,
            event: SessionEvent::QueueUpdated {
                queue: snapshot_queue(state),
            },
        });
    }

    effects
}
1747
1748fn handle_hydrate(
1749 state: &mut AppState,
1750 session_id: crate::app::domain::types::SessionId,
1751 events: Vec<SessionEvent>,
1752 starting_sequence: u64,
1753) -> Vec<Effect> {
1754 for event in events {
1755 apply_event_to_state(state, &event);
1756 }
1757
1758 state.event_sequence = starting_sequence;
1759
1760 emit_mcp_connect_effects(state, session_id)
1761}
1762
1763fn handle_switch_primary_agent(
1764 state: &mut AppState,
1765 session_id: crate::app::domain::types::SessionId,
1766 agent_id: String,
1767) -> Result<Vec<Effect>, ReduceError> {
1768 if state.current_operation.is_some() {
1769 return Err(invalid_action(
1770 InvalidActionKind::OperationInFlight,
1771 "Cannot switch primary agent while an operation is active.",
1772 ));
1773 }
1774
1775 let Some(base_config) = state
1776 .base_session_config
1777 .as_ref()
1778 .or(state.session_config.as_ref())
1779 else {
1780 return Err(invalid_action(
1781 InvalidActionKind::MissingSessionConfig,
1782 "Cannot switch primary agent without session config.",
1783 ));
1784 };
1785
1786 let Some(_spec) = primary_agent_spec(&agent_id) else {
1787 return Err(invalid_action(
1788 InvalidActionKind::UnknownPrimaryAgent,
1789 format!("Unknown primary agent '{agent_id}'."),
1790 ));
1791 };
1792
1793 let mut updated_config = base_config.clone();
1794 updated_config.primary_agent_id = Some(agent_id.clone());
1795 let new_config = resolve_effective_config(&updated_config);
1796 let backend_effects = mcp_backend_diff_effects(session_id, base_config, &new_config);
1797
1798 apply_session_config_state(state, &new_config, Some(agent_id.clone()), false);
1799
1800 let mut effects = Vec::new();
1801 effects.push(Effect::EmitEvent {
1802 session_id,
1803 event: SessionEvent::SessionConfigUpdated {
1804 config: Box::new(new_config),
1805 primary_agent_id: agent_id,
1806 },
1807 });
1808 effects.extend(backend_effects);
1809 effects.push(Effect::ReloadToolSchemas { session_id });
1810
1811 Ok(effects)
1812}
1813
1814fn apply_session_config_state(
1815 state: &mut AppState,
1816 config: &crate::session::state::SessionConfig,
1817 primary_agent_id: Option<String>,
1818 update_base: bool,
1819) {
1820 state.apply_session_config(config, primary_agent_id, update_base);
1821}
1822
1823fn mcp_backend_diff_effects(
1824 session_id: crate::app::domain::types::SessionId,
1825 old_config: &crate::session::state::SessionConfig,
1826 new_config: &crate::session::state::SessionConfig,
1827) -> Vec<Effect> {
1828 let old_map = collect_mcp_backends(old_config);
1829 let new_map = collect_mcp_backends(new_config);
1830
1831 let mut effects = Vec::new();
1832
1833 for (server_name, (old_transport, old_filter)) in &old_map {
1834 match new_map.get(server_name) {
1835 None => {
1836 effects.push(Effect::DisconnectMcpServer {
1837 session_id,
1838 server_name: server_name.clone(),
1839 });
1840 }
1841 Some((new_transport, new_filter)) => {
1842 if new_transport != old_transport || new_filter != old_filter {
1843 effects.push(Effect::DisconnectMcpServer {
1844 session_id,
1845 server_name: server_name.clone(),
1846 });
1847 effects.push(Effect::ConnectMcpServer {
1848 session_id,
1849 config: McpServerConfig {
1850 server_name: server_name.clone(),
1851 transport: new_transport.clone(),
1852 tool_filter: new_filter.clone(),
1853 },
1854 });
1855 }
1856 }
1857 }
1858 }
1859
1860 for (server_name, (new_transport, new_filter)) in &new_map {
1861 if !old_map.contains_key(server_name) {
1862 effects.push(Effect::ConnectMcpServer {
1863 session_id,
1864 config: McpServerConfig {
1865 server_name: server_name.clone(),
1866 transport: new_transport.clone(),
1867 tool_filter: new_filter.clone(),
1868 },
1869 });
1870 }
1871 }
1872
1873 effects
1874}
1875
1876fn collect_mcp_backends(
1877 config: &crate::session::state::SessionConfig,
1878) -> std::collections::HashMap<
1879 String,
1880 (
1881 crate::tools::McpTransport,
1882 crate::session::state::ToolFilter,
1883 ),
1884> {
1885 let mut map = std::collections::HashMap::new();
1886
1887 for backend_config in &config.tool_config.backends {
1888 let BackendConfig::Mcp {
1889 server_name,
1890 transport,
1891 tool_filter,
1892 } = backend_config;
1893
1894 map.insert(
1895 server_name.clone(),
1896 (transport.clone(), tool_filter.clone()),
1897 );
1898 }
1899
1900 map
1901}
1902
1903pub fn apply_event_to_state(state: &mut AppState, event: &SessionEvent) {
1904 match event {
1905 SessionEvent::SessionCreated { config, .. } => {
1906 let primary_agent_id = config
1907 .primary_agent_id
1908 .clone()
1909 .unwrap_or_else(|| default_primary_agent_id().to_string());
1910 apply_session_config_state(state, config, Some(primary_agent_id), true);
1911 }
1912 SessionEvent::SessionConfigUpdated {
1913 config,
1914 primary_agent_id,
1915 } => {
1916 apply_session_config_state(state, config, Some(primary_agent_id.clone()), false);
1917 }
1918 SessionEvent::AssistantMessageAdded { message, .. }
1919 | SessionEvent::UserMessageAdded { message }
1920 | SessionEvent::ToolMessageAdded { message } => {
1921 state.message_graph.add_message(message.clone());
1922 state.message_graph.active_message_id = Some(message.id().to_string());
1923 }
1924 SessionEvent::MessageUpdated { message } => {
1925 state.message_graph.replace_message(message.clone());
1926 }
1927 SessionEvent::ApprovalDecided {
1928 decision, remember, ..
1929 } => {
1930 if *decision == ApprovalDecision::Approved
1931 && let Some(memory) = remember
1932 {
1933 match memory {
1934 ApprovalMemory::Tool(name) => {
1935 state.approved_tools.insert(name.clone());
1936 }
1937 ApprovalMemory::BashPattern(pattern) => {
1938 state.approved_bash_patterns.insert(pattern.clone());
1939 }
1940 ApprovalMemory::PendingTool => {}
1941 }
1942 }
1943 state.pending_approval = None;
1944 }
1945 SessionEvent::OperationCompleted { op_id } => {
1946 state.complete_operation(*op_id);
1947 }
1948 SessionEvent::OperationCancelled { op_id, .. } => {
1949 state.record_cancelled_op(*op_id);
1950 state.complete_operation(*op_id);
1951 }
1952 SessionEvent::LlmUsageUpdated {
1953 op_id,
1954 model,
1955 usage,
1956 context_window,
1957 } => {
1958 state.record_llm_usage(*op_id, model.clone(), *usage, context_window.clone());
1959 }
1960 SessionEvent::McpServerStateChanged {
1961 server_name,
1962 state: mcp_state,
1963 } => {
1964 state
1965 .mcp_servers
1966 .insert(server_name.clone(), mcp_state.clone());
1967 }
1968 SessionEvent::QueueUpdated { queue } => {
1969 let parse_content = |content: &str| {
1970 if content.trim().is_empty() {
1971 None
1972 } else {
1973 Some(vec![UserContent::Text {
1974 text: content.to_string(),
1975 }])
1976 }
1977 };
1978
1979 state.queued_work = queue
1980 .iter()
1981 .filter_map(|item| match item.kind {
1982 Some(QueuedWorkKind::UserMessage) => {
1983 let content = parse_content(item.content.as_str())?;
1984 Some(QueuedWorkItem::UserMessage(
1985 crate::app::domain::state::QueuedUserMessage {
1986 content,
1987 op_id: item.op_id,
1988 message_id: item.message_id.clone(),
1989 model: item.model.clone().unwrap_or_else(
1990 crate::config::model::builtin::claude_sonnet_4_5,
1991 ),
1992 queued_at: item.queued_at,
1993 },
1994 ))
1995 }
1996 Some(QueuedWorkKind::DirectBash) => Some(QueuedWorkItem::DirectBash(
1997 crate::app::domain::state::QueuedBashCommand {
1998 command: item.content.clone(),
1999 op_id: item.op_id,
2000 message_id: item.message_id.clone(),
2001 queued_at: item.queued_at,
2002 },
2003 )),
2004 None => {
2005 let content = parse_content(item.content.as_str())?;
2006 Some(QueuedWorkItem::UserMessage(
2007 crate::app::domain::state::QueuedUserMessage {
2008 content,
2009 op_id: item.op_id,
2010 message_id: item.message_id.clone(),
2011 model: item.model.clone().unwrap_or_else(
2012 crate::config::model::builtin::claude_sonnet_4_5,
2013 ),
2014 queued_at: item.queued_at,
2015 },
2016 ))
2017 }
2018 })
2019 .collect();
2020 }
2021 SessionEvent::ConversationCompacted { record } => {
2022 state
2023 .compaction_summary_ids
2024 .insert(record.summary_message_id.to_string());
2025 state
2026 .message_graph
2027 .mark_compaction_summary(record.summary_message_id.to_string());
2028 }
2029 _ => {}
2030 }
2031
2032 state.event_sequence += 1;
2033}
2034
2035struct CompactionCompleteParams {
2036 op_id: crate::app::domain::types::OpId,
2037 compaction_id: crate::app::domain::types::CompactionId,
2038 summary_message_id: crate::app::domain::types::MessageId,
2039 summary: String,
2040 compacted_head_message_id: crate::app::domain::types::MessageId,
2041 previous_active_message_id: Option<crate::app::domain::types::MessageId>,
2042 model_name: String,
2043 timestamp: u64,
2044}
2045
2046fn handle_compaction_complete(
2047 state: &mut AppState,
2048 session_id: crate::app::domain::types::SessionId,
2049 params: CompactionCompleteParams,
2050) -> Vec<Effect> {
2051 use crate::app::conversation::{AssistantContent, Message, MessageData};
2052 use crate::app::domain::types::CompactionRecord;
2053
2054 let CompactionCompleteParams {
2055 op_id,
2056 compaction_id,
2057 summary_message_id,
2058 summary,
2059 compacted_head_message_id,
2060 previous_active_message_id,
2061 model_name,
2062 timestamp,
2063 } = params;
2064
2065 let summary_message = Message {
2066 data: MessageData::Assistant {
2067 content: vec![AssistantContent::Text {
2068 text: summary.clone(),
2069 }],
2070 },
2071 id: summary_message_id.to_string(),
2072 parent_message_id: None,
2073 timestamp,
2074 };
2075
2076 state.message_graph.add_message(summary_message.clone());
2077
2078 state
2080 .compaction_summary_ids
2081 .insert(summary_message_id.to_string());
2082 state
2083 .message_graph
2084 .mark_compaction_summary(summary_message_id.to_string());
2085
2086 let record = CompactionRecord::with_timestamp(
2087 compaction_id,
2088 summary_message_id,
2089 compacted_head_message_id,
2090 previous_active_message_id,
2091 model_name,
2092 timestamp,
2093 );
2094
2095 let model = if let Some(model) = state.operation_models.get(&op_id).cloned() {
2096 model
2097 } else {
2098 state.complete_operation(op_id);
2099 return vec![Effect::EmitEvent {
2100 session_id,
2101 event: SessionEvent::Error {
2102 message: format!("Missing model for compaction operation {op_id}"),
2103 },
2104 }];
2105 };
2106
2107 let trigger = state
2108 .current_operation
2109 .as_ref()
2110 .and_then(|op| match op.kind {
2111 OperationKind::Compact { trigger } => Some(trigger),
2112 _ => None,
2113 })
2114 .unwrap_or(CompactTrigger::Manual);
2115
2116 state.complete_operation(op_id);
2117
2118 let mut effects = vec![
2119 Effect::EmitEvent {
2120 session_id,
2121 event: SessionEvent::AssistantMessageAdded {
2122 message: summary_message,
2123 model,
2124 },
2125 },
2126 Effect::EmitEvent {
2127 session_id,
2128 event: SessionEvent::CompactResult {
2129 result: crate::app::domain::event::CompactResult::Success(summary),
2130 trigger,
2131 },
2132 },
2133 Effect::EmitEvent {
2134 session_id,
2135 event: SessionEvent::ConversationCompacted { record },
2136 },
2137 Effect::EmitEvent {
2138 session_id,
2139 event: SessionEvent::OperationCompleted { op_id },
2140 },
2141 ];
2142
2143 effects.extend(maybe_start_queued_work(state, session_id));
2144
2145 effects
2146}
2147
2148fn handle_compaction_failed(
2149 state: &mut AppState,
2150 session_id: crate::app::domain::types::SessionId,
2151 op_id: crate::app::domain::types::OpId,
2152 error: String,
2153) -> Vec<Effect> {
2154 let trigger = state
2155 .current_operation
2156 .as_ref()
2157 .and_then(|op| match op.kind {
2158 OperationKind::Compact { trigger } => Some(trigger),
2159 _ => None,
2160 })
2161 .unwrap_or(CompactTrigger::Manual);
2162
2163 state.complete_operation(op_id);
2164
2165 let mut effects = vec![
2166 Effect::EmitEvent {
2167 session_id,
2168 event: SessionEvent::CompactResult {
2169 result: crate::app::domain::event::CompactResult::Failed(error),
2170 trigger,
2171 },
2172 },
2173 Effect::EmitEvent {
2174 session_id,
2175 event: SessionEvent::OperationCompleted { op_id },
2176 },
2177 ];
2178
2179 effects.extend(maybe_start_queued_work(state, session_id));
2180
2181 effects
2182}
2183
2184fn emit_mcp_connect_effects(
2185 state: &AppState,
2186 session_id: crate::app::domain::types::SessionId,
2187) -> Vec<Effect> {
2188 let mut effects = Vec::new();
2189
2190 let Some(ref config) = state.session_config else {
2191 return effects;
2192 };
2193
2194 for backend_config in &config.tool_config.backends {
2195 let BackendConfig::Mcp {
2196 server_name,
2197 transport,
2198 tool_filter,
2199 } = backend_config;
2200
2201 let already_connected = state.mcp_servers.get(server_name).is_some_and(|s| {
2202 matches!(
2203 s,
2204 McpServerState::Connecting | McpServerState::Connected { .. }
2205 )
2206 });
2207
2208 if !already_connected {
2209 effects.push(Effect::ConnectMcpServer {
2210 session_id,
2211 config: McpServerConfig {
2212 server_name: server_name.clone(),
2213 transport: transport.clone(),
2214 tool_filter: tool_filter.clone(),
2215 },
2216 });
2217 }
2218 }
2219
2220 effects
2221}
2222
2223#[cfg(test)]
2224mod tests {
2225 use super::*;
2226 use crate::api::provider::TokenUsage;
2227 use crate::app::domain::event::ContextWindowUsage;
2228 use crate::app::domain::state::{OperationState, PendingApproval};
2229 use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId, ToolCallId};
2230 use crate::config::model::builtin;
2231 use crate::primary_agents::resolve_effective_config;
2232 use crate::session::state::{
2233 ApprovalRules, ApprovalRulesOverrides, SessionConfig, SessionPolicyOverrides,
2234 ToolApprovalPolicy, ToolApprovalPolicyOverrides, ToolVisibility, UnapprovedBehavior,
2235 };
2236 use crate::tools::DISPATCH_AGENT_TOOL_NAME;
2237 use crate::tools::static_tools::READ_ONLY_TOOL_NAMES;
2238 use schemars::schema_for;
2239 use serde_json::json;
2240 use std::collections::HashSet;
2241 use steer_tools::{InputSchema, ToolCall, ToolError, ToolSchema};
2242
2243 fn test_state() -> AppState {
2244 AppState::new(SessionId::new())
2245 }
2246
2247 fn test_schema(name: &str) -> ToolSchema {
2248 ToolSchema {
2249 name: name.to_string(),
2250 display_name: name.to_string(),
2251 description: String::new(),
2252 input_schema: InputSchema::empty_object(),
2253 }
2254 }
2255
2256 fn base_session_config() -> SessionConfig {
2257 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2258 config.primary_agent_id = Some("normal".to_string());
2259 config.policy_overrides = SessionPolicyOverrides::empty();
2260 resolve_effective_config(&config)
2261 }
2262
2263 fn reduce(state: &mut AppState, action: Action) -> Vec<Effect> {
2264 super::reduce(state, action).expect("reduce failed")
2265 }
2266
2267 #[test]
2268 fn test_user_input_starts_operation() {
2269 let mut state = test_state();
2270 let session_id = state.session_id;
2271 let op_id = OpId::new();
2272 let message_id = MessageId::new();
2273 let model = builtin::claude_sonnet_4_5();
2274
2275 let effects = reduce(
2276 &mut state,
2277 Action::UserInput {
2278 session_id,
2279 content: vec![UserContent::Text {
2280 text: "Hello".to_string(),
2281 }],
2282 op_id,
2283 message_id,
2284 model,
2285 timestamp: 1_234_567_890,
2286 },
2287 );
2288
2289 assert_eq!(state.message_graph.messages.len(), 1);
2290 assert!(state.current_operation.is_some());
2291 assert!(
2292 effects
2293 .iter()
2294 .any(|e| matches!(e, Effect::CallModel { .. }))
2295 );
2296 }
2297
2298 #[test]
2299 fn test_switch_primary_agent_updates_visibility() {
2300 let mut state = test_state();
2301 let session_id = state.session_id;
2302 let config = base_session_config();
2303 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2304
2305 let effects = reduce(
2306 &mut state,
2307 Action::SwitchPrimaryAgent {
2308 session_id,
2309 agent_id: "plan".to_string(),
2310 },
2311 );
2312
2313 let updated = state.session_config.as_ref().expect("config");
2314 match &updated.tool_config.visibility {
2315 ToolVisibility::Whitelist(allowed) => {
2316 assert!(allowed.contains(DISPATCH_AGENT_TOOL_NAME));
2317 for name in READ_ONLY_TOOL_NAMES {
2318 assert!(allowed.contains(*name));
2319 }
2320 assert_eq!(allowed.len(), READ_ONLY_TOOL_NAMES.len() + 1);
2321 }
2322 other => panic!("Unexpected tool visibility: {other:?}"),
2323 }
2324 assert_eq!(state.primary_agent_id.as_deref(), Some("plan"));
2325 assert!(effects.iter().any(|e| matches!(
2326 e,
2327 Effect::EmitEvent {
2328 event: SessionEvent::SessionConfigUpdated { .. },
2329 ..
2330 }
2331 )));
2332 assert!(
2333 effects
2334 .iter()
2335 .any(|e| matches!(e, Effect::ReloadToolSchemas { .. }))
2336 );
2337 }
2338
2339 #[test]
2340 fn test_switch_primary_agent_yolo_auto_approves() {
2341 let mut state = test_state();
2342 let session_id = state.session_id;
2343 let config = base_session_config();
2344 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2345
2346 let _ = reduce(
2347 &mut state,
2348 Action::SwitchPrimaryAgent {
2349 session_id,
2350 agent_id: "yolo".to_string(),
2351 },
2352 );
2353
2354 let updated = state.session_config.as_ref().expect("config");
2355 assert_eq!(
2356 updated.tool_config.approval_policy.default_behavior,
2357 UnapprovedBehavior::Allow
2358 );
2359 }
2360
2361 #[test]
2362 fn test_switch_primary_agent_preserves_policy_overrides() {
2363 let mut state = test_state();
2364 let session_id = state.session_id;
2365
2366 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2367 config.primary_agent_id = Some("normal".to_string());
2368 config.policy_overrides = SessionPolicyOverrides {
2369 default_model: None,
2370 tool_visibility: None,
2371 approval_policy: ToolApprovalPolicyOverrides {
2372 default_behavior: Some(UnapprovedBehavior::Deny),
2373 preapproved: ApprovalRulesOverrides::empty(),
2374 },
2375 };
2376 let config = resolve_effective_config(&config);
2377 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2378
2379 let _ = reduce(
2380 &mut state,
2381 Action::SwitchPrimaryAgent {
2382 session_id,
2383 agent_id: "yolo".to_string(),
2384 },
2385 );
2386
2387 let updated = state.session_config.as_ref().expect("config");
2388 assert_eq!(
2389 updated.tool_config.approval_policy.default_behavior,
2390 UnapprovedBehavior::Deny
2391 );
2392 assert_eq!(
2393 updated.policy_overrides.approval_policy.default_behavior,
2394 Some(UnapprovedBehavior::Deny)
2395 );
2396 }
2397
2398 #[test]
2399 fn dispatch_agent_resume_is_auto_approved() {
2400 let mut state = test_state();
2401 let session_id = state.session_id;
2402 let config = base_session_config();
2403 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2404
2405 let tool_call = ToolCall {
2406 id: "tc_dispatch_resume".to_string(),
2407 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
2408 parameters: json!({
2409 "prompt": "resume work",
2410 "target": {
2411 "session": "resume",
2412 "session_id": SessionId::new().to_string()
2413 }
2414 }),
2415 };
2416
2417 let decision = get_tool_decision(&state, &tool_call);
2418 assert_eq!(decision, ToolDecision::Allow);
2419 assert_eq!(state.session_id, session_id);
2420 }
2421
2422 #[test]
2423 fn test_switch_primary_agent_restores_base_prompt() {
2424 let mut state = test_state();
2425 let session_id = state.session_id;
2426 let mut config = base_session_config();
2427 config.system_prompt = Some("base prompt".to_string());
2428 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2429
2430 let _ = reduce(
2431 &mut state,
2432 Action::SwitchPrimaryAgent {
2433 session_id,
2434 agent_id: "plan".to_string(),
2435 },
2436 );
2437
2438 let _ = reduce(
2439 &mut state,
2440 Action::SwitchPrimaryAgent {
2441 session_id,
2442 agent_id: "normal".to_string(),
2443 },
2444 );
2445
2446 let updated = state.session_config.as_ref().expect("config");
2447 assert_eq!(updated.system_prompt, Some("base prompt".to_string()));
2448 }
2449
2450 #[test]
2451 fn test_switch_primary_agent_blocked_during_operation() {
2452 let mut state = test_state();
2453 let session_id = state.session_id;
2454 let config = base_session_config();
2455 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
2456
2457 state.current_operation = Some(OperationState {
2458 op_id: OpId::new(),
2459 kind: OperationKind::AgentLoop,
2460 pending_tool_calls: HashSet::new(),
2461 });
2462
2463 let result = super::reduce(
2464 &mut state,
2465 Action::SwitchPrimaryAgent {
2466 session_id,
2467 agent_id: "plan".to_string(),
2468 },
2469 );
2470
2471 assert!(matches!(
2472 result,
2473 Err(ReduceError::InvalidAction {
2474 kind: InvalidActionKind::OperationInFlight,
2475 ..
2476 })
2477 ));
2478 assert!(state.primary_agent_id.as_deref() == Some("normal"));
2479 }
2480
2481 #[test]
2482 fn test_late_result_ignored_after_cancel() {
2483 let mut state = test_state();
2484 let session_id = state.session_id;
2485 let op_id = OpId::new();
2486 let tool_call_id = ToolCallId::from_string("tc_1");
2487
2488 state.current_operation = Some(OperationState {
2489 op_id,
2490 kind: OperationKind::AgentLoop,
2491 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
2492 });
2493
2494 let _ = reduce(
2495 &mut state,
2496 Action::Cancel {
2497 session_id,
2498 op_id: None,
2499 },
2500 );
2501
2502 state.current_operation = Some(OperationState {
2503 op_id,
2504 kind: OperationKind::AgentLoop,
2505 pending_tool_calls: HashSet::new(),
2506 });
2507 state
2508 .operation_models
2509 .insert(op_id, builtin::claude_sonnet_4_5());
2510 state
2511 .operation_models
2512 .insert(op_id, builtin::claude_sonnet_4_5());
2513 state
2514 .operation_models
2515 .insert(op_id, builtin::claude_sonnet_4_5());
2516
2517 let effects = reduce(
2518 &mut state,
2519 Action::ToolResult {
2520 session_id,
2521 tool_call_id,
2522 tool_name: "test".to_string(),
2523 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
2524 tool_name: "test".to_string(),
2525 payload: "done".to_string(),
2526 })),
2527 },
2528 );
2529
2530 assert!(effects.is_empty());
2531 }
2532
2533 #[test]
2534 fn test_pre_approved_tool_executes_immediately() {
2535 let mut state = test_state();
2536 let session_id = state.session_id;
2537 let op_id = OpId::new();
2538
2539 state.approved_tools.insert("test_tool".to_string());
2540 state.current_operation = Some(OperationState {
2541 op_id,
2542 kind: OperationKind::AgentLoop,
2543 pending_tool_calls: HashSet::new(),
2544 });
2545 state
2546 .operation_models
2547 .insert(op_id, builtin::claude_sonnet_4_5());
2548 state
2549 .operation_models
2550 .insert(op_id, builtin::claude_sonnet_4_5());
2551 state
2552 .operation_models
2553 .insert(op_id, builtin::claude_sonnet_4_5());
2554
2555 let tool_call = steer_tools::ToolCall {
2556 id: "tc_1".to_string(),
2557 name: "test_tool".to_string(),
2558 parameters: serde_json::json!({}),
2559 };
2560
2561 let effects = reduce(
2562 &mut state,
2563 Action::ToolApprovalRequested {
2564 session_id,
2565 request_id: RequestId::new(),
2566 tool_call,
2567 },
2568 );
2569
2570 assert!(
2571 effects
2572 .iter()
2573 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2574 );
2575 assert!(state.pending_approval.is_none());
2576 }
2577
2578 #[test]
2579 fn test_denied_tool_request_emits_failure_message() {
2580 let mut state = test_state();
2581 let session_id = state.session_id;
2582 let op_id = OpId::new();
2583
2584 state.current_operation = Some(OperationState {
2585 op_id,
2586 kind: OperationKind::AgentLoop,
2587 pending_tool_calls: HashSet::new(),
2588 });
2589
2590 state
2591 .operation_models
2592 .insert(op_id, builtin::claude_sonnet_4_5());
2593
2594 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
2595 config.tool_config.approval_policy = ToolApprovalPolicy {
2596 default_behavior: UnapprovedBehavior::Deny,
2597 preapproved: ApprovalRules::default(),
2598 };
2599 state.session_config = Some(config);
2600
2601 let tool_call = steer_tools::ToolCall {
2602 id: "tc_1".to_string(),
2603 name: "test_tool".to_string(),
2604 parameters: serde_json::json!({}),
2605 };
2606
2607 let effects = reduce(
2608 &mut state,
2609 Action::ToolApprovalRequested {
2610 session_id,
2611 request_id: RequestId::new(),
2612 tool_call,
2613 },
2614 );
2615
2616 assert!(effects.iter().any(|e| matches!(
2617 e,
2618 Effect::EmitEvent {
2619 event: SessionEvent::ToolCallFailed { .. },
2620 ..
2621 }
2622 )));
2623 assert!(effects.iter().any(|e| matches!(
2624 e,
2625 Effect::EmitEvent {
2626 event: SessionEvent::ToolMessageAdded { .. },
2627 ..
2628 }
2629 )));
2630 assert!(
2631 !effects
2632 .iter()
2633 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2634 );
2635 assert!(
2636 !effects
2637 .iter()
2638 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2639 );
2640 assert!(state.pending_approval.is_none());
2641 assert!(state.approval_queue.is_empty());
2642 assert_eq!(state.message_graph.messages.len(), 1);
2643
2644 match &state.message_graph.messages[0].data {
2645 MessageData::Tool { result, .. } => match result {
2646 ToolResult::Error(error) => {
2647 assert!(
2648 matches!(error, ToolError::DeniedByPolicy(name) if name == "test_tool")
2649 );
2650 }
2651 _ => panic!("expected denied tool error"),
2652 },
2653 _ => panic!("expected tool message"),
2654 }
2655 }
2656
2657 #[test]
2658 fn test_user_denied_tool_request_emits_failure_message() {
2659 let mut state = test_state();
2660 let session_id = state.session_id;
2661 let op_id = OpId::new();
2662
2663 state.current_operation = Some(OperationState {
2664 op_id,
2665 kind: OperationKind::AgentLoop,
2666 pending_tool_calls: HashSet::new(),
2667 });
2668 state
2669 .operation_models
2670 .insert(op_id, builtin::claude_sonnet_4_5());
2671
2672 let tool_call = steer_tools::ToolCall {
2673 id: "tc_1".to_string(),
2674 name: "test_tool".to_string(),
2675 parameters: serde_json::json!({}),
2676 };
2677 let request_id = RequestId::new();
2678 state.pending_approval = Some(PendingApproval {
2679 request_id,
2680 tool_call: tool_call.clone(),
2681 });
2682
2683 let effects = reduce(
2684 &mut state,
2685 Action::ToolApprovalDecided {
2686 session_id,
2687 request_id,
2688 decision: ApprovalDecision::Denied,
2689 remember: None,
2690 },
2691 );
2692
2693 assert!(effects.iter().any(|e| matches!(
2694 e,
2695 Effect::EmitEvent {
2696 event: SessionEvent::ToolCallFailed { .. },
2697 ..
2698 }
2699 )));
2700 assert!(effects.iter().any(|e| matches!(
2701 e,
2702 Effect::EmitEvent {
2703 event: SessionEvent::ToolMessageAdded { .. },
2704 ..
2705 }
2706 )));
2707 assert!(
2708 !effects
2709 .iter()
2710 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2711 );
2712 assert!(state.pending_approval.is_none());
2713 assert!(state.approval_queue.is_empty());
2714 assert_eq!(state.message_graph.messages.len(), 1);
2715
2716 match &state.message_graph.messages[0].data {
2717 MessageData::Tool { result, .. } => match result {
2718 ToolResult::Error(error) => {
2719 assert!(matches!(error, ToolError::DeniedByUser(name) if name == "test_tool"));
2720 }
2721 _ => panic!("expected denied tool error"),
2722 },
2723 _ => panic!("expected tool message"),
2724 }
2725 }
2726
2727 #[test]
2728 fn test_cancel_pops_queued_item_without_auto_start() {
2729 let mut state = test_state();
2730 let session_id = state.session_id;
2731 let op_id = OpId::new();
2732
2733 state.current_operation = Some(OperationState {
2734 op_id,
2735 kind: OperationKind::AgentLoop,
2736 pending_tool_calls: HashSet::new(),
2737 });
2738 state
2739 .operation_models
2740 .insert(op_id, builtin::claude_sonnet_4_5());
2741
2742 let queued_op = OpId::new();
2743 let queued_message_id = MessageId::from_string("queued_msg");
2744 let _ = reduce(
2745 &mut state,
2746 Action::UserInput {
2747 session_id,
2748 content: vec![UserContent::Text {
2749 text: "Queued message".to_string(),
2750 }],
2751 op_id: queued_op,
2752 message_id: queued_message_id.clone(),
2753 model: builtin::claude_sonnet_4_5(),
2754 timestamp: 1,
2755 },
2756 );
2757
2758 let effects = reduce(
2759 &mut state,
2760 Action::Cancel {
2761 session_id,
2762 op_id: None,
2763 },
2764 );
2765
2766 assert!(state.current_operation.is_none());
2767 assert!(state.queued_work.is_empty());
2768
2769 let cancellation_info = effects.iter().find_map(|effect| match effect {
2770 Effect::EmitEvent {
2771 event: SessionEvent::OperationCancelled { info, .. },
2772 ..
2773 } => Some(info),
2774 _ => None,
2775 });
2776 let info = cancellation_info.expect("expected OperationCancelled event");
2777 let popped = info
2778 .popped_queued_item
2779 .as_ref()
2780 .expect("expected popped queued item");
2781 assert_eq!(popped.content, "Queued message");
2782 assert_eq!(popped.op_id, queued_op);
2783 assert_eq!(popped.message_id, queued_message_id);
2784
2785 assert!(
2786 !effects.iter().any(|effect| matches!(
2787 effect,
2788 Effect::EmitEvent {
2789 event: SessionEvent::OperationStarted { .. },
2790 ..
2791 }
2792 )),
2793 "queued work should not auto-start on cancel"
2794 );
2795 }
2796
2797 #[test]
2798 fn test_cancel_injects_tool_results_for_pending_calls() {
2799 let mut state = test_state();
2800 let session_id = state.session_id;
2801 let op_id = OpId::new();
2802
2803 let tool_call = ToolCall {
2804 id: "tc_1".to_string(),
2805 name: "test_tool".to_string(),
2806 parameters: serde_json::json!({}),
2807 };
2808
2809 state.message_graph.add_message(Message {
2810 data: MessageData::Assistant {
2811 content: vec![AssistantContent::ToolCall {
2812 tool_call: tool_call.clone(),
2813 thought_signature: None,
2814 }],
2815 },
2816 timestamp: 0,
2817 id: "msg_1".to_string(),
2818 parent_message_id: None,
2819 });
2820
2821 state.current_operation = Some(OperationState {
2822 op_id,
2823 kind: OperationKind::AgentLoop,
2824 pending_tool_calls: [ToolCallId::from_string("tc_1")].into_iter().collect(),
2825 });
2826 state
2827 .operation_models
2828 .insert(op_id, builtin::claude_sonnet_4_5());
2829
2830 let effects = reduce(
2831 &mut state,
2832 Action::Cancel {
2833 session_id,
2834 op_id: None,
2835 },
2836 );
2837
2838 assert!(effects.iter().any(|e| matches!(
2839 e,
2840 Effect::EmitEvent {
2841 event: SessionEvent::ToolMessageAdded { .. },
2842 ..
2843 }
2844 )));
2845
2846 let tool_message = state
2847 .message_graph
2848 .messages
2849 .iter()
2850 .find(|message| matches!(message.data, MessageData::Tool { .. }))
2851 .expect("tool result should be injected on cancel");
2852
2853 match &tool_message.data {
2854 MessageData::Tool { result, .. } => match result {
2855 ToolResult::Error(error) => {
2856 assert!(matches!(error, ToolError::Cancelled(name) if name == "test_tool"));
2857 }
2858 _ => panic!("expected cancelled tool error"),
2859 },
2860 _ => panic!("expected tool message"),
2861 }
2862 }
2863
2864 #[test]
2865 fn test_malformed_tool_call_auto_denies() {
2866 let mut state = test_state();
2867 let session_id = state.session_id;
2868 let op_id = OpId::new();
2869
2870 state.current_operation = Some(OperationState {
2871 op_id,
2872 kind: OperationKind::AgentLoop,
2873 pending_tool_calls: HashSet::new(),
2874 });
2875
2876 state
2877 .operation_models
2878 .insert(op_id, builtin::claude_sonnet_4_5());
2879
2880 let mut properties = serde_json::Map::new();
2881 properties.insert("command".to_string(), json!({ "type": "string" }));
2882
2883 state.tools.push(ToolSchema {
2884 name: "test_tool".to_string(),
2885 display_name: "test_tool".to_string(),
2886 description: String::new(),
2887 input_schema: InputSchema::object(properties, vec!["command".to_string()]),
2888 });
2889
2890 let tool_call = ToolCall {
2891 id: "tc_1".to_string(),
2892 name: "test_tool".to_string(),
2893 parameters: json!({}),
2894 };
2895
2896 let effects = reduce(
2897 &mut state,
2898 Action::ToolApprovalRequested {
2899 session_id,
2900 request_id: RequestId::new(),
2901 tool_call,
2902 },
2903 );
2904
2905 assert!(effects.iter().any(|e| matches!(
2906 e,
2907 Effect::EmitEvent {
2908 event: SessionEvent::ToolCallFailed { .. },
2909 ..
2910 }
2911 )));
2912 assert!(effects.iter().any(|e| matches!(
2913 e,
2914 Effect::EmitEvent {
2915 event: SessionEvent::ToolMessageAdded { .. },
2916 ..
2917 }
2918 )));
2919 assert!(
2920 !effects
2921 .iter()
2922 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
2923 );
2924 assert!(
2925 !effects
2926 .iter()
2927 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
2928 );
2929 assert!(state.pending_approval.is_none());
2930 assert!(state.approval_queue.is_empty());
2931 assert_eq!(state.message_graph.messages.len(), 1);
2932
2933 match &state.message_graph.messages[0].data {
2934 MessageData::Tool { result, .. } => match result {
2935 ToolResult::Error(error) => {
2936 assert!(matches!(error, ToolError::InvalidParams { .. }));
2937 }
2938 _ => panic!("expected invalid params tool error"),
2939 },
2940 _ => panic!("expected tool message"),
2941 }
2942 }
2943
2944 #[test]
2945 fn test_approval_queuing() {
2946 let mut state = test_state();
2947 let session_id = state.session_id;
2948 let op_id = OpId::new();
2949
2950 state.current_operation = Some(OperationState {
2951 op_id,
2952 kind: OperationKind::AgentLoop,
2953 pending_tool_calls: HashSet::new(),
2954 });
2955
2956 let tool_call_1 = steer_tools::ToolCall {
2957 id: "tc_1".to_string(),
2958 name: "tool_1".to_string(),
2959 parameters: serde_json::json!({}),
2960 };
2961 let tool_call_2 = steer_tools::ToolCall {
2962 id: "tc_2".to_string(),
2963 name: "tool_2".to_string(),
2964 parameters: serde_json::json!({}),
2965 };
2966
2967 let _ = reduce(
2968 &mut state,
2969 Action::ToolApprovalRequested {
2970 session_id,
2971 request_id: RequestId::new(),
2972 tool_call: tool_call_1,
2973 },
2974 );
2975
2976 assert!(state.pending_approval.is_some());
2977
2978 let _ = reduce(
2979 &mut state,
2980 Action::ToolApprovalRequested {
2981 session_id,
2982 request_id: RequestId::new(),
2983 tool_call: tool_call_2,
2984 },
2985 );
2986
2987 assert_eq!(state.approval_queue.len(), 1);
2988 }
2989
2990 #[test]
2991 fn test_dispatch_agent_missing_target_auto_denies() {
2992 let mut state = test_state();
2993 let session_id = state.session_id;
2994 let op_id = OpId::new();
2995
2996 state.current_operation = Some(OperationState {
2997 op_id,
2998 kind: OperationKind::AgentLoop,
2999 pending_tool_calls: HashSet::new(),
3000 });
3001
3002 state
3003 .operation_models
3004 .insert(op_id, builtin::claude_sonnet_4_5());
3005
3006 let input_schema: InputSchema =
3007 schema_for!(steer_tools::tools::dispatch_agent::DispatchAgentParams).into();
3008 state.tools.push(ToolSchema {
3009 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3010 display_name: "Dispatch Agent".to_string(),
3011 description: String::new(),
3012 input_schema,
3013 });
3014
3015 let tool_call = ToolCall {
3016 id: "tc_dispatch".to_string(),
3017 name: DISPATCH_AGENT_TOOL_NAME.to_string(),
3018 parameters: json!({ "prompt": "hello world" }),
3019 };
3020
3021 let effects = reduce(
3022 &mut state,
3023 Action::ToolApprovalRequested {
3024 session_id,
3025 request_id: RequestId::new(),
3026 tool_call,
3027 },
3028 );
3029
3030 assert!(effects.iter().any(|e| matches!(
3031 e,
3032 Effect::EmitEvent {
3033 event: SessionEvent::ToolCallFailed { .. },
3034 ..
3035 }
3036 )));
3037 assert!(effects.iter().any(|e| matches!(
3038 e,
3039 Effect::EmitEvent {
3040 event: SessionEvent::ToolMessageAdded { .. },
3041 ..
3042 }
3043 )));
3044 assert!(
3045 !effects
3046 .iter()
3047 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3048 );
3049 assert!(state.pending_approval.is_none());
3050 assert!(state.approval_queue.is_empty());
3051
3052 match &state.message_graph.messages[0].data {
3053 MessageData::Tool { result, .. } => match result {
3054 ToolResult::Error(error) => {
3055 assert!(matches!(error, ToolError::InvalidParams { .. }));
3056 }
3057 _ => panic!("expected invalid params tool error"),
3058 },
3059 _ => panic!("expected tool message"),
3060 }
3061 }
3062
3063 #[test]
3064 fn test_model_response_with_tool_calls_requests_approval() {
3065 let mut state = test_state();
3066 let session_id = state.session_id;
3067 let op_id = OpId::new();
3068 let message_id = MessageId::new();
3069
3070 state.current_operation = Some(OperationState {
3071 op_id,
3072 kind: OperationKind::AgentLoop,
3073 pending_tool_calls: HashSet::new(),
3074 });
3075 state
3076 .operation_models
3077 .insert(op_id, builtin::claude_sonnet_4_5());
3078
3079 let tool_call = steer_tools::ToolCall {
3080 id: "tc_1".to_string(),
3081 name: "bash".to_string(),
3082 parameters: serde_json::json!({"command": "ls"}),
3083 };
3084
3085 let content = vec![
3086 AssistantContent::Text {
3087 text: "Let me list the files.".to_string(),
3088 },
3089 AssistantContent::ToolCall {
3090 tool_call: tool_call.clone(),
3091 thought_signature: None,
3092 },
3093 ];
3094
3095 let effects = reduce(
3096 &mut state,
3097 Action::ModelResponseComplete {
3098 session_id,
3099 op_id,
3100 message_id,
3101 content,
3102 usage: None,
3103 context_window_tokens: None,
3104 timestamp: 12345,
3105 },
3106 );
3107
3108 assert!(state.pending_approval.is_some());
3109 assert!(
3110 effects
3111 .iter()
3112 .any(|e| matches!(e, Effect::RequestUserApproval { .. }))
3113 );
3114 assert!(state.current_operation.is_some());
3115 }
3116
3117 #[test]
3118 fn test_model_response_no_tools_completes_operation() {
3119 let mut state = test_state();
3120 let session_id = state.session_id;
3121 let op_id = OpId::new();
3122 let message_id = MessageId::new();
3123
3124 state.current_operation = Some(OperationState {
3125 op_id,
3126 kind: OperationKind::AgentLoop,
3127 pending_tool_calls: HashSet::new(),
3128 });
3129 state
3130 .operation_models
3131 .insert(op_id, builtin::claude_sonnet_4_5());
3132
3133 let content = vec![AssistantContent::Text {
3134 text: "Hello! How can I help?".to_string(),
3135 }];
3136
3137 let effects = reduce(
3138 &mut state,
3139 Action::ModelResponseComplete {
3140 session_id,
3141 op_id,
3142 message_id,
3143 content,
3144 usage: None,
3145 context_window_tokens: None,
3146 timestamp: 12345,
3147 },
3148 );
3149
3150 assert!(state.current_operation.is_none());
3151 assert!(effects.iter().any(|e| matches!(
3152 e,
3153 Effect::EmitEvent {
3154 event: SessionEvent::OperationCompleted { .. },
3155 ..
3156 }
3157 )));
3158 assert!(!effects.iter().any(|e| matches!(
3159 e,
3160 Effect::EmitEvent {
3161 event: SessionEvent::LlmUsageUpdated { .. },
3162 ..
3163 }
3164 )));
3165 }
3166
3167 #[test]
3168 fn test_model_response_with_usage_emits_usage_event_and_updates_state() {
3169 let mut state = test_state();
3170 let session_id = state.session_id;
3171 let op_id = OpId::new();
3172 let message_id = MessageId::new();
3173 let model = builtin::claude_sonnet_4_5();
3174 let usage = TokenUsage::new(10, 20, 30);
3175
3176 state.current_operation = Some(OperationState {
3177 op_id,
3178 kind: OperationKind::AgentLoop,
3179 pending_tool_calls: HashSet::new(),
3180 });
3181 state.operation_models.insert(op_id, model.clone());
3182
3183 let effects = reduce(
3184 &mut state,
3185 Action::ModelResponseComplete {
3186 session_id,
3187 op_id,
3188 message_id,
3189 content: vec![AssistantContent::Text {
3190 text: "Done".to_string(),
3191 }],
3192 usage: Some(usage),
3193 context_window_tokens: None,
3194 timestamp: 12345,
3195 },
3196 );
3197
3198 assert!(effects.iter().any(|e| matches!(
3199 e,
3200 Effect::EmitEvent {
3201 event: SessionEvent::AssistantMessageAdded { .. },
3202 ..
3203 }
3204 )));
3205 assert!(effects.iter().any(|e| matches!(
3206 e,
3207 Effect::EmitEvent {
3208 event: SessionEvent::LlmUsageUpdated {
3209 op_id: usage_op_id,
3210 model: usage_model,
3211 usage: usage_payload,
3212 context_window,
3213 },
3214 ..
3215 } if *usage_op_id == op_id && usage_model == &model && *usage_payload == usage && context_window.is_none()
3216 )));
3217
3218 let snapshot = state
3219 .llm_usage_by_op
3220 .get(&op_id)
3221 .expect("usage snapshot should be recorded");
3222 assert_eq!(snapshot.model, model);
3223 assert_eq!(snapshot.usage, usage);
3224 assert!(snapshot.context_window.is_none());
3225 assert_eq!(state.llm_usage_totals, usage);
3226 }
3227
3228 #[test]
3229 fn test_model_response_with_usage_and_context_window_emits_utilization() {
3230 let mut state = test_state();
3231 let session_id = state.session_id;
3232 let op_id = OpId::new();
3233 let message_id = MessageId::new();
3234 let model = builtin::claude_sonnet_4_5();
3235 let usage = TokenUsage::new(10, 20, 30);
3236 let context_window_tokens = Some(100u32);
3237
3238 state.current_operation = Some(OperationState {
3239 op_id,
3240 kind: OperationKind::AgentLoop,
3241 pending_tool_calls: HashSet::new(),
3242 });
3243 state.operation_models.insert(op_id, model.clone());
3244
3245 let effects = reduce(
3246 &mut state,
3247 Action::ModelResponseComplete {
3248 session_id,
3249 op_id,
3250 message_id,
3251 content: vec![AssistantContent::Text {
3252 text: "Done".to_string(),
3253 }],
3254 usage: Some(usage),
3255 context_window_tokens,
3256 timestamp: 12345,
3257 },
3258 );
3259
3260 assert!(effects.iter().any(|e| matches!(
3261 e,
3262 Effect::EmitEvent {
3263 event: SessionEvent::LlmUsageUpdated {
3264 op_id: usage_op_id,
3265 model: usage_model,
3266 usage: usage_payload,
3267 context_window,
3268 },
3269 ..
3270 } if *usage_op_id == op_id
3271 && usage_model == &model
3272 && *usage_payload == usage
3273 && matches!(context_window, Some(ContextWindowUsage {
3274 max_context_tokens: Some(100),
3275 remaining_tokens: Some(70),
3276 utilization_ratio: Some(ratio),
3277 estimated: false,
3278 }) if (*ratio - 0.3).abs() < 1e-9)
3279 )));
3280
3281 let snapshot = state
3282 .llm_usage_by_op
3283 .get(&op_id)
3284 .expect("usage snapshot should be recorded");
3285 assert_eq!(snapshot.model, model);
3286 assert_eq!(snapshot.usage, usage);
3287 assert_eq!(
3288 snapshot.context_window,
3289 Some(ContextWindowUsage {
3290 max_context_tokens: Some(100),
3291 remaining_tokens: Some(70),
3292 utilization_ratio: Some(0.3),
3293 estimated: false,
3294 })
3295 );
3296 assert_eq!(state.llm_usage_totals, usage);
3297 }
3298
3299 #[test]
3300 fn test_model_response_with_zero_context_window_marks_estimated() {
3301 let mut state = test_state();
3302 let session_id = state.session_id;
3303 let op_id = OpId::new();
3304 let message_id = MessageId::new();
3305 let model = builtin::claude_sonnet_4_5();
3306 let usage = TokenUsage::new(10, 20, 30);
3307
3308 state.current_operation = Some(OperationState {
3309 op_id,
3310 kind: OperationKind::AgentLoop,
3311 pending_tool_calls: HashSet::new(),
3312 });
3313 state.operation_models.insert(op_id, model);
3314
3315 let effects = reduce(
3316 &mut state,
3317 Action::ModelResponseComplete {
3318 session_id,
3319 op_id,
3320 message_id,
3321 content: vec![AssistantContent::Text {
3322 text: "Done".to_string(),
3323 }],
3324 usage: Some(usage),
3325 context_window_tokens: Some(0),
3326 timestamp: 12345,
3327 },
3328 );
3329
3330 assert!(effects.iter().any(|e| matches!(
3331 e,
3332 Effect::EmitEvent {
3333 event: SessionEvent::LlmUsageUpdated {
3334 context_window,
3335 ..
3336 },
3337 ..
3338 } if matches!(context_window, Some(ContextWindowUsage {
3339 max_context_tokens: Some(0),
3340 remaining_tokens: Some(0),
3341 utilization_ratio: None,
3342 estimated: true,
3343 }))
3344 )));
3345 }
3346
3347 #[test]
3348 fn test_apply_usage_event_updates_replay_state_and_totals() {
3349 let mut state = test_state();
3350 let op_a = OpId::new();
3351 let op_b = OpId::new();
3352 let model = builtin::claude_sonnet_4_5();
3353
3354 apply_event_to_state(
3355 &mut state,
3356 &SessionEvent::LlmUsageUpdated {
3357 op_id: op_a,
3358 model: model.clone(),
3359 usage: TokenUsage::new(3, 5, 8),
3360 context_window: None,
3361 },
3362 );
3363
3364 assert_eq!(state.llm_usage_totals, TokenUsage::new(3, 5, 8));
3365
3366 apply_event_to_state(
3367 &mut state,
3368 &SessionEvent::LlmUsageUpdated {
3369 op_id: op_a,
3370 model: model.clone(),
3371 usage: TokenUsage::new(7, 11, 18),
3372 context_window: None,
3373 },
3374 );
3375
3376 assert_eq!(state.llm_usage_totals, TokenUsage::new(7, 11, 18));
3377
3378 apply_event_to_state(
3379 &mut state,
3380 &SessionEvent::LlmUsageUpdated {
3381 op_id: op_b,
3382 model,
3383 usage: TokenUsage::new(2, 4, 6),
3384 context_window: None,
3385 },
3386 );
3387
3388 assert_eq!(state.llm_usage_totals, TokenUsage::new(9, 15, 24));
3389 }
3390
3391 #[test]
3392 fn test_out_of_order_completion_preserves_newer_operation() {
3393 let mut state = test_state();
3394 let session_id = state.session_id;
3395 let model = builtin::claude_sonnet_4_5();
3396
3397 let op_a = OpId::new();
3398 let op_b = OpId::new();
3399
3400 let _ = reduce(
3401 &mut state,
3402 Action::UserInput {
3403 session_id,
3404 content: vec![UserContent::Text {
3405 text: "first".to_string(),
3406 }],
3407 op_id: op_a,
3408 message_id: MessageId::new(),
3409 model: model.clone(),
3410 timestamp: 1,
3411 },
3412 );
3413
3414 let _ = reduce(
3415 &mut state,
3416 Action::UserInput {
3417 session_id,
3418 content: vec![UserContent::Text {
3419 text: "second".to_string(),
3420 }],
3421 op_id: op_b,
3422 message_id: MessageId::new(),
3423 model: model.clone(),
3424 timestamp: 2,
3425 },
3426 );
3427
3428 let _ = reduce(
3429 &mut state,
3430 Action::ModelResponseComplete {
3431 session_id,
3432 op_id: op_a,
3433 message_id: MessageId::new(),
3434 content: vec![AssistantContent::Text {
3435 text: "done A".to_string(),
3436 }],
3437 usage: None,
3438 context_window_tokens: None,
3439 timestamp: 3,
3440 },
3441 );
3442
3443 assert!(
3444 state
3445 .current_operation
3446 .as_ref()
3447 .is_some_and(|op| op.op_id == op_b)
3448 );
3449 assert!(state.operation_models.contains_key(&op_b));
3450 assert!(!state.operation_models.contains_key(&op_a));
3451
3452 let effects = reduce(
3453 &mut state,
3454 Action::ModelResponseComplete {
3455 session_id,
3456 op_id: op_b,
3457 message_id: MessageId::new(),
3458 content: vec![AssistantContent::Text {
3459 text: "done B".to_string(),
3460 }],
3461 usage: None,
3462 context_window_tokens: None,
3463 timestamp: 4,
3464 },
3465 );
3466
3467 assert!(effects.iter().any(|e| matches!(
3468 e,
3469 Effect::EmitEvent {
3470 event: SessionEvent::OperationCompleted { op_id },
3471 ..
3472 } if *op_id == op_b
3473 )));
3474 assert!(!effects.iter().any(|e| matches!(
3475 e,
3476 Effect::EmitEvent {
3477 event: SessionEvent::Error { message },
3478 ..
3479 } if message.contains("Missing model for operation")
3480 )));
3481 }
3482
3483 #[test]
3484 fn test_tool_approval_does_not_call_model_before_result() {
3485 let mut state = test_state();
3486 let session_id = state.session_id;
3487 let op_id = OpId::new();
3488
3489 state.current_operation = Some(OperationState {
3490 op_id,
3491 kind: OperationKind::AgentLoop,
3492 pending_tool_calls: HashSet::new(),
3493 });
3494 state
3495 .operation_models
3496 .insert(op_id, builtin::claude_sonnet_4_5());
3497
3498 let tool_call = steer_tools::ToolCall {
3499 id: "tc_1".to_string(),
3500 name: "bash".to_string(),
3501 parameters: serde_json::json!({"command": "ls"}),
3502 };
3503 let request_id = RequestId::new();
3504 state.pending_approval = Some(PendingApproval {
3505 request_id,
3506 tool_call: tool_call.clone(),
3507 });
3508
3509 let effects = reduce(
3510 &mut state,
3511 Action::ToolApprovalDecided {
3512 session_id,
3513 request_id,
3514 decision: ApprovalDecision::Approved,
3515 remember: None,
3516 },
3517 );
3518
3519 assert!(
3520 effects
3521 .iter()
3522 .any(|e| matches!(e, Effect::ExecuteTool { .. }))
3523 );
3524 assert!(
3525 !effects
3526 .iter()
3527 .any(|e| matches!(e, Effect::CallModel { .. }))
3528 );
3529 assert!(state.current_operation.as_ref().is_some_and(|op| {
3530 op.pending_tool_calls
3531 .contains(&ToolCallId::from_string("tc_1"))
3532 }));
3533 }
3534
3535 #[test]
3536 fn test_mcp_tool_visibility_and_disconnect_removal() {
3537 let mut state = test_state();
3538 let session_id = state.session_id;
3539
3540 let mut allowed = HashSet::new();
3541 allowed.insert("mcp__alpha__allowed".to_string());
3542
3543 let mut config = SessionConfig::read_only(builtin::claude_sonnet_4_5());
3544 config.tool_config.visibility = ToolVisibility::Whitelist(allowed);
3545 state.session_config = Some(config);
3546
3547 state.tools.push(test_schema("bash"));
3548
3549 let _ = reduce(
3550 &mut state,
3551 Action::McpServerStateChanged {
3552 session_id,
3553 server_name: "alpha".to_string(),
3554 state: McpServerState::Connected {
3555 tools: vec![
3556 test_schema("mcp__alpha__allowed"),
3557 test_schema("mcp__alpha__blocked"),
3558 ],
3559 },
3560 },
3561 );
3562
3563 assert!(state.tools.iter().any(|t| t.name == "mcp__alpha__allowed"));
3564 assert!(!state.tools.iter().any(|t| t.name == "mcp__alpha__blocked"));
3565
3566 let _ = reduce(
3567 &mut state,
3568 Action::McpServerStateChanged {
3569 session_id,
3570 server_name: "alpha".to_string(),
3571 state: McpServerState::Disconnected { error: None },
3572 },
3573 );
3574
3575 assert!(
3576 !state
3577 .tools
3578 .iter()
3579 .any(|t| t.name.starts_with("mcp__alpha__"))
3580 );
3581 assert!(state.tools.iter().any(|t| t.name == "bash"));
3582 }
3583
3584 #[test]
3585 fn test_tool_result_continues_agent_loop() {
3586 let mut state = test_state();
3587 let session_id = state.session_id;
3588 let op_id = OpId::new();
3589 let tool_call_id = ToolCallId::from_string("tc_1");
3590
3591 state.current_operation = Some(OperationState {
3592 op_id,
3593 kind: OperationKind::AgentLoop,
3594 pending_tool_calls: [tool_call_id.clone()].into_iter().collect(),
3595 });
3596 state
3597 .operation_models
3598 .insert(op_id, builtin::claude_sonnet_4_5());
3599
3600 let effects = reduce(
3601 &mut state,
3602 Action::ToolResult {
3603 session_id,
3604 tool_call_id,
3605 tool_name: "bash".to_string(),
3606 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3607 tool_name: "bash".to_string(),
3608 payload: "file1.txt\nfile2.txt".to_string(),
3609 })),
3610 },
3611 );
3612
3613 assert!(
3614 effects
3615 .iter()
3616 .any(|e| matches!(e, Effect::CallModel { .. }))
3617 );
3618 }
3619
3620 #[test]
3621 fn test_tool_result_waits_for_pending_tools() {
3622 let mut state = test_state();
3623 let session_id = state.session_id;
3624 let op_id = OpId::new();
3625 let tool_call_id_1 = ToolCallId::from_string("tc_1");
3626 let tool_call_id_2 = ToolCallId::from_string("tc_2");
3627
3628 state.current_operation = Some(OperationState {
3629 op_id,
3630 kind: OperationKind::AgentLoop,
3631 pending_tool_calls: [tool_call_id_1.clone(), tool_call_id_2.clone()]
3632 .into_iter()
3633 .collect(),
3634 });
3635 state
3636 .operation_models
3637 .insert(op_id, builtin::claude_sonnet_4_5());
3638
3639 let effects = reduce(
3640 &mut state,
3641 Action::ToolResult {
3642 session_id,
3643 tool_call_id: tool_call_id_1,
3644 tool_name: "bash".to_string(),
3645 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3646 tool_name: "bash".to_string(),
3647 payload: "done".to_string(),
3648 })),
3649 },
3650 );
3651
3652 assert!(
3653 !effects
3654 .iter()
3655 .any(|e| matches!(e, Effect::CallModel { .. }))
3656 );
3657
3658 let effects = reduce(
3659 &mut state,
3660 Action::ToolResult {
3661 session_id,
3662 tool_call_id: tool_call_id_2,
3663 tool_name: "bash".to_string(),
3664 result: Ok(ToolResult::External(steer_tools::result::ExternalResult {
3665 tool_name: "bash".to_string(),
3666 payload: "done".to_string(),
3667 })),
3668 },
3669 );
3670
3671 assert!(
3672 effects
3673 .iter()
3674 .any(|e| matches!(e, Effect::CallModel { .. }))
3675 );
3676 }
3677
3678 fn setup_auto_compact_state(
3681 enabled: bool,
3682 threshold_percent: u32,
3683 message_count: usize,
3684 ) -> AppState {
3685 use crate::session::state::AutoCompactionConfig;
3686
3687 let mut state = test_state();
3688
3689 let mut config = base_session_config();
3691 config.auto_compaction = AutoCompactionConfig {
3692 enabled,
3693 threshold_percent,
3694 };
3695 apply_session_config_state(&mut state, &config, Some("normal".to_string()), true);
3696
3697 let mut prev_id: Option<String> = None;
3700 for i in 0..message_count {
3701 let id = format!("msg_{}", i);
3702 state.message_graph.add_message(Message {
3703 data: MessageData::User {
3704 content: vec![UserContent::Text {
3705 text: format!("message {}", i),
3706 }],
3707 },
3708 timestamp: i as u64,
3709 id: id.clone(),
3710 parent_message_id: prev_id.clone(),
3711 });
3712 prev_id = Some(id);
3713 }
3714
3715 state
3716 }
3717
3718 #[test]
3719 fn test_maybe_auto_compact_triggers_when_all_guards_pass() {
3720 let mut state = setup_auto_compact_state(true, 90, 4);
3721 let session_id = state.session_id;
3722 let model = builtin::claude_sonnet_4_5();
3723
3724 let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3725 let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3726
3727 assert!(!effects.is_empty(), "expected non-empty effects");
3728 assert!(
3729 effects
3730 .iter()
3731 .any(|e| matches!(e, Effect::RequestCompaction { .. })),
3732 "expected Effect::RequestCompaction"
3733 );
3734 assert!(
3735 effects.iter().any(|e| matches!(
3736 e,
3737 Effect::EmitEvent {
3738 event: SessionEvent::OperationStarted { .. },
3739 ..
3740 }
3741 )),
3742 "expected OperationStarted event"
3743 );
3744 }
3745
3746 #[test]
3747 fn test_maybe_auto_compact_disabled_config() {
3748 let mut state = setup_auto_compact_state(false, 90, 4);
3749 let session_id = state.session_id;
3750 let model = builtin::claude_sonnet_4_5();
3751
3752 let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3753 let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3754
3755 assert!(
3756 effects.is_empty(),
3757 "expected empty effects when auto-compaction is disabled"
3758 );
3759 }
3760
3761 #[test]
3762 fn test_maybe_auto_compact_below_threshold() {
3763 let mut state = setup_auto_compact_state(true, 90, 4);
3764 let session_id = state.session_id;
3765 let model = builtin::claude_sonnet_4_5();
3766
3767 let usage = Some(TokenUsage::new(40_000, 10_000, 50_000));
3769 let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3770
3771 assert!(
3772 effects.is_empty(),
3773 "expected empty effects when utilization is below threshold"
3774 );
3775 }
3776
3777 #[test]
3778 fn test_maybe_auto_compact_insufficient_messages() {
3779 let mut state = setup_auto_compact_state(true, 90, 2);
3781 let session_id = state.session_id;
3782 let model = builtin::claude_sonnet_4_5();
3783
3784 let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3785 let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3786
3787 assert!(
3788 effects.is_empty(),
3789 "expected empty effects with insufficient messages"
3790 );
3791 }
3792
3793 #[test]
3794 fn test_maybe_auto_compact_queued_work_blocks() {
3795 use crate::app::domain::state::QueuedUserMessage;
3796
3797 let mut state = setup_auto_compact_state(true, 90, 4);
3798 let session_id = state.session_id;
3799 let model = builtin::claude_sonnet_4_5();
3800
3801 state.queue_user_message(QueuedUserMessage {
3803 op_id: OpId::new(),
3804 message_id: MessageId::new(),
3805 content: vec![UserContent::Text {
3806 text: "queued msg".to_string(),
3807 }],
3808 model: builtin::claude_sonnet_4_5(),
3809 queued_at: 0,
3810 });
3811
3812 let usage = Some(TokenUsage::new(80_000, 15_000, 95_000));
3813 let effects = maybe_auto_compact(&mut state, session_id, usage, Some(100_000), &model);
3814
3815 assert!(
3816 effects.is_empty(),
3817 "expected empty effects when there is queued work"
3818 );
3819 }
3820
3821 #[test]
3822 fn test_handle_compaction_failed_emits_compact_result() {
3823 use crate::app::domain::event::{CompactResult, CompactTrigger};
3824
3825 let mut state = test_state();
3826 let session_id = state.session_id;
3827 let op_id = OpId::new();
3828
3829 state.start_operation(
3831 op_id,
3832 OperationKind::Compact {
3833 trigger: CompactTrigger::Auto,
3834 },
3835 );
3836 state
3837 .operation_models
3838 .insert(op_id, builtin::claude_sonnet_4_5());
3839
3840 let effects = reduce(
3841 &mut state,
3842 Action::CompactionFailed {
3843 session_id,
3844 op_id,
3845 error: "test error".into(),
3846 },
3847 );
3848
3849 let has_compact_result = effects.iter().any(|e| {
3851 matches!(
3852 e,
3853 Effect::EmitEvent {
3854 event: SessionEvent::CompactResult {
3855 result: CompactResult::Failed(msg),
3856 trigger: CompactTrigger::Auto,
3857 },
3858 ..
3859 } if msg == "test error"
3860 )
3861 });
3862 assert!(
3863 has_compact_result,
3864 "expected CompactResult::Failed event with Auto trigger"
3865 );
3866
3867 let has_error_event = effects.iter().any(|e| {
3869 matches!(
3870 e,
3871 Effect::EmitEvent {
3872 event: SessionEvent::Error { .. },
3873 ..
3874 }
3875 )
3876 });
3877 assert!(
3878 !has_error_event,
3879 "should not emit SessionEvent::Error for compaction failure"
3880 );
3881 }
3882
3883 fn extract_callmodel_messages(effects: &[Effect]) -> Option<&Vec<Message>> {
3885 effects.iter().find_map(|e| match e {
3886 Effect::CallModel { messages, .. } => Some(messages),
3887 _ => None,
3888 })
3889 }
3890
3891 fn messages_contain_text(messages: &[Message], needle: &str) -> bool {
3893 messages.iter().any(|m| match &m.data {
3894 MessageData::User { content } => content.iter().any(|c| match c {
3895 UserContent::Text { text } => text.contains(needle),
3896 _ => false,
3897 }),
3898 MessageData::Assistant { content } => content.iter().any(|c| match c {
3899 AssistantContent::Text { text } => text.contains(needle),
3900 _ => false,
3901 }),
3902 _ => false,
3903 })
3904 }
3905
3906 #[test]
3907 fn test_compaction_full_cycle_callmodel_filters_messages() {
3908 use crate::app::domain::types::CompactionId;
3909
3910 let mut state = test_state();
3911 let session_id = state.session_id;
3912 let model = builtin::claude_sonnet_4_5();
3913
3914 let op_id_1 = OpId::new();
3917 let _ = reduce(
3918 &mut state,
3919 Action::UserInput {
3920 session_id,
3921 content: vec![UserContent::Text {
3922 text: "hello".to_string(),
3923 }],
3924 op_id: op_id_1,
3925 message_id: MessageId::new(),
3926 model: model.clone(),
3927 timestamp: 1,
3928 },
3929 );
3930 let _ = reduce(
3931 &mut state,
3932 Action::ModelResponseComplete {
3933 session_id,
3934 op_id: op_id_1,
3935 message_id: MessageId::new(),
3936 content: vec![AssistantContent::Text {
3937 text: "hi there".to_string(),
3938 }],
3939 usage: None,
3940 context_window_tokens: None,
3941 timestamp: 2,
3942 },
3943 );
3944
3945 let op_id_1b = OpId::new();
3947 let _ = reduce(
3948 &mut state,
3949 Action::UserInput {
3950 session_id,
3951 content: vec![UserContent::Text {
3952 text: "how are you".to_string(),
3953 }],
3954 op_id: op_id_1b,
3955 message_id: MessageId::new(),
3956 model: model.clone(),
3957 timestamp: 3,
3958 },
3959 );
3960 let assistant_msg_id = MessageId::new();
3961 let _ = reduce(
3962 &mut state,
3963 Action::ModelResponseComplete {
3964 session_id,
3965 op_id: op_id_1b,
3966 message_id: assistant_msg_id.clone(),
3967 content: vec![AssistantContent::Text {
3968 text: "doing well".to_string(),
3969 }],
3970 usage: None,
3971 context_window_tokens: None,
3972 timestamp: 4,
3973 },
3974 );
3975 assert!(
3976 state.current_operation.is_none(),
3977 "operation should be complete after text-only response"
3978 );
3979 assert!(state.message_graph.messages.len() >= 3);
3980
3981 let op_id_2 = OpId::new();
3983 let compact_effects = reduce(
3984 &mut state,
3985 Action::RequestCompaction {
3986 session_id,
3987 op_id: op_id_2,
3988 model: model.clone(),
3989 },
3990 );
3991 assert!(
3992 compact_effects
3993 .iter()
3994 .any(|e| matches!(e, Effect::RequestCompaction { .. })),
3995 "expected Effect::RequestCompaction"
3996 );
3997
3998 let compaction_id = CompactionId::new();
4000 let summary_message_id = MessageId::new();
4001 let active_before = state.message_graph.active_message_id.clone();
4002 let _ = reduce(
4003 &mut state,
4004 Action::CompactionComplete {
4005 session_id,
4006 op_id: op_id_2,
4007 compaction_id,
4008 summary_message_id: summary_message_id.clone(),
4009 summary: "Summary of conversation.".to_string(),
4010 compacted_head_message_id: assistant_msg_id,
4011 previous_active_message_id: active_before.map(MessageId::from_string),
4012 model: "claude-sonnet-4-5-20250929".to_string(),
4013 timestamp: 3,
4014 },
4015 );
4016 assert!(
4017 state.compaction_summary_ids.contains(&summary_message_id.0),
4018 "compaction_summary_ids should contain the summary id"
4019 );
4020
4021 let op_id_3 = OpId::new();
4023 let effects = reduce(
4024 &mut state,
4025 Action::UserInput {
4026 session_id,
4027 content: vec![UserContent::Text {
4028 text: "new question".to_string(),
4029 }],
4030 op_id: op_id_3,
4031 message_id: MessageId::new(),
4032 model: model.clone(),
4033 timestamp: 10,
4034 },
4035 );
4036
4037 let messages = extract_callmodel_messages(&effects)
4039 .expect("expected Effect::CallModel from UserInput after compaction");
4040
4041 assert_eq!(
4042 messages.len(),
4043 2,
4044 "CallModel should contain exactly 2 messages (summary + new user msg), got: {:?}",
4045 messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4046 );
4047
4048 assert!(
4050 matches!(&messages[0].data, MessageData::Assistant { content } if content.iter().any(|c| matches!(c, AssistantContent::Text { text } if text == "Summary of conversation."))),
4051 "messages[0] should be the compaction summary"
4052 );
4053
4054 assert!(
4056 matches!(&messages[1].data, MessageData::User { content } if content.iter().any(|c| matches!(c, UserContent::Text { text } if text == "new question"))),
4057 "messages[1] should be the new user message"
4058 );
4059
4060 assert!(
4062 !messages_contain_text(messages, "hello"),
4063 "CallModel messages must not contain pre-compaction user text 'hello'"
4064 );
4065 assert!(
4066 !messages_contain_text(messages, "hi there"),
4067 "CallModel messages must not contain pre-compaction assistant text 'hi there'"
4068 );
4069 assert!(
4070 !messages_contain_text(messages, "how are you"),
4071 "CallModel messages must not contain pre-compaction user text 'how are you'"
4072 );
4073 assert!(
4074 !messages_contain_text(messages, "doing well"),
4075 "CallModel messages must not contain pre-compaction assistant text 'doing well'"
4076 );
4077 }
4078
4079 #[test]
4080 fn test_compaction_queued_message_callmodel_filters_messages() {
4081 use crate::app::domain::types::CompactionId;
4082
4083 let mut state = test_state();
4084 let session_id = state.session_id;
4085 let model = builtin::claude_sonnet_4_5();
4086
4087 let op_id_1 = OpId::new();
4090 let _ = reduce(
4091 &mut state,
4092 Action::UserInput {
4093 session_id,
4094 content: vec![UserContent::Text {
4095 text: "msg A".to_string(),
4096 }],
4097 op_id: op_id_1,
4098 message_id: MessageId::new(),
4099 model: model.clone(),
4100 timestamp: 1,
4101 },
4102 );
4103 let _ = reduce(
4104 &mut state,
4105 Action::ModelResponseComplete {
4106 session_id,
4107 op_id: op_id_1,
4108 message_id: MessageId::new(),
4109 content: vec![AssistantContent::Text {
4110 text: "response A".to_string(),
4111 }],
4112 usage: None,
4113 context_window_tokens: None,
4114 timestamp: 2,
4115 },
4116 );
4117
4118 let op_id_1b = OpId::new();
4120 let _ = reduce(
4121 &mut state,
4122 Action::UserInput {
4123 session_id,
4124 content: vec![UserContent::Text {
4125 text: "msg A2".to_string(),
4126 }],
4127 op_id: op_id_1b,
4128 message_id: MessageId::new(),
4129 model: model.clone(),
4130 timestamp: 3,
4131 },
4132 );
4133 let assistant_msg_id = MessageId::new();
4134 let _ = reduce(
4135 &mut state,
4136 Action::ModelResponseComplete {
4137 session_id,
4138 op_id: op_id_1b,
4139 message_id: assistant_msg_id.clone(),
4140 content: vec![AssistantContent::Text {
4141 text: "response A2".to_string(),
4142 }],
4143 usage: None,
4144 context_window_tokens: None,
4145 timestamp: 4,
4146 },
4147 );
4148 assert!(state.current_operation.is_none());
4149
4150 let op_id_2 = OpId::new();
4152 let _ = reduce(
4153 &mut state,
4154 Action::RequestCompaction {
4155 session_id,
4156 op_id: op_id_2,
4157 model: model.clone(),
4158 },
4159 );
4160 assert!(
4161 state.has_active_operation(),
4162 "Compact operation should be active"
4163 );
4164
4165 let op_id_3 = OpId::new();
4167 let queued_effects = reduce(
4168 &mut state,
4169 Action::UserInput {
4170 session_id,
4171 content: vec![UserContent::Text {
4172 text: "msg B".to_string(),
4173 }],
4174 op_id: op_id_3,
4175 message_id: MessageId::new(),
4176 model: model.clone(),
4177 timestamp: 5,
4178 },
4179 );
4180 assert!(
4181 queued_effects.iter().any(|e| matches!(
4182 e,
4183 Effect::EmitEvent {
4184 event: SessionEvent::QueueUpdated { .. },
4185 ..
4186 }
4187 )),
4188 "queued user message should emit QueueUpdated"
4189 );
4190
4191 let compaction_id = CompactionId::new();
4193 let summary_message_id = MessageId::new();
4194 let active_before = state.message_graph.active_message_id.clone();
4195 let completion_effects = reduce(
4196 &mut state,
4197 Action::CompactionComplete {
4198 session_id,
4199 op_id: op_id_2,
4200 compaction_id,
4201 summary_message_id: summary_message_id.clone(),
4202 summary: "Summary of A.".to_string(),
4203 compacted_head_message_id: assistant_msg_id,
4204 previous_active_message_id: active_before.map(MessageId::from_string),
4205 model: "claude-sonnet-4-5-20250929".to_string(),
4206 timestamp: 6,
4207 },
4208 );
4209
4210 let messages = extract_callmodel_messages(&completion_effects)
4212 .expect("CompactionComplete should dequeue 'msg B' and produce CallModel");
4213
4214 assert_eq!(
4215 messages.len(),
4216 2,
4217 "CallModel should contain exactly 2 messages (summary + 'msg B'), got: {:?}",
4218 messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4219 );
4220
4221 assert!(
4223 messages_contain_text(messages, "Summary of A."),
4224 "CallModel messages should contain the compaction summary"
4225 );
4226
4227 assert!(
4229 messages_contain_text(messages, "msg B"),
4230 "CallModel messages should contain the dequeued 'msg B'"
4231 );
4232
4233 assert!(
4235 !messages_contain_text(messages, "response A"),
4236 "CallModel messages must not contain pre-compaction 'response A'"
4237 );
4238 assert!(
4239 !messages_contain_text(messages, "response A2"),
4240 "CallModel messages must not contain pre-compaction 'response A2'"
4241 );
4242 }
4243
4244 #[test]
4245 fn test_compaction_multi_round_conversation_then_compact() {
4246 use crate::app::domain::types::CompactionId;
4247
4248 let mut state = test_state();
4249 let session_id = state.session_id;
4250 let model = builtin::claude_sonnet_4_5();
4251
4252 let pre_compaction_texts = [
4254 ("alpha user", "alpha assistant"),
4255 ("beta user", "beta assistant"),
4256 ("gamma user", "gamma assistant"),
4257 ];
4258 let mut last_assistant_msg_id = MessageId::new();
4259 for (i, (user_text, assistant_text)) in pre_compaction_texts.iter().enumerate() {
4260 let op = OpId::new();
4261 let _ = reduce(
4262 &mut state,
4263 Action::UserInput {
4264 session_id,
4265 content: vec![UserContent::Text {
4266 text: (*user_text).to_string(),
4267 }],
4268 op_id: op,
4269 message_id: MessageId::new(),
4270 model: model.clone(),
4271 timestamp: (i * 2 + 1) as u64,
4272 },
4273 );
4274 last_assistant_msg_id = MessageId::new();
4275 let _ = reduce(
4276 &mut state,
4277 Action::ModelResponseComplete {
4278 session_id,
4279 op_id: op,
4280 message_id: last_assistant_msg_id.clone(),
4281 content: vec![AssistantContent::Text {
4282 text: assistant_text.to_string(),
4283 }],
4284 usage: None,
4285 context_window_tokens: None,
4286 timestamp: (i * 2 + 2) as u64,
4287 },
4288 );
4289 }
4290 assert_eq!(state.message_graph.messages.len(), 6);
4291 assert!(state.current_operation.is_none());
4292
4293 let op_compact = OpId::new();
4295 let _ = reduce(
4296 &mut state,
4297 Action::RequestCompaction {
4298 session_id,
4299 op_id: op_compact,
4300 model: model.clone(),
4301 },
4302 );
4303
4304 let compaction_id = CompactionId::new();
4305 let summary_message_id = MessageId::new();
4306 let active_before = state.message_graph.active_message_id.clone();
4307 let _ = reduce(
4308 &mut state,
4309 Action::CompactionComplete {
4310 session_id,
4311 op_id: op_compact,
4312 compaction_id,
4313 summary_message_id: summary_message_id.clone(),
4314 summary: "Summary of greek turns.".to_string(),
4315 compacted_head_message_id: last_assistant_msg_id,
4316 previous_active_message_id: active_before.map(MessageId::from_string),
4317 model: "claude-sonnet-4-5-20250929".to_string(),
4318 timestamp: 100,
4319 },
4320 );
4321
4322 let post_compaction_texts = [
4324 ("delta user", "delta assistant"),
4325 ("epsilon user", "epsilon assistant"),
4326 ];
4327 for (i, (user_text, assistant_text)) in post_compaction_texts.iter().enumerate() {
4328 let op = OpId::new();
4329 let _ = reduce(
4330 &mut state,
4331 Action::UserInput {
4332 session_id,
4333 content: vec![UserContent::Text {
4334 text: (*user_text).to_string(),
4335 }],
4336 op_id: op,
4337 message_id: MessageId::new(),
4338 model: model.clone(),
4339 timestamp: (101 + i * 2) as u64,
4340 },
4341 );
4342 let _ = reduce(
4343 &mut state,
4344 Action::ModelResponseComplete {
4345 session_id,
4346 op_id: op,
4347 message_id: MessageId::new(),
4348 content: vec![AssistantContent::Text {
4349 text: assistant_text.to_string(),
4350 }],
4351 usage: None,
4352 context_window_tokens: None,
4353 timestamp: (102 + i * 2) as u64,
4354 },
4355 );
4356 }
4357
4358 let final_op = OpId::new();
4360 let effects = reduce(
4361 &mut state,
4362 Action::UserInput {
4363 session_id,
4364 content: vec![UserContent::Text {
4365 text: "final question".to_string(),
4366 }],
4367 op_id: final_op,
4368 message_id: MessageId::new(),
4369 model: model.clone(),
4370 timestamp: 200,
4371 },
4372 );
4373
4374 let messages =
4375 extract_callmodel_messages(&effects).expect("expected CallModel from final UserInput");
4376
4377 assert_eq!(
4379 messages.len(),
4380 6,
4381 "CallModel should have 6 messages (summary + 4 post-compaction + final), got: {:?}",
4382 messages.iter().map(|m| m.id()).collect::<Vec<_>>()
4383 );
4384
4385 assert!(
4387 messages_contain_text(&messages[..1], "Summary of greek turns."),
4388 "first message should be the compaction summary"
4389 );
4390
4391 assert!(
4393 messages_contain_text(&messages[5..], "final question"),
4394 "last message should be the final user question"
4395 );
4396
4397 assert!(
4399 messages_contain_text(messages, "delta user"),
4400 "should contain post-compaction user text"
4401 );
4402 assert!(
4403 messages_contain_text(messages, "epsilon assistant"),
4404 "should contain post-compaction assistant text"
4405 );
4406
4407 for (user_text, assistant_text) in &pre_compaction_texts {
4409 assert!(
4410 !messages_contain_text(messages, user_text),
4411 "CallModel messages must not contain pre-compaction text '{user_text}'"
4412 );
4413 assert!(
4414 !messages_contain_text(messages, assistant_text),
4415 "CallModel messages must not contain pre-compaction text '{assistant_text}'"
4416 );
4417 }
4418 }
4419}