1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 env::{self},
4 fmt::{self, Debug},
5 fs,
6 str::FromStr,
7 sync::Arc,
8 time::Duration,
9};
10
11use async_stream::stream;
12use chrono::Utc;
13use futures_util::StreamExt;
14use microagents_events::{
15 AgentEventAny, AssistantResponseEvent, DeltaType, SessionInitEvent, SessionInitType,
16 SessionStopEvent, SkillLoadEvent, StreamDeltaEvent, ToolCallEvent, ToolResultEvent, Usage,
17 UserPromptSubmitEvent, types::ToolResult,
18};
19use microagents_storage::{
20 jsonl::JsonlAgentStorage,
21 memory::InMemoryAgentStorage,
22 sqlite::SqliteAgentStorage,
23 types::{AgentStorage, AgentStorageChoice},
24};
25use serde_json::{Value, json};
26use thiserror::Error;
27use tokio::{sync::Semaphore, task::JoinSet};
28use ultrafast_models_sdk::{
29 ChatRequest, CircuitBreakerConfig, Message, ProviderConfig, Role, UltrafastClient,
30 cache::{CacheConfig, CacheType},
31 models::{Delta, FunctionCall, Tool, ToolCall},
32};
33
34use crate::{
35 common::{
36 JsonResult, call_tool, check_api_key, convert_event_to_message, estimate_tokens,
37 parse_json_fragment,
38 },
39 skills::{self, ensure_skill, find_skills, parse_skill},
40 types::{Agent, AgentError, GenerationStream, RunStream, ToolExecutionContext, ToolFunction},
41};
42
/// Relative skills directory; presumably resolved against the working directory by the
/// `skills` module (not used in this part of the file — TODO confirm).
pub const SKILLS_PATH: &str = ".agents/skills";
/// Name reported by [`SkillsTool`]; `run` compares tool-call names against it to decide
/// whether to emit a skill-load event instead of a tool-call event.
pub const SKILLS_TOOL_NAME: &str = "skills";
/// Home-relative skills directory; the `~` is stored literally here, so any expansion
/// must happen wherever this constant is consumed (not visible here — TODO confirm).
pub const GLOBAL_SKILLS_PATH: &str = "~/.agents/skills";
/// Fixed opening part of every system prompt.
///
/// The builder appends the `<model>` section and, when present, the `<tools>`, `<skills>`
/// and `<additional_instructions>` sections after this text.
pub const BASE_SYSTEM_PROMPT: &str = r#"<identity>
You are MicroAgent, an AI agent whose purpose is to
fulfil request coming from a user, employing the tools and skills
available to you and interacting with the environment
you are given
</identity>
<guidelines>
<general>
To carry out a task, follow the main rules of the Zen of Python whenever possible:
- Beautiful is better than ugly.
- Explicit is better than implicit.
- Simple is better than complex.
- Complex is better than complicated.
- Flat is better than nested.
- Readability counts.
- Special cases aren't special enough to break the rules, although practicality beats purity.
- Errors should never pass silently, unless explicitly silenced.
- In the face of ambiguity, refuse the temptation to guess.
- There should be one (and preferably only one) obvious way to do it.
- If the implementation is hard to explain, it's a bad idea.
- If the implementation is easy to explain, it _may_ be a good idea, but **it is not necessarily**.
</general>
<tools_and_skills_usage>
Tools can be invoked by providing their name and an input conforming to their input JSON schema.
Call tools either when requested by the user, or when the description of the tool seems compelling
enough for the task at hand.
You also have a special tool called 'skills'. When you want to access specialized knowledge over a
particular area, you can invoke the skill pertaining to that area by calling the 'skills' tool and
providing the name of the skill to it. The 'skills' tool will return the specific instructions for that
skill. Invoke a skill either when directly prompted by the user to do so, or when the skill's description
seems compelling enough for the task at hand.
</tools_and_skills_usage>
</guidelines>
"#;
/// Number of semaphore permits used for tool execution when `parallel_tool_calls` is
/// enabled; with the flag off a single permit is used instead.
const MAX_CONCURRENT_TOOL_CALLS: usize = 10;
87
/// LLM back-ends an agent can be configured with.
///
/// The textual names used by the `FromStr` and `Display` implementations are the
/// lowercase strings `openai`, `openrouter`, `ollama` and `groq`.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Default)]
pub enum SupportedProvider {
    /// OpenAI; this is the `Default` variant. Building an agent checks `OPENAI_API_KEY`.
    #[default]
    OpenAI,
    /// OpenRouter. Building an agent checks `OPENROUTER_API_KEY`.
    OpenRouter,
    /// Ollama. No API key is checked; the client reads `OLLAMA_BASE_URL` and falls back
    /// to `http://localhost:11434/v1`.
    Ollama,
    /// Groq. Building an agent checks `GROQ_API_KEY`.
    Groq,
}
97
98impl FromStr for SupportedProvider {
99 type Err = MicroAgentBuilderError;
100 fn from_str(s: &str) -> Result<Self, Self::Err> {
101 match s {
102 "openai" => Ok(Self::OpenAI),
103 "openrouter" => Ok(Self::OpenRouter),
104 "ollama" => Ok(Self::Ollama),
105 "groq" => Ok(Self::Groq),
106 _ => Err(MicroAgentBuilderError::ProviderNotSupported(s.into())),
107 }
108 }
109}
110
111impl fmt::Display for SupportedProvider {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 let s = match self {
114 Self::OpenRouter => "openrouter",
115 Self::Groq => "groq",
116 Self::Ollama => "ollama",
117 Self::OpenAI => "openai",
118 };
119 write!(f, "{}", s)
120 }
121}
122
123impl SupportedProvider {
124 pub fn default_model(&self) -> &'static str {
126 match self {
127 SupportedProvider::OpenAI => "gpt-5.5",
129
130 SupportedProvider::Ollama => "llama3.2",
132
133 SupportedProvider::Groq => "llama-3.3-70b-versatile",
135
136 SupportedProvider::OpenRouter => "anthropic/claude-opus-4.7",
138 }
139 }
140}
141
/// Errors produced while configuring or building a [`MicroAgent`].
#[derive(Debug, Error)]
pub enum MicroAgentBuilderError {
    /// `add_skill` could not locate the named skill (payload: the skill name).
    #[error("Skill {0} not found")]
    SkillNotFound(String),
    /// A skill was located but loading/parsing it failed; converted automatically from
    /// [`skills::SkillLoadingError`] by `?`.
    #[error("Skill parsing error")]
    SkillParsingError(#[from] skills::SkillLoadingError),
    /// The provider string is not one of the identifiers `SupportedProvider` parses
    /// (payload: the rejected string).
    #[error("Provider {0} not supported")]
    ProviderNotSupported(String),
    /// A tool with the same name is already registered (payload: the tool name).
    #[error("Tool with name {0} already exists")]
    ToolAlreadyDefined(String),
    /// The selected storage back-end failed to initialise (payload: the stringified
    /// underlying error).
    #[error("Storage could not be loaded: {0}")]
    StorageLoadError(String),
    /// The API-key check for the selected provider failed at build time (payload: the
    /// provider identifier).
    #[error("API key not found for provider {0}")]
    APIKeyNotFoundError(String),
}
158
159pub struct DebuggableClient(pub Arc<UltrafastClient>);
161
162impl Debug for DebuggableClient {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 write!(f, "UltrafastClient")
165 }
166}
167
/// A configured agent: model/provider selection, registered tools and skills, the
/// conversation history and the storage used to persist session events.
///
/// Instances are produced by `MicroAgentBuilder::build`.
#[derive(Debug)]
pub struct MicroAgent<Ctx> {
    /// Messages sent to the model on each generation. `run` replaces this with the
    /// session's messages, then adds the system prompt in front and the user prompt last.
    pub history: Vec<Message>,
    /// Registered tools, keyed by name.
    pub tools: HashMap<String, Arc<dyn ToolFunction<Ctx>>>,
    /// Known skills: skill name mapped to the description advertised in the system prompt.
    pub skills: HashMap<String, String>,
    /// Back-end the client is created for.
    pub provider: SupportedProvider,
    /// Model identifier sent with every chat request.
    pub model: String,
    /// Fully assembled system prompt.
    pub system: String,
    /// SDK client, created on first use by `init_client` and reused afterwards.
    client: Option<DebuggableClient>,
    /// Shared context handed to every tool execution.
    pub tool_context: Arc<ToolExecutionContext<Ctx>>,
    /// Persistence for session events.
    pub storage: Box<dyn AgentStorage>,
    /// When `false`, tool calls run one at a time; when `true`, up to
    /// `MAX_CONCURRENT_TOOL_CALLS` run concurrently.
    pub parallel_tool_calls: bool,
}
185
/// Consuming builder for [`MicroAgent`].
///
/// Each setter takes `self` by value and returns the updated builder (wrapped in a
/// `Result` when the step can fail); `build` produces the agent.
#[derive(Debug)]
pub struct MicroAgentBuilder<Ctx> {
    /// Tools registered so far, keyed by name.
    tools: HashMap<String, Arc<dyn ToolFunction<Ctx>>>,
    /// Skills registered so far: name mapped to description.
    skills: HashMap<String, String>,
    /// Selected back-end.
    provider: SupportedProvider,
    /// Explicit model name; empty means "use the provider's default model".
    model: String,
    /// Extra text appended to the system prompt; empty means none.
    custom_instructions: String,
    /// Context that will be shared with every tool execution.
    tool_context: Arc<ToolExecutionContext<Ctx>>,
    /// Session storage the built agent will use.
    pub storage: Box<dyn AgentStorage>,
    /// Whether the built agent may execute tool calls concurrently.
    pub parallel_tool_calls: bool,
}
210
211impl<Ctx: Send + Sync + 'static> MicroAgentBuilder<Ctx> {
212 pub fn new(tool_context: ToolExecutionContext<Ctx>) -> Self {
216 Self {
217 tools: HashMap::from([(
218 "skills".to_string(),
219 Arc::new(SkillsTool) as Arc<dyn ToolFunction<Ctx>>,
220 )]),
221 skills: HashMap::new(),
222 provider: SupportedProvider::default(),
223 model: String::new(),
224 custom_instructions: String::new(),
225 tool_context: Arc::new(tool_context),
226 storage: Box::new(InMemoryAgentStorage::default()) as Box<dyn AgentStorage>,
227 parallel_tool_calls: false,
228 }
229 }
230
231 pub fn add_skill(mut self, skill_name: String) -> Result<Self, MicroAgentBuilderError> {
235 if let Some(skill_path) = ensure_skill(&skill_name) {
236 let description = parse_skill(&skill_path)?;
237 self.skills.insert(skill_name, description);
238 return Ok(self);
239 }
240 Err(MicroAgentBuilderError::SkillNotFound(skill_name))
241 }
242
243 pub fn find_skills(mut self) -> Result<Self, MicroAgentBuilderError> {
246 let loaded_skills = find_skills()?;
247 for (skill, des) in loaded_skills {
248 self.skills.insert(skill, des);
249 }
250 Ok(self)
251 }
252
253 pub fn provider(mut self, provider: String) -> Result<Self, MicroAgentBuilderError> {
255 let prov = SupportedProvider::from_str(&provider)?;
256 self.provider = prov;
257 Ok(self)
258 }
259
260 pub fn model(mut self, model: String) -> Self {
262 self.model = model;
263 self
264 }
265
266 pub fn parallel_tool_calls(mut self, parallel_tool_calls: bool) -> Self {
268 self.parallel_tool_calls = parallel_tool_calls;
269 self
270 }
271
272 pub async fn storage(
274 mut self,
275 storage: AgentStorageChoice,
276 ) -> Result<Self, MicroAgentBuilderError> {
277 match storage {
278 AgentStorageChoice::Jsonl => self.storage = Box::new(JsonlAgentStorage::default()),
279 AgentStorageChoice::Memory => self.storage = Box::new(InMemoryAgentStorage::default()),
280 AgentStorageChoice::Sqlite => {
281 let store = SqliteAgentStorage::new(None)
282 .await
283 .map_err(|e| MicroAgentBuilderError::StorageLoadError(e.to_string()))?;
284 self.storage = Box::new(store);
285 }
286 }
287
288 Ok(self)
289 }
290
291 pub fn add_tool(
293 mut self,
294 tool: Arc<dyn ToolFunction<Ctx>>,
295 ) -> Result<Self, MicroAgentBuilderError> {
296 self.tools.insert(tool.name().to_owned(), tool);
297 Ok(self)
298 }
299
300 pub fn custom_instructions(mut self, instructions: String) -> Self {
302 self.custom_instructions = instructions;
303 self
304 }
305
306 fn resolve_model(&self) -> String {
308 if self.model.is_empty() {
309 return self.provider.default_model().into();
310 }
311 self.model.clone()
312 }
313
314 fn resolve_system(&self, model: &str) -> String {
317 let mut base = BASE_SYSTEM_PROMPT.to_string();
318 base += &format!(
319 r#"<model>
320You are {} provided by {}
321</model>
322"#,
323 model, self.provider
324 );
325 if !self.tools.is_empty() {
326 base += "\n<tools>";
327 for (k, v) in &self.tools {
328 base += &format!(
329 "\n<tool>\n<name>{}</name>\n<description>{}</description>\n<input_schema>{}</input_schema>\n</tool>",
330 k,
331 v.description(),
332 v.input_schema()
333 )
334 }
335 base += "\n</tools>"
336 }
337 if !self.skills.is_empty() {
338 base += "\n<skills>";
339 for (k, v) in &self.skills {
340 base += &format!(
341 "\n<skill>\n<name>{}</name>\n<description>{}</description>\n</skill>",
342 k, v
343 );
344 }
345 base += "\n</skills>";
346 }
347 if !self.custom_instructions.is_empty() {
348 base += &format!(
349 "\n<additional_instructions>\n{}\n</additional_instructions>",
350 self.custom_instructions
351 )
352 }
353
354 base
355 }
356
357 #[must_use = "The builder needs to call `build` otherwise it hangs without turning into an actual agent."]
361 pub fn build(self) -> Result<MicroAgent<Ctx>, MicroAgentBuilderError> {
362 let model = self.resolve_model();
363 let system = self.resolve_system(&model);
364 match self.provider {
365 SupportedProvider::Groq => {
366 check_api_key("GROQ_API_KEY")
367 .map_err(|_| MicroAgentBuilderError::APIKeyNotFoundError("groq".into()))?;
368 }
369 SupportedProvider::OpenAI => {
370 check_api_key("OPENAI_API_KEY")
371 .map_err(|_| MicroAgentBuilderError::APIKeyNotFoundError("openai".into()))?;
372 }
373 SupportedProvider::OpenRouter => {
374 check_api_key("OPENROUTER_API_KEY").map_err(|_| {
375 MicroAgentBuilderError::APIKeyNotFoundError("openrouter".into())
376 })?;
377 }
378 _ => {}
379 }
380 Ok(MicroAgent {
381 history: vec![],
382 tools: self.tools,
383 skills: self.skills,
384 model,
385 provider: self.provider,
386 client: None,
387 system,
388 tool_context: self.tool_context,
389 storage: self.storage,
390 parallel_tool_calls: self.parallel_tool_calls,
391 })
392 }
393}
394
395impl<Ctx> MicroAgent<Ctx> {
396 fn init_client(&mut self) -> Result<Arc<UltrafastClient>, AgentError> {
400 if let Some(c) = self.client.as_ref() {
401 return Ok(c.0.clone());
402 }
403 let mut base_client = UltrafastClient::standalone();
404 base_client = match self.provider {
405 SupportedProvider::OpenRouter => {
406 base_client.with_openrouter(env::var("OPENROUTER_API_KEY")?)
407 }
408 SupportedProvider::OpenAI => base_client.with_openai(env::var("OPENAI_API_KEY")?),
409 SupportedProvider::Groq => base_client.with_groq(env::var("GROQ_API_KEY")?),
410 SupportedProvider::Ollama => base_client.with_provider(
411 "openai",
412 ProviderConfig {
413 base_url: Some(
414 env::var("OLLAMA_BASE_URL").unwrap_or("http://localhost:11434/v1".into()),
415 ),
416 api_key: "ollama".into(),
417 name: "openai".into(),
418 timeout: Duration::from_secs(300),
419 max_retries: 3,
420 retry_delay: Duration::from_millis(500),
421 rate_limit: None,
422 model_mapping: HashMap::new(),
423 headers: HashMap::new(),
424 enabled: true,
425 circuit_breaker: Some(CircuitBreakerConfig {
426 failure_threshold: 5,
427 recovery_timeout: Duration::from_secs(30),
428 request_timeout: Duration::from_secs(10),
429 half_open_max_calls: 3,
430 }),
431 },
432 ),
433 };
434 let client = base_client
435 .with_routing_strategy(ultrafast_models_sdk::RoutingStrategy::Single)
436 .with_cache(CacheConfig {
437 enabled: true,
438 ttl: Duration::from_secs(600),
439 max_size: 1000,
440 cache_type: CacheType::InMemory,
441 })
442 .build()
443 .map_err(|e| AgentError::ClientInitFailed(e.to_string()))?;
444 let arcc = Arc::new(client);
445 self.client = Some(DebuggableClient(arcc.clone()));
446 Ok(arcc)
447 }
448}
449
/// Built-in tool that returns the contents of a skill's `SKILL.md`.
///
/// The builder registers it by default; it carries no state.
#[derive(Debug)]
pub struct SkillsTool;
453
454#[async_trait::async_trait]
455impl<Ctx: Send + Sync + 'static> ToolFunction<Ctx> for SkillsTool {
456 fn name(&self) -> &'static str {
457 SKILLS_TOOL_NAME
458 }
459
460 fn description(&self) -> &'static str {
461 "Call this tool to load a skill, providing the name of the skill you are invoking"
462 }
463
464 fn input_schema(&self) -> serde_json::Value {
465 json!({
466 "type": "object",
467 "required": [
468 "skill_name"
469 ],
470 "properties": {
471 "skill_name": {
472 "type": "string",
473 "description": "Name of the skill to load"
474 }
475 }
476 })
477 }
478
479 async fn execute(
480 &self,
481 input: Value,
482 _ctx: &Arc<ToolExecutionContext<Ctx>>,
483 ) -> Result<ToolResult, AgentError> {
484 let skill_name = input["skill_name"]
485 .as_str()
486 .ok_or_else(|| AgentError::ToolCallError("missing skill_name".into()))?;
487 let skill_path = ensure_skill(skill_name);
488 if let Some(p) = skill_path {
489 let content = fs::read_to_string(p.join("SKILL.md")).map_err(|e| {
490 AgentError::ToolCallError(format!("Skill {skill_name} could not be read: {}", e))
491 })?;
492 return Ok(ToolResult::Ok(content));
493 }
494 Ok(ToolResult::Err(format!(
495 "Skill {skill_name} could not be found"
496 )))
497 }
498}
499
500#[async_trait::async_trait]
501impl<Ctx: Send + Sync + 'static> Agent for MicroAgent<Ctx> {
502 async fn generate(&mut self) -> Result<GenerationStream, AgentError> {
508 let client = self.init_client()?;
509 let tools: Vec<Tool> = self.tools.values().map(|t| t.to_sdk_tool()).collect();
510 let stream = client
511 .stream_chat_completion(ChatRequest {
512 model: self.model.clone(),
513 messages: self.history.clone(),
514 temperature: None,
515 stream: Some(true),
516 max_tokens: None,
517 tools: Some(tools),
518 tool_choice: Some(ultrafast_models_sdk::models::ToolChoice::Auto),
519 top_p: None,
520 frequency_penalty: None,
521 user: None,
522 presence_penalty: None,
523 stop: None,
524 })
525 .await
526 .map_err(|e| AgentError::GenerationError(e.to_string()))?;
527 let mapped =
528 stream.map(|item| item.map_err(|e| AgentError::GenerationError(e.to_string())));
529 return Ok(Box::pin(mapped));
530 }
531
532 async fn run(
539 mut self,
540 prompt: String,
541 session_id: Option<String>,
542 ) -> Result<RunStream, AgentError> {
543 let local_tools: HashMap<String, Arc<dyn ToolFunction<Ctx>>> = self.tools.clone();
544 let mut input_text = self.system.clone();
545 let mut completion_text = String::new();
546 let start_processing = Utc::now();
547 let s: RunStream = Box::pin(stream! {
548 let resolved_sid;
549 let messages: Vec<Message> = if let Some(sid) = session_id {
550 let ev = AgentEventAny::SessionInit(SessionInitEvent {
551 session_id: sid.clone(),
552 model: self.model.clone(),
553 system: self.system.clone(),
554 provider: self.provider.to_string(),
555 init_type: SessionInitType::Resume,
556 timestamp: Utc::now(),
557 });
558 yield Ok(ev);
559
560 let events_res = self
561 .storage
562 .get_session(&sid)
563 .await
564 .map_err(|e| AgentError::SessionLoadError(e.to_string()));
565
566 let events = match events_res {
567 Ok(e) => e,
568 Err(e) => {
569 yield Err(AgentError::RunError(format!("Error while getting the session: {}", e)));
570 return;
571 }
572 };
573
574 resolved_sid = sid;
575
576 events
577 .iter()
578 .filter_map(|e| convert_event_to_message(e.clone()))
579 .collect()
580 } else {
581 let sid = uuid::Uuid::new_v4().to_string();
582 let sint = SessionInitEvent {
583 session_id: sid.clone(),
584 model: self.model.clone(),
585 system: self.system.clone(),
586 provider: self.provider.to_string(),
587 init_type: SessionInitType::Start,
588 timestamp: Utc::now(),
589 };
590 resolved_sid = sid;
591 let ev = AgentEventAny::SessionInit(sint.clone());
592 match self.storage.create_session(sint).await {
593 Ok(_) => {},
594 Err(e) => {
595 yield Err(AgentError::RunError(format!("An error occurred while creating the session in the storage: {}", e)));
596 return;
597 }
598 }
599 yield Ok(ev);
600 vec![]
601 };
602 self.history = messages;
603 self.history.insert(0, Message { role: Role::System, content: self.system.clone(), name: None, tool_calls: None, tool_call_id: None });
604 self.history.push(Message {
605 role: Role::User,
606 content: prompt.to_owned(),
607 name: None,
608 tool_calls: None,
609 tool_call_id: None,
610 });
611 input_text += &prompt;
612 let turn_id = uuid::Uuid::new_v4().to_string();
613 let user_prompt_submit = AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
614 session_id: resolved_sid.clone(),
615 turn_id: turn_id.clone(),
616 prompt,
617 timestamp: Utc::now(),
618 });
619 match self.storage.update_session(user_prompt_submit.clone()).await {
620 Ok(_) => {},
621 Err(e) => {
622 yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
623 return;
624 }
625 };
626 yield Ok(user_prompt_submit);
627
628 loop {
629 let mut generation = match self.generate().await {
630 Ok(g) => g,
631 Err(e) => {
632 yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
633 return;
634 }
635 };
636 let mut text = String::new();
637 let mut tool_messages: Vec<Message> = vec![];
638 let mut tool_calls: BTreeMap<u32, (String, String, String)> = BTreeMap::new();
639 while let Some(g) = generation.next().await {
640 match g {
641 Ok(chunk) => {
642 let mut deltas: Vec<(u32, Delta)> = vec![];
643 for choice in chunk.choices {
644 deltas.push((choice.index, choice.delta));
645 }
646 deltas.sort_by_key(|a| a.0);
647 for (_, delta) in deltas {
648 if let Some(c) = delta.content {
649 text += &c;
650 completion_text += &c;
651 let ev = AgentEventAny::StreamDelta(StreamDeltaEvent {
652 session_id: resolved_sid.clone(),
653 turn_id: turn_id.clone(),
654 delta: c,
655 delta_type: DeltaType::Text,
656 timestamp: Utc::now(),
657 });
658 match self.storage.update_session(ev.clone()).await {
659 Ok(_) => {},
660 Err(e) => {
661 yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
662 return;
663 }
664 }
665 yield Ok(ev);
666 }
667 if let Some(tcs) = delta.tool_calls {
668 for tc in tcs {
669 if let Some(func) = tc.function {
670 if let Some(tid) = tc.id && let Some(name) = func.name {
671 tool_calls.entry(tc.index).or_insert((tid, name, String::new()));
673 }
674 if let Some(args) = func.arguments {
676 tool_calls.entry(tc.index).and_modify(|v| v.2 += &args);
677 completion_text += &args;
678 }
679 }
680 }
681 }
682 }
683 },
684 Err(e) => {
685 let latency = (Utc::now() - start_processing).num_milliseconds();
686 let stop_ev = AgentEventAny::SessionStop(SessionStopEvent { session_id: resolved_sid.clone(), success: false, result: None, error: Some(e.to_string()), timestamp: Utc::now(), usage: Usage {
687 latency,
688 ..Default::default()
689 }});
690 match self.storage.update_session(stop_ev.clone()).await {
691 Ok(_) => {},
692 Err(e) => {
693 yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
694 return;
695 }
696 }
697 yield Ok(stop_ev);
698 return;
699 }
700 }
701 }
702
703 if tool_calls.is_empty() {
704 let latency = (Utc::now() - start_processing).num_milliseconds();
705 let input_tokens = estimate_tokens(&input_text).unwrap_or_default();
706 let output_tokens = estimate_tokens(&completion_text).unwrap_or_default();
707 let ev = AgentEventAny::AssistantResponse(AssistantResponseEvent {
708 session_id: resolved_sid.clone(),
709 turn_id: turn_id.clone(),
710 full_text: text.clone(),
711 tool_calls: None,
712 timestamp: Utc::now(),
713 });
714 let stop_ev = AgentEventAny::SessionStop(SessionStopEvent {
715 session_id: resolved_sid.clone(),
716 success: true,
717 result: Some(text),
718 error: None,
719 timestamp: Utc::now(),
720 usage: Usage {
721 latency,
722 output_chars: completion_text.len(),
723 input_chars: input_text.len(),
724 estimated_output_tokens: output_tokens,
725 estimated_input_tokens: input_tokens,
726 }
727 });
728 match self.storage.update_session(ev.clone()).await {
729 Ok(_) => {},
730 Err(e) => {
731 yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
732 return;
733 }
734 }
735 match self.storage.update_session(stop_ev.clone()).await {
736 Ok(_) => {},
737 Err(e) => {
738 yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
739 return;
740 }
741 }
742 yield Ok(ev);
743 yield Ok(stop_ev);
744 return;
745 }
746
747 let mut to_pop = HashSet::new();
748 let mut to_call = JoinSet::new();
749 let tool_ctx = self.tool_context.clone();
750 let concurrency = if !self.parallel_tool_calls {
751 1
752 } else {
753 MAX_CONCURRENT_TOOL_CALLS
754 };
755 let semaphore = Arc::new(Semaphore::new(concurrency));
756 for (tid, name, args) in tool_calls.values() {
757 match parse_json_fragment(args) {
758 JsonResult::Valid(v) => {
759 let tool = local_tools.get(name);
760 if let Some(t) = tool {
761 let tool_name = name.clone();
762 let tc_ev = if tool_name != SKILLS_TOOL_NAME {
763 AgentEventAny::ToolCall(ToolCallEvent {
764 session_id: resolved_sid.clone(),
765 turn_id: turn_id.clone(),
766 name: tool_name,
767 input: v.clone(),
768 timestamp: Utc::now(),
769 })
770 } else {
771 AgentEventAny::SkillLoad(SkillLoadEvent {
772 session_id: resolved_sid.clone(),
773 turn_id: turn_id.clone(),
774 skill_name: v["skill_name"].as_str().unwrap_or_default().to_string(),
775 timestamp: Utc::now(),
776 })
777 };
778 match self.storage.update_session(tc_ev.clone()).await {
779 Ok(_) => {},
780 Err(e) => {
781 yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
782 return;
783 }
784 }
785 yield Ok(tc_ev);
786 let permit_res = semaphore.clone().acquire_owned().await;
787 let permit = match permit_res {
788 Ok(p) => p,
789 Err(e) => {
790 yield Err(AgentError::RunError(format!("Error while acquiring semaphore: {}", e)));
791 return;
792 }
793 };
794 let t = t.clone();
795 let tool_call_id = tid.clone();
796 let ctx = tool_ctx.clone();
797 to_call.spawn(async move {
798 let _permit = permit;
799 let result = call_tool(t, v, ctx).await;
800 match result {
801 Ok(r) => Ok((tool_call_id, r)),
802 Err(e) => Err(e)
803 }
804 });
805 }
806 },
807 JsonResult::Incomplete => {},
808 JsonResult::Malformed => {
809 to_pop.insert(tid.clone());
810 }
811 }
812 }
813 while let Some(res) = to_call.join_next().await {
814 match res {
815 Ok(Ok((tid, tool_result))) => {
816 let ev = AgentEventAny::ToolResult(ToolResultEvent {
817 session_id: resolved_sid.clone(),
818 turn_id: turn_id.clone(),
819 result: tool_result.clone(),
820 tool_call_id: tid.clone(),
821 timestamp: Utc::now(),
822 });
823 match self.storage.update_session(ev.clone()).await {
824 Ok(_) => {},
825 Err(e) => {
826 yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
827 return;
828 }
829 }
830 yield Ok(ev);
831 let content = match tool_result {
832 ToolResult::Ok(r) => {
833 format!("Tool succeeded: {r}")
834 },
835 ToolResult::Err(r) => {
836 format!("Tool failed: {r}")
837 },
838 _ => unreachable!("ToolResult should not reach this branch")
839 };
840 input_text += &content;
841 tool_messages.push(Message { role: Role::Tool, content, name: None, tool_calls: None, tool_call_id: Some(tid) });
842 }
843 Ok(Err(e)) => {
844 yield Err(AgentError::RunError(format!("Tool call failed: {}", e)));
845 }
846 Err(e) => {
847 yield Err(AgentError::RunError(format!("Task join failed: {}", e)));
848 }
849 }
850 }
851
852 self.history.push(Message {
853 role: Role::Assistant,
854 content: std::mem::take(&mut text),
855 name: None,
856 tool_calls: Some(tool_calls.iter().
857 filter(|(_, (tid, _, _))| !to_pop.contains(tid))
858 .map(|(_, (tid, name, args))| ToolCall {
859 call_type: "function".into(),
860 id: tid.clone(),
861 function: FunctionCall {
862 name: name.clone(),
863 arguments: args.clone(),
864 }
865 }).collect()),
866 tool_call_id: None,
867 });
868 self.history.extend(tool_messages);
869 }
870 });
871 Ok(s)
872 }
873}
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878 use crate::types::{
879 Agent, AgentError, GenerationStream, RunStream, ToolExecutionContext, ToolFunction,
880 };
881 use async_stream::stream;
882 use futures_util::StreamExt;
883 use microagents_events::types::ToolResult;
884 use serde_json::Value;
885 use std::sync::Arc;
886
    /// Minimal `Agent` implementation that records how it was called.
    #[derive(Debug)]
    struct DummyAgent {
        /// Set to `true` by `generate`.
        pub generate_called: bool,
        /// Set to `true` by `run`.
        pub run_called: bool,
        /// Prompt passed to the most recent `run` call.
        pub last_prompt: Option<String>,
        /// Session id passed to the most recent `run` call.
        pub last_session_id: Option<String>,
    }
