aether_core/core/
agent_builder.rs1use super::agent::{AgentConfig, AutoContinue, RetryConfig};
2use crate::agent_spec::AgentSpec;
3use crate::context::CompactionConfig;
4use crate::core::{Agent, Prompt, Result};
5use crate::events::{AgentMessage, UserMessage};
6use crate::mcp::run_mcp_task::McpCommand;
7use llm::parser::ModelProviderParser;
8use llm::types::IsoString;
9use llm::{ChatMessage, Context, StreamingModelProvider, ToolDefinition};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::mpsc::{self, Receiver, Sender};
13use tokio::task::JoinHandle;
14
15pub struct AgentHandle {
17 handle: JoinHandle<()>,
18}
19
20impl AgentHandle {
21 pub fn abort(&self) {
23 self.handle.abort();
24 }
25
26 pub fn is_finished(&self) -> bool {
28 self.handle.is_finished()
29 }
30
31 pub async fn await_completion(self) {
33 let _ = self.handle.await;
34 }
35}
36
/// Step-by-step configuration for an [`Agent`], turned into a running task by
/// [`AgentBuilder::spawn`].
pub struct AgentBuilder {
    /// Model provider handed to the agent's configuration.
    llm: Arc<dyn StreamingModelProvider>,
    /// System prompts in the order they were added; rendered together with
    /// `Prompt::build_all` when the agent is spawned.
    prompts: Vec<Prompt>,
    /// Tool definitions placed into the agent's `Context`.
    tool_definitions: Vec<ToolDefinition>,
    /// Messages appended after the system message (if any) in the initial context.
    initial_messages: Vec<ChatMessage>,
    /// Sender for `McpCommand` values; `None` until `tools` is called.
    mcp_tx: Option<Sender<McpCommand>>,
    /// Capacity used for both the user-message and agent-message channels.
    channel_capacity: usize,
    /// Timeout forwarded to the agent as `tool_timeout`.
    tool_timeout: Duration,
    /// Compaction settings; `None` means compaction is disabled.
    compaction_config: Option<CompactionConfig>,
    /// Limit passed to `AutoContinue::new` when the agent is spawned.
    max_auto_continues: u32,
    /// Retry settings forwarded to the agent.
    retry_config: RetryConfig,
    /// Optional key forwarded to `Context::set_prompt_cache_key`.
    prompt_cache_key: Option<String>,
}
50
51impl AgentBuilder {
52 pub fn new(llm: Arc<dyn StreamingModelProvider>) -> Self {
53 Self {
54 llm,
55 prompts: Vec::new(),
56 tool_definitions: Vec::new(),
57 initial_messages: Vec::new(),
58 mcp_tx: None,
59 channel_capacity: 1000,
60 tool_timeout: Duration::from_mins(20),
61 compaction_config: Some(CompactionConfig::default()),
62 max_auto_continues: 3,
63 retry_config: RetryConfig::default(),
64 prompt_cache_key: None,
65 }
66 }
67
68 pub async fn from_spec(spec: &AgentSpec, base_prompts: Vec<Prompt>) -> Result<Self> {
73 let (provider, _) = ModelProviderParser::default().parse(&spec.model).await?;
74 let mut builder = Self::new(Arc::from(provider));
75 for prompt in base_prompts {
76 builder = builder.system_prompt(prompt);
77 }
78 for prompt in &spec.prompts {
79 builder = builder.system_prompt(prompt.clone());
80 }
81 Ok(builder)
82 }
83
84 pub fn system_prompt(mut self, prompt: Prompt) -> Self {
88 self.prompts.push(prompt);
89 self
90 }
91
92 pub fn tools(mut self, tx: Sender<McpCommand>, tools: Vec<ToolDefinition>) -> Self {
93 self.tool_definitions = tools;
94 self.mcp_tx = Some(tx);
95 self
96 }
97
98 pub fn tool_timeout(mut self, timeout: Duration) -> Self {
105 self.tool_timeout = timeout;
106 self
107 }
108
109 pub fn compaction(mut self, config: CompactionConfig) -> Self {
130 self.compaction_config = Some(config);
131 self
132 }
133
134 pub fn disable_compaction(mut self) -> Self {
138 self.compaction_config = None;
139 self
140 }
141
142 pub fn max_auto_continues(mut self, max: u32) -> Self {
162 self.max_auto_continues = max;
163 self
164 }
165
166 pub fn retry(mut self, config: RetryConfig) -> Self {
168 self.retry_config = config;
169 self
170 }
171
172 pub fn prompt_cache_key(mut self, key: String) -> Self {
177 self.prompt_cache_key = Some(key);
178 self
179 }
180
181 pub fn messages(mut self, messages: Vec<ChatMessage>) -> Self {
185 self.initial_messages = messages;
186 self
187 }
188
189 pub async fn spawn(self) -> Result<(Sender<UserMessage>, Receiver<AgentMessage>, AgentHandle)> {
190 let mut messages = Vec::new();
191
192 if !self.prompts.is_empty() {
193 let system_content = Prompt::build_all(&self.prompts).await?;
194 if !system_content.is_empty() {
195 messages.push(ChatMessage::System { content: system_content, timestamp: IsoString::now() });
196 }
197 }
198
199 messages.extend(self.initial_messages);
200
201 let (user_message_tx, user_message_rx) = mpsc::channel::<UserMessage>(self.channel_capacity);
202
203 let (message_tx, agent_message_rx) = mpsc::channel::<AgentMessage>(self.channel_capacity);
204
205 let mut context = Context::new(messages, self.tool_definitions);
206 context.set_prompt_cache_key(self.prompt_cache_key);
207
208 let config = AgentConfig {
209 llm: self.llm,
210 context,
211 mcp_command_tx: self.mcp_tx,
212 tool_timeout: self.tool_timeout,
213 compaction_config: self.compaction_config,
214 auto_continue: AutoContinue::new(self.max_auto_continues),
215 retry_config: self.retry_config,
216 };
217
218 let agent = Agent::new(config, user_message_rx, message_tx);
219
220 let agent_handle = tokio::spawn(agent.run());
221
222 Ok((user_message_tx, agent_message_rx, AgentHandle { handle: agent_handle }))
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_spec::{AgentSpecExposure, ToolFilter};

    /// Yields to the runtime until `handle` reports completion.
    ///
    /// Bounded by a timeout so a task that never finishes fails the test
    /// instead of hanging it.
    async fn wait_until_finished(handle: &AgentHandle) {
        tokio::time::timeout(Duration::from_secs(5), async {
            while !handle.is_finished() {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("agent task did not finish in time");
    }

    /// A task that returns immediately must eventually be reported as finished,
    /// and awaiting its completion must then return.
    #[tokio::test]
    async fn test_agent_handle_is_finished() {
        let handle = AgentHandle { handle: tokio::spawn(async {}) };
        wait_until_finished(&handle).await;
        assert!(handle.is_finished());
        handle.await_completion().await;
    }

    /// Aborting a long-running task must make it report as finished.
    #[tokio::test]
    async fn test_agent_handle_abort() {
        let handle = AgentHandle {
            handle: tokio::spawn(async {
                tokio::time::sleep(Duration::from_mins(1)).await;
            }),
        };
        assert!(!handle.is_finished());
        handle.abort();
        // Poll instead of sleeping for a fixed time so the test does not
        // depend on scheduler timing.
        wait_until_finished(&handle).await;
        assert!(handle.is_finished());
    }

    /// Prompts must be rendered in the order they were added to the builder.
    #[tokio::test]
    async fn system_prompt_preserves_add_order() {
        let builder = AgentBuilder::new(Arc::new(llm::testing::FakeLlmProvider::new(vec![])))
            .system_prompt(Prompt::text("first"))
            .system_prompt(Prompt::text("second"))
            .system_prompt(Prompt::text("third"));

        let rendered = Prompt::build_all(&builder.prompts).await.unwrap();

        assert_eq!(rendered, "first\n\nsecond\n\nthird");
    }

    /// A comma-separated ("alloy") model string must be accepted by `from_spec`.
    #[tokio::test]
    async fn from_spec_accepts_alloy_model_specs() {
        let spec = AgentSpec {
            name: "alloy".to_string(),
            description: "alloy".to_string(),
            model: "ollama:llama3.2,llamacpp:local".to_string(),
            reasoning_effort: None,
            prompts: vec![],
            mcp_config_sources: Vec::new(),
            exposure: AgentSpecExposure::both(),
            tools: ToolFilter::default(),
        };

        let builder = AgentBuilder::from_spec(&spec, vec![]).await;
        assert!(builder.is_ok());
    }
}