steer_core/app/domain/runtime/
interpreter.rs1use std::sync::Arc;
2
3use futures_util::StreamExt;
4use tokio::sync::mpsc;
5use tokio_util::sync::CancellationToken;
6
7use crate::api::Client as ApiClient;
8use crate::api::provider::StreamChunk;
9use crate::app::SystemContext;
10use crate::app::conversation::{AssistantContent, Message};
11use crate::app::domain::delta::{StreamDelta, ToolCallDelta};
12use crate::app::domain::types::{MessageId, OpId, SessionId, ToolCallId};
13use crate::config::model::ModelId;
14use crate::tools::{SessionMcpBackends, ToolExecutor};
15use steer_tools::{ToolCall, ToolError, ToolResult, ToolSchema};
16
17#[derive(Clone)]
18pub struct EffectInterpreter {
19 api_client: Arc<ApiClient>,
20 tool_executor: Arc<ToolExecutor>,
21 session_id: Option<SessionId>,
22 session_backends: Option<Arc<SessionMcpBackends>>,
23}
24
25pub(crate) struct DeltaStreamContext {
26 tx: mpsc::Sender<StreamDelta>,
27 context: (OpId, MessageId),
28}
29
30impl DeltaStreamContext {
31 pub(crate) fn new(tx: mpsc::Sender<StreamDelta>, context: (OpId, MessageId)) -> Self {
32 Self { tx, context }
33 }
34}
35
36impl EffectInterpreter {
37 pub fn new(api_client: Arc<ApiClient>, tool_executor: Arc<ToolExecutor>) -> Self {
38 Self {
39 api_client,
40 tool_executor,
41 session_id: None,
42 session_backends: None,
43 }
44 }
45
46 pub fn with_session(mut self, session_id: SessionId) -> Self {
47 self.session_id = Some(session_id);
48 self
49 }
50
51 pub fn with_session_backends(mut self, backends: Arc<SessionMcpBackends>) -> Self {
52 self.session_backends = Some(backends);
53 self
54 }
55
56 pub async fn call_model(
57 &self,
58 model: ModelId,
59 messages: Vec<Message>,
60 system_context: Option<SystemContext>,
61 tools: Vec<ToolSchema>,
62 cancel_token: CancellationToken,
63 ) -> Result<Vec<AssistantContent>, String> {
64 self.call_model_with_deltas(model, messages, system_context, tools, cancel_token, None)
65 .await
66 }
67
68 pub(crate) async fn call_model_with_deltas(
69 &self,
70 model: ModelId,
71 messages: Vec<Message>,
72 system_context: Option<SystemContext>,
73 tools: Vec<ToolSchema>,
74 cancel_token: CancellationToken,
75 delta_stream: Option<DeltaStreamContext>,
76 ) -> Result<Vec<AssistantContent>, String> {
77 let tools_option = if tools.is_empty() { None } else { Some(tools) };
78
79 let mut stream = self
80 .api_client
81 .stream_complete(
82 &model,
83 messages,
84 system_context,
85 tools_option,
86 None,
87 cancel_token,
88 )
89 .await
90 .map_err(|e| e.to_string())?;
91
92 let mut final_content: Option<Vec<AssistantContent>> = None;
93 while let Some(chunk) = stream.next().await {
94 match chunk {
95 StreamChunk::TextDelta(text) => {
96 if let Some(delta_stream) = &delta_stream {
97 let (op_id, message_id) = &delta_stream.context;
98 let delta = StreamDelta::TextChunk {
99 op_id: *op_id,
100 message_id: message_id.clone(),
101 delta: text,
102 };
103 let _ = delta_stream.tx.send(delta).await;
104 }
105 }
106 StreamChunk::ThinkingDelta(thinking) => {
107 if let Some(delta_stream) = &delta_stream {
108 let (op_id, message_id) = &delta_stream.context;
109 let delta = StreamDelta::ThinkingChunk {
110 op_id: *op_id,
111 message_id: message_id.clone(),
112 delta: thinking,
113 };
114 let _ = delta_stream.tx.send(delta).await;
115 }
116 }
117 StreamChunk::ToolUseInputDelta { id, delta } => {
118 if let Some(delta_stream) = &delta_stream {
119 let (op_id, message_id) = &delta_stream.context;
120 let delta = StreamDelta::ToolCallChunk {
121 op_id: *op_id,
122 message_id: message_id.clone(),
123 tool_call_id: ToolCallId::from_string(&id),
124 delta: ToolCallDelta::ArgumentChunk(delta),
125 };
126 let _ = delta_stream.tx.send(delta).await;
127 }
128 }
129 StreamChunk::MessageComplete(response) => {
130 final_content = Some(response.content);
131 }
132 StreamChunk::Error(err) => {
133 return Err(err.to_string());
134 }
135 StreamChunk::ToolUseStart { .. } | StreamChunk::ContentBlockStop { .. } => {}
136 }
137 }
138
139 final_content.ok_or_else(|| "Stream ended without MessageComplete".to_string())
140 }
141
142 pub async fn execute_tool(
143 &self,
144 tool_call: ToolCall,
145 cancel_token: CancellationToken,
146 ) -> Result<ToolResult, ToolError> {
147 let resolver = self
148 .session_backends
149 .as_ref()
150 .map(|b| b.as_ref() as &dyn crate::tools::BackendResolver);
151
152 if let Some(session_id) = self.session_id {
153 self.tool_executor
154 .execute_tool_with_session_resolver(&tool_call, session_id, cancel_token, resolver)
155 .await
156 } else {
157 self.tool_executor
158 .execute_tool_with_resolver(&tool_call, cancel_token, resolver)
159 .await
160 }
161 }
162
163 pub async fn get_tool_schemas(&self) -> Vec<ToolSchema> {
164 let resolver = self
165 .session_backends
166 .as_ref()
167 .map(|b| b.as_ref() as &dyn crate::tools::BackendResolver);
168
169 self.tool_executor
170 .get_tool_schemas_with_resolver(resolver)
171 .await
172 }
173}