898
899 impl DummyAgent {
900 fn new() -> Self {
901 Self {
902 generate_called: false,
903 run_called: false,
904 last_prompt: None,
905 last_session_id: None,
906 }
907 }
908 }
909
910 #[async_trait::async_trait]
911 impl Agent for DummyAgent {
912 async fn generate(&mut self) -> Result<GenerationStream, AgentError> {
913 self.generate_called = true;
914 let s = stream! {
915 yield Ok(ultrafast_models_sdk::models::StreamChunk {
916 id: "1".into(),
917 object: "chat.completion.chunk".into(),
918 created: 0,
919 model: "dummy".into(),
920 choices: vec![],
921 });
922 };
923 Ok(Box::pin(s))
924 }
925
926 async fn run(
927 mut self,
928 prompt: String,
929 session_id: Option<String>,
930 ) -> Result<RunStream, AgentError> {
931 self.run_called = true;
932 self.last_prompt = Some(prompt.clone());
933 self.last_session_id = session_id.clone();
934 let s = stream! {
935 yield Ok(AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
936 session_id: session_id.unwrap_or_else(|| "new".into()),
937 turn_id: "t1".into(),
938 prompt,
939 timestamp: Utc::now(),
940 }));
941 };
942 Ok(Box::pin(s))
943 }
944 }
945
    /// Stateless tool named `dummy` that always succeeds; used to exercise tool
    /// registration.
    #[derive(Debug)]
    struct DummyTool;
