1use super::Runtime;
4use crate::{Config, Conversation, Env, Hook};
5use anyhow::Result;
6use async_stream::stream;
7use crabllm_core::{ChatCompletionRequest, Message, ToolChoice};
8use futures_core::Stream;
9use futures_util::StreamExt;
10use tokio::sync::{mpsc, watch};
11use wcore::{AgentEvent, AgentResponse, AgentStopReason, model::HistoryEntry};
12
13impl<C: Config> Runtime<C> {
14 fn prepare_history(
15 &self,
16 conversation: &mut Conversation,
17 agent: &str,
18 content: &str,
19 sender: &str,
20 ) {
21 let content = self
22 .env
23 .hook()
24 .preprocess(agent, content)
25 .unwrap_or_else(|| content.to_owned());
26 if sender.is_empty() {
27 conversation.history.push(HistoryEntry::user(&content));
28 } else {
29 conversation
30 .history
31 .push(HistoryEntry::user_with_sender(&content, sender));
32 }
33
34 conversation.history.retain(|e| !e.auto_injected);
35
36 let mut recall_msgs =
37 self.env
38 .hook()
39 .on_before_run(agent, conversation.id, &conversation.history);
40
41 let cwd = self.env.effective_cwd(conversation.id);
43 if let Some(instructions) = self.env.discover_instructions(&cwd) {
44 recall_msgs.push(
45 HistoryEntry::user(format!("<instructions>\n{instructions}\n</instructions>"))
46 .auto_injected(),
47 );
48 }
49
50 if conversation.history.iter().any(|e| !e.agent.is_empty()) {
52 recall_msgs.push(
53 HistoryEntry::user(
54 "Messages wrapped in <from agent=\"...\"> tags are from guest agents \
55 who were consulted in this conversation. Continue responding as yourself."
56 .to_string(),
57 )
58 .auto_injected(),
59 );
60 }
61 if !recall_msgs.is_empty() {
62 let insert_pos = conversation.history.len().saturating_sub(1);
63 for (i, entry) in recall_msgs.into_iter().enumerate() {
64 conversation.history.insert(insert_pos + i, entry);
65 }
66 }
67 }
68
69 pub async fn send_to(
70 &self,
71 conversation_id: u64,
72 content: &str,
73 sender: &str,
74 tool_choice: Option<ToolChoice>,
75 ) -> Result<AgentResponse> {
76 let (agent_name, created_by, conversation_mutex) = self
77 .acquire_slot(conversation_id)
78 .await
79 .ok_or_else(|| anyhow::anyhow!("conversation {conversation_id} not found"))?;
80
81 let mut conversation = conversation_mutex.lock().await;
82 let pre_run_len = conversation.history.len();
83 self.prepare_history(&mut conversation, &agent_name, content, sender);
84 let agent = self
85 .resolve_agent(&agent_name)
86 .await
87 .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", agent_name))?;
88
89 let (tx, mut rx) = mpsc::unbounded_channel();
90 let run_start = std::time::Instant::now();
91 let response = agent
92 .run(&mut conversation.history, tx, None, tool_choice)
93 .await;
94
95 let mut compact_summary: Option<String> = None;
96 while let Ok(event) = rx.try_recv() {
97 if let AgentEvent::Compact { ref summary } = event {
98 compact_summary = Some(summary.clone());
99 }
100 self.env
101 .hook()
102 .on_event(&agent_name, conversation_id, &event);
103 self.env
104 .on_agent_event(&agent_name, conversation_id, &event);
105 }
106
107 self.finalize_run(
108 conversation_id,
109 &mut conversation,
110 conversation_mutex.clone(),
111 &agent_name,
112 &created_by,
113 run_start,
114 pre_run_len,
115 compact_summary,
116 &[],
117 );
118 Ok(response)
119 }
120
121 pub fn stream_to(
122 &self,
123 conversation_id: u64,
124 content: &str,
125 sender: &str,
126 tool_choice: Option<ToolChoice>,
127 ) -> impl Stream<Item = AgentEvent> + '_ {
128 let content = content.to_owned();
129 let sender = sender.to_owned();
130 stream! {
131 let Some((agent_name, created_by, conversation_mutex)) =
132 self.acquire_slot(conversation_id).await
133 else {
134 yield AgentEvent::Done(AgentResponse::error(
135 format!("conversation {conversation_id} not found"),
136 ));
137 return;
138 };
139
140 let mut conversation = conversation_mutex.lock().await;
141 let pre_run_len = conversation.history.len();
142 self.prepare_history(&mut conversation, &agent_name, &content, &sender);
143 let Some(agent) = self.resolve_agent(&agent_name).await else {
144 yield AgentEvent::Done(AgentResponse::error(
145 format!("agent '{}' not registered", agent_name),
146 ));
147 return;
148 };
149
150 let run_start = std::time::Instant::now();
151 let (steer_tx, steer_rx) = watch::channel(None::<String>);
152 self.steering.write().await.insert(conversation_id, steer_tx);
153 let mut compact_summary: Option<String> = None;
154 let mut done_event: Option<AgentEvent> = None;
155 let mut event_trace: Vec<wcore::EventLine> = Vec::new();
156 {
157 let mut event_stream = std::pin::pin!(agent.run_stream(&mut conversation.history, Some(conversation_id), Some(steer_rx), tool_choice));
158 while let Some(event) = event_stream.next().await {
159 if let AgentEvent::Compact { ref summary } = event {
160 compact_summary = Some(summary.clone());
161 }
162 self.env.hook().on_event(&agent_name, conversation_id, &event);
163 self.env.on_agent_event(&agent_name, conversation_id, &event);
164 if let Some(line) = wcore::EventLine::from_agent_event(&event) {
165 event_trace.push(line);
166 }
167 if matches!(event, AgentEvent::Done(_)) {
168 done_event = Some(event);
169 } else {
170 yield event;
171 }
172 }
173 }
174 self.steering.write().await.remove(&conversation_id);
175 self.finalize_run(
176 conversation_id,
177 &mut conversation,
178 conversation_mutex.clone(),
179 &agent_name,
180 &created_by,
181 run_start,
182 pre_run_len,
183 compact_summary,
184 &event_trace,
185 );
186 if let Some(event) = done_event {
187 yield event;
188 }
189 }
190 }
191
192 pub fn guest_stream_to(
193 &self,
194 conversation_id: u64,
195 content: &str,
196 sender: &str,
197 guest: &str,
198 ) -> impl Stream<Item = AgentEvent> + '_ {
199 let content = content.to_owned();
200 let sender = sender.to_owned();
201 let guest = guest.to_owned();
202 stream! {
203 let Some(guest_agent) = self.resolve_agent(&guest).await else {
204 yield AgentEvent::Done(AgentResponse::error(
205 format!("guest agent '{guest}' not registered"),
206 ));
207 return;
208 };
209
210 let Some((agent_name, created_by, conversation_mutex)) =
211 self.acquire_slot(conversation_id).await
212 else {
213 yield AgentEvent::Done(AgentResponse::error(
214 format!("conversation {conversation_id} not found"),
215 ));
216 return;
217 };
218
219 let mut conversation = conversation_mutex.lock().await;
220 let pre_run_len = conversation.history.len();
221
222 let content = self
223 .env
224 .hook()
225 .preprocess(&agent_name, &content)
226 .unwrap_or_else(|| content.clone());
227 if sender.is_empty() {
228 conversation.history.push(HistoryEntry::user(&content));
229 } else {
230 conversation
231 .history
232 .push(HistoryEntry::user_with_sender(&content, &sender));
233 }
234
235 conversation.history.retain(|e| !e.auto_injected);
236
237 let framing = HistoryEntry::system(format!(
238 "You are joining a conversation as a guest. The primary agent is '{}'. \
239 Messages wrapped in <from agent=\"...\"> tags are from other agents. \
240 Respond as yourself to the user's latest message.",
241 agent_name
242 ))
243 .auto_injected();
244 let insert_pos = conversation.history.len().saturating_sub(1);
245 conversation.history.insert(insert_pos, framing);
246
247 let run_start = std::time::Instant::now();
248 let model_name = guest_agent.config.model.clone();
249
250 let mut messages = Vec::with_capacity(1 + conversation.history.len());
251 if !guest_agent.config.system_prompt.is_empty() {
252 messages.push(Message::system(&guest_agent.config.system_prompt));
253 }
254 messages.extend(conversation.history.iter().map(|e| e.to_wire_message()));
255
256 let request = ChatCompletionRequest {
257 model: model_name.clone(),
258 messages,
259 temperature: None,
260 top_p: None,
261 max_tokens: None,
262 stream: None,
263 stop: None,
264 tools: None,
265 tool_choice: None,
266 frequency_penalty: None,
267 presence_penalty: None,
268 seed: None,
269 user: None,
270 reasoning_effort: if guest_agent.config.thinking {
271 Some("high".to_string())
272 } else {
273 None
274 },
275 extra: Default::default(),
276 };
277
278 let mut response_text = String::new();
279 let mut reasoning = String::new();
280 {
281 let mut stream = std::pin::pin!(self.model.stream_ct(request));
282 while let Some(result) = stream.next().await {
283 match result {
284 Ok(chunk) => {
285 if let Some(text) = chunk.content() {
286 response_text.push_str(text);
287 yield AgentEvent::TextDelta(text.to_string());
288 }
289 if let Some(text) = chunk.reasoning_content() {
290 reasoning.push_str(text);
291 yield AgentEvent::ThinkingDelta(text.to_string());
292 }
293 }
294 Err(e) => {
295 yield AgentEvent::Done(AgentResponse {
296 final_response: None,
297 iterations: 1,
298 stop_reason: AgentStopReason::Error(e.to_string()),
299 steps: vec![],
300 model: model_name.clone(),
301 });
302 return;
303 }
304 }
305 }
306 }
307
308 let reasoning = if reasoning.is_empty() {
309 None
310 } else {
311 Some(reasoning)
312 };
313 let mut response_entry = HistoryEntry::assistant(&response_text, reasoning, None);
314 response_entry.agent = guest.clone();
315 conversation.history.push(response_entry);
316
317 self.finalize_run(
318 conversation_id,
319 &mut conversation,
320 conversation_mutex.clone(),
321 &agent_name,
322 &created_by,
323 run_start,
324 pre_run_len,
325 None,
326 &[],
327 );
328
329 yield AgentEvent::Done(AgentResponse {
330 final_response: Some(response_text),
331 iterations: 1,
332 stop_reason: AgentStopReason::TextResponse,
333 steps: vec![],
334 model: model_name,
335 });
336 }
337 }
338}