// steer_core/app/domain/runtime/interpreter.rs

1use std::sync::Arc;
2
3use futures_util::StreamExt;
4use tokio::sync::mpsc;
5use tokio_util::sync::CancellationToken;
6
7use crate::api::Client as ApiClient;
8use crate::api::provider::StreamChunk;
9use crate::app::SystemContext;
10use crate::app::conversation::{AssistantContent, Message};
11use crate::app::domain::delta::{StreamDelta, ToolCallDelta};
12use crate::app::domain::types::{MessageId, OpId, SessionId, ToolCallId};
13use crate::config::model::ModelId;
14use crate::tools::{SessionMcpBackends, ToolExecutor};
15use steer_tools::{ToolCall, ToolError, ToolResult, ToolSchema};
16
17#[derive(Clone)]
18pub struct EffectInterpreter {
19    api_client: Arc<ApiClient>,
20    tool_executor: Arc<ToolExecutor>,
21    session_id: Option<SessionId>,
22    session_backends: Option<Arc<SessionMcpBackends>>,
23}
24
25pub(crate) struct DeltaStreamContext {
26    tx: mpsc::Sender<StreamDelta>,
27    context: (OpId, MessageId),
28}
29
30impl DeltaStreamContext {
31    pub(crate) fn new(tx: mpsc::Sender<StreamDelta>, context: (OpId, MessageId)) -> Self {
32        Self { tx, context }
33    }
34}
35
36impl EffectInterpreter {
37    pub fn new(api_client: Arc<ApiClient>, tool_executor: Arc<ToolExecutor>) -> Self {
38        Self {
39            api_client,
40            tool_executor,
41            session_id: None,
42            session_backends: None,
43        }
44    }
45
46    pub fn with_session(mut self, session_id: SessionId) -> Self {
47        self.session_id = Some(session_id);
48        self
49    }
50
51    pub fn with_session_backends(mut self, backends: Arc<SessionMcpBackends>) -> Self {
52        self.session_backends = Some(backends);
53        self
54    }
55
56    pub async fn call_model(
57        &self,
58        model: ModelId,
59        messages: Vec<Message>,
60        system_context: Option<SystemContext>,
61        tools: Vec<ToolSchema>,
62        cancel_token: CancellationToken,
63    ) -> Result<Vec<AssistantContent>, String> {
64        self.call_model_with_deltas(model, messages, system_context, tools, cancel_token, None)
65            .await
66    }
67
68    pub(crate) async fn call_model_with_deltas(
69        &self,
70        model: ModelId,
71        messages: Vec<Message>,
72        system_context: Option<SystemContext>,
73        tools: Vec<ToolSchema>,
74        cancel_token: CancellationToken,
75        delta_stream: Option<DeltaStreamContext>,
76    ) -> Result<Vec<AssistantContent>, String> {
77        let tools_option = if tools.is_empty() { None } else { Some(tools) };
78
79        let mut stream = self
80            .api_client
81            .stream_complete(
82                &model,
83                messages,
84                system_context,
85                tools_option,
86                None,
87                cancel_token,
88            )
89            .await
90            .map_err(|e| e.to_string())?;
91
92        let mut final_content: Option<Vec<AssistantContent>> = None;
93        while let Some(chunk) = stream.next().await {
94            match chunk {
95                StreamChunk::TextDelta(text) => {
96                    if let Some(delta_stream) = &delta_stream {
97                        let (op_id, message_id) = &delta_stream.context;
98                        let delta = StreamDelta::TextChunk {
99                            op_id: *op_id,
100                            message_id: message_id.clone(),
101                            delta: text,
102                        };
103                        let _ = delta_stream.tx.send(delta).await;
104                    }
105                }
106                StreamChunk::ThinkingDelta(thinking) => {
107                    if let Some(delta_stream) = &delta_stream {
108                        let (op_id, message_id) = &delta_stream.context;
109                        let delta = StreamDelta::ThinkingChunk {
110                            op_id: *op_id,
111                            message_id: message_id.clone(),
112                            delta: thinking,
113                        };
114                        let _ = delta_stream.tx.send(delta).await;
115                    }
116                }
117                StreamChunk::ToolUseInputDelta { id, delta } => {
118                    if let Some(delta_stream) = &delta_stream {
119                        let (op_id, message_id) = &delta_stream.context;
120                        let delta = StreamDelta::ToolCallChunk {
121                            op_id: *op_id,
122                            message_id: message_id.clone(),
123                            tool_call_id: ToolCallId::from_string(&id),
124                            delta: ToolCallDelta::ArgumentChunk(delta),
125                        };
126                        let _ = delta_stream.tx.send(delta).await;
127                    }
128                }
129                StreamChunk::MessageComplete(response) => {
130                    final_content = Some(response.content);
131                }
132                StreamChunk::Error(err) => {
133                    return Err(err.to_string());
134                }
135                StreamChunk::ToolUseStart { .. } | StreamChunk::ContentBlockStop { .. } => {}
136            }
137        }
138
139        final_content.ok_or_else(|| "Stream ended without MessageComplete".to_string())
140    }
141
142    pub async fn execute_tool(
143        &self,
144        tool_call: ToolCall,
145        cancel_token: CancellationToken,
146    ) -> Result<ToolResult, ToolError> {
147        let resolver = self
148            .session_backends
149            .as_ref()
150            .map(|b| b.as_ref() as &dyn crate::tools::BackendResolver);
151
152        if let Some(session_id) = self.session_id {
153            self.tool_executor
154                .execute_tool_with_session_resolver(&tool_call, session_id, cancel_token, resolver)
155                .await
156        } else {
157            self.tool_executor
158                .execute_tool_with_resolver(&tool_call, cancel_token, resolver)
159                .await
160        }
161    }
162
163    pub async fn get_tool_schemas(&self) -> Vec<ToolSchema> {
164        let resolver = self
165            .session_backends
166            .as_ref()
167            .map(|b| b.as_ref() as &dyn crate::tools::BackendResolver);
168
169        self.tool_executor
170            .get_tool_schemas_with_resolver(resolver)
171            .await
172    }
173}