952
953 #[async_trait::async_trait]
954 impl ToolFunction<()> for DummyTool {
955 fn name(&self) -> &'static str {
956 "dummy"
957 }
958 fn description(&self) -> &'static str {
959 "A dummy tool"
960 }
961 fn input_schema(&self) -> Value {
962 json!({"type": "object"})
963 }
964 async fn execute(
965 &self,
966 _input: Value,
967 _ctx: &Arc<ToolExecutionContext<()>>,
968 ) -> Result<ToolResult, AgentError> {
969 Ok(ToolResult::Ok("done".into()))
970 }
971 }
972
973 #[test]
978 fn test_builder_default_provider_is_openai() {
979 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
980 assert_eq!(builder.provider, SupportedProvider::OpenAI);
981 }
982
983 #[test]
984 fn test_builder_default_model_is_empty() {
985 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
986 assert!(builder.model.is_empty());
987 }
988
989 #[test]
990 fn test_builder_default_skills_is_empty() {
991 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
992 assert!(builder.skills.is_empty());
993 }
994
995 #[test]
996 fn test_builder_default_tools_contains_skills_tool() {
997 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
998 assert!(builder.tools.contains_key("skills"));
999 assert_eq!(builder.tools.len(), 1);
1000 }
1001
1002 #[test]
1003 fn test_builder_default_parallel_tool_calls_is_false() {
1004 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
1005 assert!(!builder.parallel_tool_calls);
1006 }
1007
1008 #[test]
1013 fn test_builder_provider_sets_provider() {
1014 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1015 .provider("groq".into())
1016 .unwrap();
1017 assert_eq!(builder.provider, SupportedProvider::Groq);
1018 }
1019
1020 #[test]
1021 fn test_builder_provider_invalid_returns_error() {
1022 let result =
1023 MicroAgentBuilder::new(ToolExecutionContext::new(())).provider("unknown".into());
1024 assert!(result.is_err());
1025 assert!(
1026 matches!(
1027 result.unwrap_err(),
1028 MicroAgentBuilderError::ProviderNotSupported(_)
1029 ),
1030 "expected ProviderNotSupported error"
1031 );
1032 }
1033
1034 #[test]
1035 fn test_builder_model_sets_model() {
1036 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(())).model("gpt-5.5".into());
1037 assert_eq!(builder.model, "gpt-5.5");
1038 }
1039
1040 #[test]
1041 fn test_builder_parallel_tool_calls_sets_flag() {
1042 let builder =
1043 MicroAgentBuilder::new(ToolExecutionContext::new(())).parallel_tool_calls(true);
1044 assert!(builder.parallel_tool_calls);
1045 }
1046
1047 #[test]
1048 fn test_builder_custom_instructions_sets_instructions() {
1049 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1050 .custom_instructions("Be concise".into());
1051 assert_eq!(builder.custom_instructions, "Be concise");
1052 }
1053
1054 #[test]
1055 fn test_builder_add_tool_increments_tools() {
1056 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1057 .add_tool(Arc::new(DummyTool))
1058 .unwrap();
1059 assert_eq!(builder.tools.len(), 2);
1060 assert!(builder.tools.contains_key("dummy"));
1061 }
1062
1063 #[tokio::test]
1064 async fn test_builder_storage_sets_jsonl() {
1065 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1066 .storage(AgentStorageChoice::Jsonl)
1067 .await
1068 .unwrap();
1069 let _agent = builder.build().expect("Should be able to build the agent");
1071 }
1072
1073 #[tokio::test]
1074 async fn test_builder_storage_sets_memory() {
1075 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1076 .storage(AgentStorageChoice::Memory)
1077 .await
1078 .unwrap();
1079 let _agent = builder.build().expect("Should be able to build the agent");
1080 }
1081
1082 #[tokio::test]
1083 async fn test_builder_storage_sets_sqlite() {
1084 let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1085 .storage(AgentStorageChoice::Sqlite)
1086 .await
1087 .unwrap();
1088 let _agent = builder.build().expect("Should be able to build the agent");
1089 }
1090
1091 #[test]
1096 fn test_build_sets_empty_history() {
1097 let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1098 .provider("ollama".into())
1099 .unwrap()
1100 .build()
1101 .expect("Should be able to build the agent");
1102 assert!(agent.history.is_empty());
1103 }
1104
1105 #[test]
1106 fn test_build_sets_tools_on_agent() {
1107 let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1108 .provider("ollama".into())
1109 .unwrap()
1110 .add_tool(Arc::new(DummyTool))
1111 .unwrap()
1112 .build()
1113 .expect("Should be able to build the agent");
1114 assert_eq!(agent.tools.len(), 2);
1115 }
1116
1117 #[test]
1118 fn test_build_sets_provider_on_agent() {
1119 let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1120 .provider("ollama".into())
1121 .unwrap()
1122 .build()
1123 .expect("Should be able to build the agent");
1124 assert_eq!(agent.provider, SupportedProvider::Ollama);
1125 }
1126
1127 #[test]
1128 fn test_build_sets_model_on_agent() {
1129 let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1130 .provider("ollama".into())
1131 .unwrap()
1132 .model("llama3.2".into())
1133 .build()
1134 .expect("Should be able to build the agent");
1135 assert_eq!(agent.model, "llama3.2");
1136 }
1137
1138 #[test]
1139 fn test_build_sets_parallel_tool_calls_on_agent() {
1140 let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
1141 .provider("ollama".into())
1142 .unwrap()
1143 .parallel_tool_calls(true)
1144 .build()
1145 .expect("Should be able to build the agent");
1146 assert!(agent.parallel_tool_calls);
1147 }
1148
/// The built agent's system prompt must include the identity sentence
/// from the base system prompt.
#[test]
fn test_build_system_prompt_contains_base() {
    let context = ToolExecutionContext::new(());
    let configured = MicroAgentBuilder::new(context)
        .provider("ollama".into())
        .unwrap();
    let built = configured
        .build()
        .expect("Should be able to build the agent");
    let system_prompt = &built.system;
    assert!(system_prompt.contains("You are MicroAgent"));
}
1158
/// When a tool is registered, the system prompt must contain a `<tools>`
/// section that lists the tool by name.
#[test]
fn test_build_system_prompt_contains_tools() {
    let context = ToolExecutionContext::new(());
    let with_provider = MicroAgentBuilder::new(context)
        .provider("ollama".into())
        .unwrap();
    let with_tool = with_provider.add_tool(Arc::new(DummyTool)).unwrap();
    let built = with_tool
        .build()
        .expect("Should be able to build the agent");
    let system_prompt = &built.system;
    assert!(system_prompt.contains("<tools>"));
    assert!(system_prompt.contains("<name>dummy</name>"));
}
1171
/// With neither a provider nor a model configured, the built agent's system
/// prompt must mention the `gpt-5.5` model name.
///
/// `OPENAI_API_KEY` is overridden with a placeholder for the duration of the
/// build and then put back exactly as it was: restored to its previous value,
/// or removed again if it was not set before the test ran.
#[test]
fn test_build_system_prompt_contains_default_model_when_model_empty() {
    const API_KEY_VAR: &str = "OPENAI_API_KEY";

    // `var_os` keeps "unset" distinguishable from "set to an empty string" and
    // also preserves values that are not valid Unicode.
    let original_value = std::env::var_os(API_KEY_VAR);

    // SAFETY: mutating the process environment is only sound while no other
    // thread reads or writes it concurrently.
    // NOTE(review): the default parallel test runner does not guarantee that;
    // consider serialising the tests that touch the environment.
    unsafe {
        std::env::set_var(API_KEY_VAR, "test");
    }

    let build_result = MicroAgentBuilder::new(ToolExecutionContext::new(())).build();

    // Restore the environment before anything below can panic, so a failing
    // assertion does not leak the placeholder key into other tests.
    // SAFETY: same single-writer caveat as for the override above.
    unsafe {
        match original_value {
            Some(previous) => std::env::set_var(API_KEY_VAR, previous),
            None => std::env::remove_var(API_KEY_VAR),
        }
    }

    let agent = build_result.expect("Should be able to build the agent");
    assert!(agent.system.contains("gpt-5.5"));
}
1187
/// When a custom model is configured, the system prompt must mention that
/// model and must not mention the Ollama default model name.
#[test]
fn test_build_system_prompt_contains_custom_model_when_set() {
    let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
        .provider("ollama".into())
        .unwrap()
        .model("custom-model".into())
        .build()
        .expect("Should be able to build the agent");
    assert!(agent.system.contains("custom-model"));
    // The Ollama default model is spelled "llama3.2" (no hyphen after
    // "llama"); checking for "llama-3.2" could never fail and so verified
    // nothing.
    assert!(!agent.system.contains("llama3.2"));
}
1199
/// Building an agent for the `groq` provider must fail with
/// `APIKeyNotFoundError`.
///
/// NOTE(review): presumably this relies on the provider's API key being absent
/// from the environment the tests run in — confirm.
#[test]
fn test_agent_fails_to_build_if_not_api_key() {
    let context = ToolExecutionContext::new(());
    let configured = MicroAgentBuilder::new(context)
        .provider("groq".into())
        .unwrap();
    let outcome = configured.build();
    assert!(matches!(
        outcome,
        Err(MicroAgentBuilderError::APIKeyNotFoundError(_))
    ));
}
1208
/// Every supported provider name must parse into its matching variant.
#[test]
fn test_supported_provider_from_str_valid() {
    let cases = [
        ("openai", SupportedProvider::OpenAI),
        ("openrouter", SupportedProvider::OpenRouter),
        ("ollama", SupportedProvider::Ollama),
        ("groq", SupportedProvider::Groq),
    ];
    for (name, expected) in cases {
        let parsed = SupportedProvider::from_str(name).unwrap();
        assert_eq!(parsed, expected);
    }
}
1232
/// An unknown provider name must be rejected when parsing.
#[test]
fn test_supported_provider_from_str_invalid() {
    let parsed = SupportedProvider::from_str("azure");
    assert!(parsed.is_err());
}
1237
/// Each provider must render as its lowercase name when converted to a string.
#[test]
fn test_supported_provider_display() {
    let cases = [
        (SupportedProvider::OpenAI, "openai"),
        (SupportedProvider::OpenRouter, "openrouter"),
        (SupportedProvider::Ollama, "ollama"),
        (SupportedProvider::Groq, "groq"),
    ];
    for (provider, expected) in cases {
        assert_eq!(provider.to_string(), expected);
    }
}
1245
/// Each provider must report the expected default model name.
#[test]
fn test_supported_provider_default_model() {
    let cases = [
        (SupportedProvider::OpenAI, "gpt-5.5"),
        (SupportedProvider::Ollama, "llama3.2"),
        (SupportedProvider::Groq, "llama-3.3-70b-versatile"),
        (SupportedProvider::OpenRouter, "anthropic/claude-opus-4.7"),
    ];
    for (provider, expected) in cases {
        assert_eq!(provider.default_model(), expected);
    }
}
1259
/// The `Default` implementation of `SupportedProvider` must yield `OpenAI`.
#[test]
fn test_supported_provider_default_is_openai() {
    let fallback = SupportedProvider::default();
    assert_eq!(fallback, SupportedProvider::OpenAI);
}
1265
1266 #[tokio::test]
1271 async fn test_dummy_agent_generate_sets_flag() {
1272 let mut agent = DummyAgent::new();
1273 assert!(!agent.generate_called);
1274 let _ = agent.generate().await;
1275 assert!(agent.generate_called);
1276 }
1277
1278 #[tokio::test]
1279 async fn test_dummy_agent_generate_returns_stream() {
1280 let mut agent = DummyAgent::new();
1281 let mut stream = agent.generate().await.unwrap();
1282 let item = stream.next().await;
1283 assert!(item.is_some());
1284 }
1285
1286 #[tokio::test]
1287 async fn test_dummy_agent_run_streams_prompt() {
1288 let agent = DummyAgent::new();
1289 let mut stream = agent
1290 .run("hello".into(), Some("sid-123".into()))
1291 .await
1292 .unwrap();
1293 let item = stream.next().await.unwrap().unwrap();
1296 match item {
1297 AgentEventAny::UserPromptSubmit(ev) => {
1298 assert_eq!(ev.prompt, "hello");
1299 assert_eq!(ev.session_id, "sid-123");
1300 }
1301 _ => panic!("expected UserPromptSubmit"),
1302 }
1303 }
1304
1305 #[tokio::test]
1306 async fn test_dummy_agent_run_with_none_session_id() {
1307 let agent = DummyAgent::new();
1308 let mut stream = agent.run("test".into(), None).await.unwrap();
1309 let item = stream.next().await.unwrap().unwrap();
1310 match item {
1311 AgentEventAny::UserPromptSubmit(ev) => {
1312 assert_eq!(ev.session_id, "new");
1313 }
1314 _ => panic!("expected UserPromptSubmit"),
1315 }
1316 }
1317
1318 #[tokio::test]
1319 async fn test_dummy_agent_run_stream_yields_single_event() {
1320 let agent = DummyAgent::new();
1321 let mut stream = agent.run("prompt".into(), None).await.unwrap();
1322 let first = stream.next().await;
1323 assert!(first.is_some());
1324 let second = stream.next().await;
1325 assert!(second.is_none());
1326 }
1327}