stynx_code_engine/application/
query_engine.rs1use std::sync::{Arc, atomic::AtomicU8};
2
3use stynx_code_config::HooksConfig;
4use stynx_code_errors::{AppError, AppResult};
5use stynx_code_types::{
6 ContentBlock, Conversation, Message, PermissionChecker, PermissionLevel, PermissionMode,
7 Provider, Role, StopReason,
8};
9use stynx_code_tools::ToolRegistry;
10
11use crate::application::undo::UndoStack;
12use crate::domain::EngineEvent;
13use super::compactor::compact;
14use super::hook_runner::{run_post_tool_use, run_pre_tool_use, run_stop_hooks};
15use super::stream_reader::read_stream;
16use super::tool_executor::{execute_tool, is_overloaded, retry_after_ms};
17
18pub struct QueryEngine {
19 provider: Arc<dyn Provider>,
20 registry: Arc<ToolRegistry>,
21 permission: Arc<dyn PermissionChecker>,
22 hooks: HooksConfig,
23 max_turns: usize,
24 context_limit: u64,
25 mode: Arc<AtomicU8>,
26 undo_stack: Arc<UndoStack>,
27}
28
29impl QueryEngine {
30 pub fn new(
31 provider: Arc<dyn Provider>,
32 registry: Arc<ToolRegistry>,
33 permission: Arc<dyn PermissionChecker>,
34 mode: Arc<AtomicU8>,
35 hooks: HooksConfig,
36 ) -> Self {
37 Self {
38 provider, registry, permission, hooks, mode,
39 max_turns: 200, context_limit: 80_000,
40 undo_stack: Arc::new(UndoStack::default()),
41 }
42 }
43
44 pub fn with_max_turns(mut self, n: usize) -> Self {
45 self.max_turns = n;
46 self
47 }
48
49 pub fn mode_flag(&self) -> Arc<AtomicU8> { self.mode.clone() }
50 pub fn undo_stack(&self) -> Arc<UndoStack> { self.undo_stack.clone() }
51
52 pub async fn run<F>(
53 &self,
54 mut conversation: Conversation,
55 mut on_event: F,
56 ) -> AppResult<Conversation>
57 where
58 F: FnMut(EngineEvent) + Send,
59 {
60 let mut last_input_tokens: u64 = 0;
61
62 for turn in 0..self.max_turns {
63 tracing::info!(turn, "starting provider turn");
64 let is_plan = PermissionMode::load(&self.mode) == PermissionMode::Plan;
65 let tools = if is_plan {
66 self.registry.tool_definitions_filtered(|t| {
67 t.permission_level() == PermissionLevel::ReadOnly || t.name() == "exit_plan_mode"
68 })
69 } else {
70 self.registry.tool_definitions_filtered(|t| {
71 t.name() != "enter_plan_mode" && t.name() != "exit_plan_mode"
72 })
73 };
74
75 if last_input_tokens > 0
76 && last_input_tokens > self.context_limit * 60 / 100
77 && conversation.messages.len() > 2
78 {
79 let original_turns = conversation.messages.len();
80 conversation = compact(&self.provider, conversation, &mut on_event).await?;
81 on_event(EngineEvent::Compacted { original_turns });
82 }
83
84 let mut attempts = 0u32;
85 let (assistant_blocks, stop_reason) = loop {
86 let mut stream = match self.provider.stream(&conversation, &tools).await {
87 Ok(s) => s,
88 Err(e) if attempts < 3 && is_overloaded(&e.to_string()) => {
89 attempts += 1;
90 let delay = retry_after_ms(&e.to_string())
91 .map(std::time::Duration::from_millis)
92 .unwrap_or_else(|| std::time::Duration::from_secs(2u64.pow(attempts)));
93 tracing::warn!(?delay, attempt = attempts, "provider overloaded, retrying");
94 tokio::time::sleep(delay).await;
95 continue;
96 }
97 Err(e) => return Err(e),
98 };
99
100 let (blocks, stop_reason, input_tokens, stream_error) =
101 read_stream(&mut stream, &mut on_event).await;
102
103 if input_tokens > 0 {
104 last_input_tokens = input_tokens;
105 }
106
107 if let Some(err_msg) = stream_error {
108 if attempts < 3 && is_overloaded(&err_msg) {
109 attempts += 1;
110 let delay = retry_after_ms(&err_msg)
111 .map(std::time::Duration::from_millis)
112 .unwrap_or_else(|| std::time::Duration::from_secs(2u64.pow(attempts)));
113 tracing::warn!(?delay, attempt = attempts, "stream error, retrying");
114 tokio::time::sleep(delay).await;
115 continue;
116 }
117 return Err(AppError::Provider(err_msg));
118 }
119
120 break (blocks, stop_reason);
121 };
122
123 conversation.push(Message::assistant(assistant_blocks.clone()));
124
125 if !matches!(stop_reason, StopReason::ToolUse) {
126 on_event(EngineEvent::TurnComplete);
127 let stop_out = run_stop_hooks(&self.hooks).await;
128 if !stop_out.is_empty() {
129 on_event(EngineEvent::HookOutput { source: "stop".into(), output: stop_out });
130 }
131 return Ok(conversation);
132 }
133
134 let tool_uses: Vec<(String, String, serde_json::Value)> = assistant_blocks
135 .iter()
136 .filter_map(|b| match b {
137 ContentBlock::ToolUse { id, name, input } => Some((id.clone(), name.clone(), input.clone())),
138 _ => None,
139 })
140 .collect();
141
142 let mut pre_outs = Vec::new();
143 for (_, name, input) in &tool_uses {
144 let pre = run_pre_tool_use(&self.hooks, name, &input.to_string()).await;
145 pre_outs.push(pre);
146 }
147
148 let registry = self.registry.clone();
149 let permission = self.permission.clone();
150 let undo = self.undo_stack.clone();
151
152 let mut exec_results: Vec<Result<Result<String, AppError>, tokio::task::JoinError>> =
153 Vec::with_capacity(tool_uses.len());
154
155 let mut parallel_handles: Vec<(usize, tokio::task::JoinHandle<Result<String, AppError>>)> = Vec::new();
156 for (i, ((_, name, input), pre)) in tool_uses.iter().zip(pre_outs.iter()).enumerate() {
157 if pre.blocked {
158 continue;
159 }
160 let tool = registry.get(name);
161 let is_safe = tool.is_some_and(|t| t.is_concurrent_safe(input));
162 if is_safe {
163 let reg = registry.clone();
164 let perm = permission.clone();
165 let ud = undo.clone();
166 let n = name.clone();
167 let inp = input.clone();
168 parallel_handles.push((i, tokio::spawn(async move {
169 execute_tool(®, &perm, &n, &inp, &ud).await
170 })));
171 }
172 }
173
174 let parallel_results: Vec<_> = futures::future::join_all(
175 parallel_handles.into_iter().map(|(i, h)| async move { (i, h.await) })
176 ).await;
177 let mut result_map: std::collections::HashMap<usize, Result<Result<String, AppError>, tokio::task::JoinError>> =
178 parallel_results.into_iter().collect();
179
180 for (i, ((_, name, input), pre)) in tool_uses.iter().zip(pre_outs.iter()).enumerate() {
181 if pre.blocked {
182 exec_results.push(Ok(Ok(String::new())));
183 } else if let Some(result) = result_map.remove(&i) {
184 exec_results.push(result);
185 } else {
186
187 let result = execute_tool(®istry, &permission, name, input, &undo).await;
188 exec_results.push(Ok(result));
189 }
190 }
191
192 let mut tool_results = Vec::new();
193 let mut exit_plan_called = false;
194 let mut pre_iter = pre_outs.into_iter();
195 let mut exec_iter = exec_results.into_iter();
196 for (id, name, input) in &tool_uses {
197 let pre = pre_iter.next().unwrap();
198 let exec_result = exec_iter.next().unwrap();
199 let input_json = input.to_string();
200 if !pre.output.is_empty() {
201 on_event(EngineEvent::HookOutput { source: "pre-tool".into(), output: pre.output });
202 }
203 if pre.blocked {
204 on_event(EngineEvent::ToolResult { name: name.clone(), output: pre.reason.clone(), is_error: true });
205 tool_results.push(ContentBlock::ToolResult { tool_use_id: id.clone(), content: pre.reason, is_error: Some(true) });
206 } else {
207 let result: AppResult<String> = match exec_result {
208 Ok(r) => r,
209 Err(e) => Err(AppError::Tool(e.to_string())),
210 };
211 match result {
212 Ok(output) => {
213 let post = run_post_tool_use(&self.hooks, name, &input_json, &output).await;
214 if !post.is_empty() {
215 on_event(EngineEvent::HookOutput { source: "post-tool".into(), output: post });
216 }
217 on_event(EngineEvent::ToolResult { name: name.clone(), output: output.clone(), is_error: false });
218 tool_results.push(ContentBlock::ToolResult { tool_use_id: id.clone(), content: output, is_error: None });
219 if name == "exit_plan_mode" {
220 PermissionMode::Normal.store(&self.mode);
221 on_event(EngineEvent::ModeChanged { mode: PermissionMode::Normal });
222 exit_plan_called = true;
223 }
224 }
225 Err(ref e) if e.is_interrupted() => {
226 return Err(AppError::Interrupted);
227 }
228 Err(e) => {
229 let msg = e.to_string();
230 on_event(EngineEvent::ToolResult { name: name.clone(), output: msg.clone(), is_error: true });
231 tool_results.push(ContentBlock::ToolResult { tool_use_id: id.clone(), content: msg, is_error: Some(true) });
232 }
233 }
234 }
235 }
236
237 conversation.push(Message {
238 role: Role::User,
239 content: tool_results,
240 });
241
242 if exit_plan_called {
243 on_event(EngineEvent::TurnComplete);
244 return Ok(conversation);
245 }
246 }
247
248 Err(AppError::MaxTurnsExceeded(self.max_turns))
249 }
250}