Skip to main content

crabtalk_runtime/engine/
execution.rs

1//! Execution — message sending and streaming through agents.
2
3use super::Runtime;
4use crate::{Config, Conversation, Env, Hook};
5use anyhow::Result;
6use async_stream::stream;
7use crabllm_core::{ChatCompletionRequest, Message, ToolChoice};
8use futures_core::Stream;
9use futures_util::StreamExt;
10use tokio::sync::{mpsc, watch};
11use wcore::{AgentEvent, AgentResponse, AgentStopReason, model::HistoryEntry};
12
13impl<C: Config> Runtime<C> {
14    fn prepare_history(
15        &self,
16        conversation: &mut Conversation,
17        agent: &str,
18        content: &str,
19        sender: &str,
20    ) {
21        let content = self
22            .env
23            .hook()
24            .preprocess(agent, content)
25            .unwrap_or_else(|| content.to_owned());
26        if sender.is_empty() {
27            conversation.history.push(HistoryEntry::user(&content));
28        } else {
29            conversation
30                .history
31                .push(HistoryEntry::user_with_sender(&content, sender));
32        }
33
34        conversation.history.retain(|e| !e.auto_injected);
35
36        let mut recall_msgs =
37            self.env
38                .hook()
39                .on_before_run(agent, conversation.id, &conversation.history);
40
41        // Layered instructions (Crab.md).
42        let cwd = self.env.effective_cwd(conversation.id);
43        if let Some(instructions) = self.env.discover_instructions(&cwd) {
44            recall_msgs.push(
45                HistoryEntry::user(format!("<instructions>\n{instructions}\n</instructions>"))
46                    .auto_injected(),
47            );
48        }
49
50        // Guest agent framing.
51        if conversation.history.iter().any(|e| !e.agent.is_empty()) {
52            recall_msgs.push(
53                HistoryEntry::user(
54                    "Messages wrapped in <from agent=\"...\"> tags are from guest agents \
55                     who were consulted in this conversation. Continue responding as yourself."
56                        .to_string(),
57                )
58                .auto_injected(),
59            );
60        }
61        if !recall_msgs.is_empty() {
62            let insert_pos = conversation.history.len().saturating_sub(1);
63            for (i, entry) in recall_msgs.into_iter().enumerate() {
64                conversation.history.insert(insert_pos + i, entry);
65            }
66        }
67    }
68
69    pub async fn send_to(
70        &self,
71        conversation_id: u64,
72        content: &str,
73        sender: &str,
74        tool_choice: Option<ToolChoice>,
75    ) -> Result<AgentResponse> {
76        let (agent_name, created_by, conversation_mutex) = self
77            .acquire_slot(conversation_id)
78            .await
79            .ok_or_else(|| anyhow::anyhow!("conversation {conversation_id} not found"))?;
80
81        let mut conversation = conversation_mutex.lock().await;
82        let pre_run_len = conversation.history.len();
83        self.prepare_history(&mut conversation, &agent_name, content, sender);
84        let agent = self
85            .resolve_agent(&agent_name)
86            .await
87            .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", agent_name))?;
88
89        let (tx, mut rx) = mpsc::unbounded_channel();
90        let run_start = std::time::Instant::now();
91        let response = agent
92            .run(&mut conversation.history, tx, None, tool_choice)
93            .await;
94
95        let mut compact_summary: Option<String> = None;
96        while let Ok(event) = rx.try_recv() {
97            if let AgentEvent::Compact { ref summary } = event {
98                compact_summary = Some(summary.clone());
99            }
100            self.env
101                .hook()
102                .on_event(&agent_name, conversation_id, &event);
103            self.env
104                .on_agent_event(&agent_name, conversation_id, &event);
105        }
106
107        self.finalize_run(
108            conversation_id,
109            &mut conversation,
110            conversation_mutex.clone(),
111            &agent_name,
112            &created_by,
113            run_start,
114            pre_run_len,
115            compact_summary,
116            &[],
117        );
118        Ok(response)
119    }
120
121    pub fn stream_to(
122        &self,
123        conversation_id: u64,
124        content: &str,
125        sender: &str,
126        tool_choice: Option<ToolChoice>,
127    ) -> impl Stream<Item = AgentEvent> + '_ {
128        let content = content.to_owned();
129        let sender = sender.to_owned();
130        stream! {
131            let Some((agent_name, created_by, conversation_mutex)) =
132                self.acquire_slot(conversation_id).await
133            else {
134                yield AgentEvent::Done(AgentResponse::error(
135                    format!("conversation {conversation_id} not found"),
136                ));
137                return;
138            };
139
140            let mut conversation = conversation_mutex.lock().await;
141            let pre_run_len = conversation.history.len();
142            self.prepare_history(&mut conversation, &agent_name, &content, &sender);
143            let Some(agent) = self.resolve_agent(&agent_name).await else {
144                yield AgentEvent::Done(AgentResponse::error(
145                    format!("agent '{}' not registered", agent_name),
146                ));
147                return;
148            };
149
150            let run_start = std::time::Instant::now();
151            let (steer_tx, steer_rx) = watch::channel(None::<String>);
152            self.steering.write().await.insert(conversation_id, steer_tx);
153            let mut compact_summary: Option<String> = None;
154            let mut done_event: Option<AgentEvent> = None;
155            let mut event_trace: Vec<wcore::EventLine> = Vec::new();
156            {
157                let mut event_stream = std::pin::pin!(agent.run_stream(&mut conversation.history, Some(conversation_id), Some(steer_rx), tool_choice));
158                while let Some(event) = event_stream.next().await {
159                    if let AgentEvent::Compact { ref summary } = event {
160                        compact_summary = Some(summary.clone());
161                    }
162                    self.env.hook().on_event(&agent_name, conversation_id, &event);
163                    self.env.on_agent_event(&agent_name, conversation_id, &event);
164                    if let Some(line) = wcore::EventLine::from_agent_event(&event) {
165                        event_trace.push(line);
166                    }
167                    if matches!(event, AgentEvent::Done(_)) {
168                        done_event = Some(event);
169                    } else {
170                        yield event;
171                    }
172                }
173            }
174            self.steering.write().await.remove(&conversation_id);
175            self.finalize_run(
176                conversation_id,
177                &mut conversation,
178                conversation_mutex.clone(),
179                &agent_name,
180                &created_by,
181                run_start,
182                pre_run_len,
183                compact_summary,
184                &event_trace,
185            );
186            if let Some(event) = done_event {
187                yield event;
188            }
189        }
190    }
191
192    pub fn guest_stream_to(
193        &self,
194        conversation_id: u64,
195        content: &str,
196        sender: &str,
197        guest: &str,
198    ) -> impl Stream<Item = AgentEvent> + '_ {
199        let content = content.to_owned();
200        let sender = sender.to_owned();
201        let guest = guest.to_owned();
202        stream! {
203            let Some(guest_agent) = self.resolve_agent(&guest).await else {
204                yield AgentEvent::Done(AgentResponse::error(
205                    format!("guest agent '{guest}' not registered"),
206                ));
207                return;
208            };
209
210            let Some((agent_name, created_by, conversation_mutex)) =
211                self.acquire_slot(conversation_id).await
212            else {
213                yield AgentEvent::Done(AgentResponse::error(
214                    format!("conversation {conversation_id} not found"),
215                ));
216                return;
217            };
218
219            let mut conversation = conversation_mutex.lock().await;
220            let pre_run_len = conversation.history.len();
221
222            let content = self
223                .env
224                .hook()
225                .preprocess(&agent_name, &content)
226                .unwrap_or_else(|| content.clone());
227            if sender.is_empty() {
228                conversation.history.push(HistoryEntry::user(&content));
229            } else {
230                conversation
231                    .history
232                    .push(HistoryEntry::user_with_sender(&content, &sender));
233            }
234
235            conversation.history.retain(|e| !e.auto_injected);
236
237            let framing = HistoryEntry::system(format!(
238                "You are joining a conversation as a guest. The primary agent is '{}'. \
239                 Messages wrapped in <from agent=\"...\"> tags are from other agents. \
240                 Respond as yourself to the user's latest message.",
241                agent_name
242            ))
243            .auto_injected();
244            let insert_pos = conversation.history.len().saturating_sub(1);
245            conversation.history.insert(insert_pos, framing);
246
247            let run_start = std::time::Instant::now();
248            let model_name = guest_agent.config.model.clone();
249
250            let mut messages = Vec::with_capacity(1 + conversation.history.len());
251            if !guest_agent.config.system_prompt.is_empty() {
252                messages.push(Message::system(&guest_agent.config.system_prompt));
253            }
254            messages.extend(conversation.history.iter().map(|e| e.to_wire_message()));
255
256            let request = ChatCompletionRequest {
257                model: model_name.clone(),
258                messages,
259                temperature: None,
260                top_p: None,
261                max_tokens: None,
262                stream: None,
263                stop: None,
264                tools: None,
265                tool_choice: None,
266                frequency_penalty: None,
267                presence_penalty: None,
268                seed: None,
269                user: None,
270                reasoning_effort: if guest_agent.config.thinking {
271                    Some("high".to_string())
272                } else {
273                    None
274                },
275                extra: Default::default(),
276            };
277
278            let mut response_text = String::new();
279            let mut reasoning = String::new();
280            {
281                let mut stream = std::pin::pin!(self.model.stream_ct(request));
282                while let Some(result) = stream.next().await {
283                    match result {
284                        Ok(chunk) => {
285                            if let Some(text) = chunk.content() {
286                                response_text.push_str(text);
287                                yield AgentEvent::TextDelta(text.to_string());
288                            }
289                            if let Some(text) = chunk.reasoning_content() {
290                                reasoning.push_str(text);
291                                yield AgentEvent::ThinkingDelta(text.to_string());
292                            }
293                        }
294                        Err(e) => {
295                            yield AgentEvent::Done(AgentResponse {
296                                final_response: None,
297                                iterations: 1,
298                                stop_reason: AgentStopReason::Error(e.to_string()),
299                                steps: vec![],
300                                model: model_name.clone(),
301                            });
302                            return;
303                        }
304                    }
305                }
306            }
307
308            let reasoning = if reasoning.is_empty() {
309                None
310            } else {
311                Some(reasoning)
312            };
313            let mut response_entry = HistoryEntry::assistant(&response_text, reasoning, None);
314            response_entry.agent = guest.clone();
315            conversation.history.push(response_entry);
316
317            self.finalize_run(
318                conversation_id,
319                &mut conversation,
320                conversation_mutex.clone(),
321                &agent_name,
322                &created_by,
323                run_start,
324                pre_run_len,
325                None,
326                &[],
327            );
328
329            yield AgentEvent::Done(AgentResponse {
330                final_response: Some(response_text),
331                iterations: 1,
332                stop_reason: AgentStopReason::TextResponse,
333                steps: vec![],
334                model: model_name,
335            });
336        }
337    }
338}