Skip to main content

microagents_core/
agent.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    env::{self},
4    fmt::{self, Debug},
5    fs,
6    str::FromStr,
7    sync::Arc,
8    time::Duration,
9};
10
11use async_stream::stream;
12use chrono::Utc;
13use futures_util::StreamExt;
14use microagents_events::{
15    AgentEventAny, AssistantResponseEvent, DeltaType, SessionInitEvent, SessionInitType,
16    SessionStopEvent, SkillLoadEvent, StreamDeltaEvent, ToolCallEvent, ToolResultEvent, Usage,
17    UserPromptSubmitEvent, types::ToolResult,
18};
19use microagents_storage::{
20    jsonl::JsonlAgentStorage,
21    memory::InMemoryAgentStorage,
22    sqlite::SqliteAgentStorage,
23    types::{AgentStorage, AgentStorageChoice},
24};
25use serde_json::{Value, json};
26use thiserror::Error;
27use tokio::{sync::Semaphore, task::JoinSet};
28use ultrafast_models_sdk::{
29    ChatRequest, CircuitBreakerConfig, Message, ProviderConfig, Role, UltrafastClient,
30    cache::{CacheConfig, CacheType},
31    models::{Delta, FunctionCall, Tool, ToolCall},
32};
33
34use crate::{
35    common::{
36        JsonResult, call_tool, check_api_key, convert_event_to_message, estimate_tokens,
37        parse_json_fragment,
38    },
39    skills::{self, ensure_skill, find_skills, parse_skill},
40    types::{Agent, AgentError, GenerationStream, RunStream, ToolExecutionContext, ToolFunction},
41};
42
/// Relative path to the project-local skills directory.
///
/// NOTE(review): per the docs on `MicroAgentBuilder::add_skill`, this
/// directory is searched before the global one; the search itself lives in
/// `skills::ensure_skill` — confirm the order there.
pub const SKILLS_PATH: &str = ".agents/skills";
/// Name of the built-in skill-loading tool exposed to the LLM.
///
/// `SkillsTool` registers itself under this name, and `run` compares tool
/// names against it to emit a `SkillLoad` event instead of a `ToolCall` event.
pub const SKILLS_TOOL_NAME: &str = "skills";
/// Path alias for the global skills directory.
///
/// The leading `~` is not expanded by this constant; presumably the skills
/// module resolves it to the user's home directory at runtime — confirm.
pub const GLOBAL_SKILLS_PATH: &str = "~/.agents/skills";
/// Base system prompt injected into every conversation.
///
/// The builder appends `<model>`, `<tools>`, `<skills>` and
/// `<additional_instructions>` sections after this text when assembling the
/// final system prompt.
pub const BASE_SYSTEM_PROMPT: &str = r#"<identity>
You are MicroAgent, an AI agent whose purpose is to
fulfil requests coming from a user, employing the tools and skills
available to you and interacting with the environment
you are given.
</identity>
<guidelines>
<general>
To carry out a task, follow the main rules of the Zen of Python whenever possible:
- Beautiful is better than ugly.
- Explicit is better than implicit.
- Simple is better than complex.
- Complex is better than complicated.
- Flat is better than nested.
- Readability counts.
- Special cases aren't special enough to break the rules, although practicality beats purity.
- Errors should never pass silently, unless explicitly silenced.
- In the face of ambiguity, refuse the temptation to guess.
- There should be one (and preferably only one) obvious way to do it.
- If the implementation is hard to explain, it's a bad idea.
- If the implementation is easy to explain, it _may_ be a good idea, but **it is not necessarily**.
</general>
<tools_and_skills_usage>
Tools can be invoked by providing their name and an input conforming to their input JSON schema.
Call tools either when requested by the user, or when the description of the tool seems compelling
enough for the task at hand.
You also have a special tool called 'skills'. When you want to access specialized knowledge over a
particular area, you can invoke the skill pertaining to that area by calling the 'skills' tool and
providing the name of the skill to it. The 'skills' tool will return the specific instructions for that
skill. Invoke a skill either when directly prompted by the user to do so, or when the skill's description
seems compelling enough for the task at hand.
</tools_and_skills_usage>
</guidelines>
"#;
/// Maximum number of tool calls executed concurrently when
/// `parallel_tool_calls` is enabled.
///
/// `run` sizes a semaphore with this many permits in parallel mode and with a
/// single permit otherwise (assuming each spawned tool call holds a permit
/// while it runs, calls are therefore serialized by default).
const MAX_CONCURRENT_TOOL_CALLS: usize = 10;
87
/// LLM back-ends an agent can be configured to talk to.
///
/// The default provider is OpenAI. The textual names used for parsing and
/// display are lowercase, e.g. `"openrouter"`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum SupportedProvider {
    /// OpenAI's hosted API (requires `OPENAI_API_KEY`).
    #[default]
    OpenAI,
    /// OpenRouter's hosted API (requires `OPENROUTER_API_KEY`).
    OpenRouter,
    /// An Ollama server reached through its OpenAI-compatible endpoint
    /// (`OLLAMA_BASE_URL`, no API key required).
    Ollama,
    /// Groq's hosted API (requires `GROQ_API_KEY`).
    Groq,
}
97
98impl FromStr for SupportedProvider {
99    type Err = MicroAgentBuilderError;
100    fn from_str(s: &str) -> Result<Self, Self::Err> {
101        match s {
102            "openai" => Ok(Self::OpenAI),
103            "openrouter" => Ok(Self::OpenRouter),
104            "ollama" => Ok(Self::Ollama),
105            "groq" => Ok(Self::Groq),
106            _ => Err(MicroAgentBuilderError::ProviderNotSupported(s.into())),
107        }
108    }
109}
110
111impl fmt::Display for SupportedProvider {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        let s = match self {
114            Self::OpenRouter => "openrouter",
115            Self::Groq => "groq",
116            Self::Ollama => "ollama",
117            Self::OpenAI => "openai",
118        };
119        write!(f, "{}", s)
120    }
121}
122
123impl SupportedProvider {
124    /// Return the default model identifier for this provider.
125    pub fn default_model(&self) -> &'static str {
126        match self {
127            // GPT-5.5 is the current default ChatGPT model as of May 2026
128            SupportedProvider::OpenAI => "gpt-5.5",
129
130            // llama3.2 is the most widely tested, hardware-friendly default
131            SupportedProvider::Ollama => "llama3.2",
132
133            // llama-3.3-70b-versatile is Groq's documented default recommendation
134            SupportedProvider::Groq => "llama-3.3-70b-versatile",
135
136            // Claude Opus 4.7 by Anthropic is cuttig-edge in the models market
137            SupportedProvider::OpenRouter => "anthropic/claude-opus-4.7",
138        }
139    }
140}
141
/// Errors that can occur while configuring or building a [`MicroAgent`].
#[derive(Debug, Error)]
pub enum MicroAgentBuilderError {
    /// No skill directory with the given name could be located.
    #[error("Skill {0} not found")]
    SkillNotFound(String),
    /// A skill was found but could not be loaded or parsed; the underlying
    /// `skills::SkillLoadingError` is kept as the error source (`#[from]`).
    #[error("Skill parsing error")]
    SkillParsingError(#[from] skills::SkillLoadingError),
    /// The provider name is not one of the [`SupportedProvider`] names.
    #[error("Provider {0} not supported")]
    ProviderNotSupported(String),
    /// Describes an attempt to register a tool under a name that is already taken.
    #[error("Tool with name {0} already exists")]
    ToolAlreadyDefined(String),
    /// The selected storage backend failed to initialise (message of the cause).
    #[error("Storage could not be loaded: {0}")]
    StorageLoadError(String),
    /// The API key for the named provider (e.g. `"groq"`) is not available.
    #[error("API key not found for provider {0}")]
    APIKeyNotFoundError(String),
}
158
159/// Newtype wrapper so that [`UltrafastClient`] can implement [`Debug`].
160pub struct DebuggableClient(pub Arc<UltrafastClient>);
161
162impl Debug for DebuggableClient {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        write!(f, "UltrafastClient")
165    }
166}
167
/// A fully-configured agent ready to generate responses or run conversations.
///
/// Created via [`MicroAgentBuilder`]. Holds the conversation history, tool
/// registry, and LLM client configuration.
#[derive(Debug)]
pub struct MicroAgent<Ctx> {
    /// Messages sent to the model on each generation. `run` replaces it with the
    /// restored session (if any), a leading system message and the new prompt.
    pub history: Vec<Message>,
    /// Registered tools keyed by tool name; the builder pre-registers the
    /// built-in `skills` tool.
    pub tools: HashMap<String, Arc<dyn ToolFunction<Ctx>>>,
    /// Registered skills: skill name -> description advertised in the prompt.
    pub skills: HashMap<String, String>,
    /// Provider the LLM client is configured for.
    pub provider: SupportedProvider,
    /// Model identifier sent with every request.
    pub model: String,
    /// Fully assembled system prompt.
    pub system: String,
    /// LLM client, created lazily by `init_client` and cached here.
    client: Option<DebuggableClient>,
    /// Shared context handed to tool executions.
    pub tool_context: Arc<ToolExecutionContext<Ctx>>,
    /// Backend where session events are persisted.
    pub storage: Box<dyn AgentStorage>,
    /// When `true`, tool calls from one response may run concurrently.
    pub parallel_tool_calls: bool,
}
185
/// Builder for [`MicroAgent`].
///
/// # Example
/// ```no_run
/// use microagents_core::agent::MicroAgentBuilder;
/// use microagents_core::types::ToolExecutionContext;
///
/// let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
///     .provider("openai".into()).unwrap()
///     .model("gpt-5.5".into())
///     .build()
///     .expect("API key must be set");
/// ```
#[derive(Debug)]
pub struct MicroAgentBuilder<Ctx> {
    /// Tools keyed by name; starts out containing the built-in `skills` tool.
    tools: HashMap<String, Arc<dyn ToolFunction<Ctx>>>,
    /// Registered skills: skill name -> description.
    skills: HashMap<String, String>,
    /// Target provider; `SupportedProvider::default()` until overridden.
    provider: SupportedProvider,
    /// Requested model; an empty string means "use the provider's default".
    model: String,
    /// Extra text appended to the system prompt; empty means none.
    custom_instructions: String,
    /// Context shared with tool executions.
    tool_context: Arc<ToolExecutionContext<Ctx>>,
    /// Session storage backend; in-memory unless `storage` is called.
    pub storage: Box<dyn AgentStorage>,
    /// Whether tool calls may run concurrently; `false` by default.
    pub parallel_tool_calls: bool,
}
210
211impl<Ctx: Send + Sync + 'static> MicroAgentBuilder<Ctx> {
212    /// Create a new builder with the given tool execution context.
213    ///
214    /// The `skills` tool is registered automatically.
215    pub fn new(tool_context: ToolExecutionContext<Ctx>) -> Self {
216        Self {
217            tools: HashMap::from([(
218                "skills".to_string(),
219                Arc::new(SkillsTool) as Arc<dyn ToolFunction<Ctx>>,
220            )]),
221            skills: HashMap::new(),
222            provider: SupportedProvider::default(),
223            model: String::new(),
224            custom_instructions: String::new(),
225            tool_context: Arc::new(tool_context),
226            storage: Box::new(InMemoryAgentStorage::default()) as Box<dyn AgentStorage>,
227            parallel_tool_calls: false,
228        }
229    }
230
231    /// Register a single skill by name.
232    ///
233    /// Searches `.agents/skills/{name}` then `~/.agents/skills/{name}`.
234    pub fn add_skill(mut self, skill_name: String) -> Result<Self, MicroAgentBuilderError> {
235        if let Some(skill_path) = ensure_skill(&skill_name) {
236            let description = parse_skill(&skill_path)?;
237            self.skills.insert(skill_name, description);
238            return Ok(self);
239        }
240        Err(MicroAgentBuilderError::SkillNotFound(skill_name))
241    }
242
243    /// Auto-discover and register all skills found in the local and global
244    /// skills directories.
245    pub fn find_skills(mut self) -> Result<Self, MicroAgentBuilderError> {
246        let loaded_skills = find_skills()?;
247        for (skill, des) in loaded_skills {
248            self.skills.insert(skill, des);
249        }
250        Ok(self)
251    }
252
253    /// Set the LLM provider (e.g. `"openai"`, `"groq"`, `"ollama"`).
254    pub fn provider(mut self, provider: String) -> Result<Self, MicroAgentBuilderError> {
255        let prov = SupportedProvider::from_str(&provider)?;
256        self.provider = prov;
257        Ok(self)
258    }
259
260    /// Set the model identifier. If empty, the provider's default is used.
261    pub fn model(mut self, model: String) -> Self {
262        self.model = model;
263        self
264    }
265
266    /// Enable or disable parallel tool execution.
267    pub fn parallel_tool_calls(mut self, parallel_tool_calls: bool) -> Self {
268        self.parallel_tool_calls = parallel_tool_calls;
269        self
270    }
271
272    /// Configure the session storage backend.
273    pub async fn storage(
274        mut self,
275        storage: AgentStorageChoice,
276    ) -> Result<Self, MicroAgentBuilderError> {
277        match storage {
278            AgentStorageChoice::Jsonl => self.storage = Box::new(JsonlAgentStorage::default()),
279            AgentStorageChoice::Memory => self.storage = Box::new(InMemoryAgentStorage::default()),
280            AgentStorageChoice::Sqlite => {
281                let store = SqliteAgentStorage::new(None)
282                    .await
283                    .map_err(|e| MicroAgentBuilderError::StorageLoadError(e.to_string()))?;
284                self.storage = Box::new(store);
285            }
286        }
287
288        Ok(self)
289    }
290
291    /// Register a custom tool.
292    pub fn add_tool(
293        mut self,
294        tool: Arc<dyn ToolFunction<Ctx>>,
295    ) -> Result<Self, MicroAgentBuilderError> {
296        self.tools.insert(tool.name().to_owned(), tool);
297        Ok(self)
298    }
299
300    /// Append free-form instructions to the system prompt.
301    pub fn custom_instructions(mut self, instructions: String) -> Self {
302        self.custom_instructions = instructions;
303        self
304    }
305
306    /// Choose the effective model: user-supplied or provider default.
307    fn resolve_model(&self) -> String {
308        if self.model.is_empty() {
309            return self.provider.default_model().into();
310        }
311        self.model.clone()
312    }
313
314    /// Assemble the full system prompt from the base prompt, model info,
315    /// registered tools, skills, and any custom instructions.
316    fn resolve_system(&self, model: &str) -> String {
317        let mut base = BASE_SYSTEM_PROMPT.to_string();
318        base += &format!(
319            r#"<model>
320You are {} provided by {}
321</model>
322"#,
323            model, self.provider
324        );
325        if !self.tools.is_empty() {
326            base += "\n<tools>";
327            for (k, v) in &self.tools {
328                base += &format!(
329                    "\n<tool>\n<name>{}</name>\n<description>{}</description>\n<input_schema>{}</input_schema>\n</tool>",
330                    k,
331                    v.description(),
332                    v.input_schema()
333                )
334            }
335            base += "\n</tools>"
336        }
337        if !self.skills.is_empty() {
338            base += "\n<skills>";
339            for (k, v) in &self.skills {
340                base += &format!(
341                    "\n<skill>\n<name>{}</name>\n<description>{}</description>\n</skill>",
342                    k, v
343                );
344            }
345            base += "\n</skills>";
346        }
347        if !self.custom_instructions.is_empty() {
348            base += &format!(
349                "\n<additional_instructions>\n{}\n</additional_instructions>",
350                self.custom_instructions
351            )
352        }
353
354        base
355    }
356
357    /// Finalise the builder and return a [`MicroAgent`].
358    ///
359    /// Fails early if a required API key is missing for the chosen provider.
360    #[must_use = "The builder needs to call `build` otherwise it hangs without turning into an actual agent."]
361    pub fn build(self) -> Result<MicroAgent<Ctx>, MicroAgentBuilderError> {
362        let model = self.resolve_model();
363        let system = self.resolve_system(&model);
364        match self.provider {
365            SupportedProvider::Groq => {
366                check_api_key("GROQ_API_KEY")
367                    .map_err(|_| MicroAgentBuilderError::APIKeyNotFoundError("groq".into()))?;
368            }
369            SupportedProvider::OpenAI => {
370                check_api_key("OPENAI_API_KEY")
371                    .map_err(|_| MicroAgentBuilderError::APIKeyNotFoundError("openai".into()))?;
372            }
373            SupportedProvider::OpenRouter => {
374                check_api_key("OPENROUTER_API_KEY").map_err(|_| {
375                    MicroAgentBuilderError::APIKeyNotFoundError("openrouter".into())
376                })?;
377            }
378            _ => {}
379        }
380        Ok(MicroAgent {
381            history: vec![],
382            tools: self.tools,
383            skills: self.skills,
384            model,
385            provider: self.provider,
386            client: None,
387            system,
388            tool_context: self.tool_context,
389            storage: self.storage,
390            parallel_tool_calls: self.parallel_tool_calls,
391        })
392    }
393}
394
395impl<Ctx> MicroAgent<Ctx> {
396    /// Lazily initialise the LLM client.
397    ///
398    /// The client is cached after the first call.
399    fn init_client(&mut self) -> Result<Arc<UltrafastClient>, AgentError> {
400        if let Some(c) = self.client.as_ref() {
401            return Ok(c.0.clone());
402        }
403        let mut base_client = UltrafastClient::standalone();
404        base_client = match self.provider {
405            SupportedProvider::OpenRouter => {
406                base_client.with_openrouter(env::var("OPENROUTER_API_KEY")?)
407            }
408            SupportedProvider::OpenAI => base_client.with_openai(env::var("OPENAI_API_KEY")?),
409            SupportedProvider::Groq => base_client.with_groq(env::var("GROQ_API_KEY")?),
410            SupportedProvider::Ollama => base_client.with_provider(
411                "openai",
412                ProviderConfig {
413                    base_url: Some(
414                        env::var("OLLAMA_BASE_URL").unwrap_or("http://localhost:11434/v1".into()),
415                    ),
416                    api_key: "ollama".into(),
417                    name: "openai".into(),
418                    timeout: Duration::from_secs(300),
419                    max_retries: 3,
420                    retry_delay: Duration::from_millis(500),
421                    rate_limit: None,
422                    model_mapping: HashMap::new(),
423                    headers: HashMap::new(),
424                    enabled: true,
425                    circuit_breaker: Some(CircuitBreakerConfig {
426                        failure_threshold: 5,
427                        recovery_timeout: Duration::from_secs(30),
428                        request_timeout: Duration::from_secs(10),
429                        half_open_max_calls: 3,
430                    }),
431                },
432            ),
433        };
434        let client = base_client
435            .with_routing_strategy(ultrafast_models_sdk::RoutingStrategy::Single)
436            .with_cache(CacheConfig {
437                enabled: true,
438                ttl: Duration::from_secs(600),
439                max_size: 1000,
440                cache_type: CacheType::InMemory,
441            })
442            .build()
443            .map_err(|e| AgentError::ClientInitFailed(e.to_string()))?;
444        let arcc = Arc::new(client);
445        self.client = Some(DebuggableClient(arcc.clone()));
446        Ok(arcc)
447    }
448}
449
/// Built-in tool that loads skill instructions at runtime.
///
/// Registered by `MicroAgentBuilder::new` under [`SKILLS_TOOL_NAME`]. When
/// the model calls it with a `skill_name`, it returns the contents of that
/// skill's `SKILL.md` file.
#[derive(Debug)]
pub struct SkillsTool;
453
454#[async_trait::async_trait]
455impl<Ctx: Send + Sync + 'static> ToolFunction<Ctx> for SkillsTool {
456    fn name(&self) -> &'static str {
457        SKILLS_TOOL_NAME
458    }
459
460    fn description(&self) -> &'static str {
461        "Call this tool to load a skill, providing the name of the skill you are invoking"
462    }
463
464    fn input_schema(&self) -> serde_json::Value {
465        json!({
466          "type": "object",
467          "required": [
468            "skill_name"
469          ],
470          "properties": {
471            "skill_name": {
472              "type": "string",
473              "description": "Name of the skill to load"
474            }
475          }
476        })
477    }
478
479    async fn execute(
480        &self,
481        input: Value,
482        _ctx: &Arc<ToolExecutionContext<Ctx>>,
483    ) -> Result<ToolResult, AgentError> {
484        let skill_name = input["skill_name"]
485            .as_str()
486            .ok_or_else(|| AgentError::ToolCallError("missing skill_name".into()))?;
487        let skill_path = ensure_skill(skill_name);
488        if let Some(p) = skill_path {
489            let content = fs::read_to_string(p.join("SKILL.md")).map_err(|e| {
490                AgentError::ToolCallError(format!("Skill {skill_name} could not be read: {}", e))
491            })?;
492            return Ok(ToolResult::Ok(content));
493        }
494        Ok(ToolResult::Err(format!(
495            "Skill {skill_name} could not be found"
496        )))
497    }
498}
499
500#[async_trait::async_trait]
501impl<Ctx: Send + Sync + 'static> Agent for MicroAgent<Ctx> {
502    /// Generate the next assistant response as a raw token stream.
503    ///
504    /// The stream yields [`StreamChunk`]s that may contain text deltas or
505    /// partial tool calls. Higher-level orchestration (e.g. [`run`]) is
506    /// responsible for buffering and acting on tool calls.
507    async fn generate(&mut self) -> Result<GenerationStream, AgentError> {
508        let client = self.init_client()?;
509        let tools: Vec<Tool> = self.tools.values().map(|t| t.to_sdk_tool()).collect();
510        let stream = client
511            .stream_chat_completion(ChatRequest {
512                model: self.model.clone(),
513                messages: self.history.clone(),
514                temperature: None,
515                stream: Some(true),
516                max_tokens: None,
517                tools: Some(tools),
518                tool_choice: Some(ultrafast_models_sdk::models::ToolChoice::Auto),
519                top_p: None,
520                frequency_penalty: None,
521                user: None,
522                presence_penalty: None,
523                stop: None,
524            })
525            .await
526            .map_err(|e| AgentError::GenerationError(e.to_string()))?;
527        let mapped =
528            stream.map(|item| item.map_err(|e| AgentError::GenerationError(e.to_string())));
529        return Ok(Box::pin(mapped));
530    }
531
532    /// Run a complete conversation turn.
533    ///
534    /// If `session_id` is [`Some`] the conversation history is restored from
535    /// storage; otherwise a new session is created. The returned stream yields
536    /// high-level events ([`AgentEventAny`]) including deltas, tool calls,
537    /// results, and the final stop event.
538    async fn run(
539        mut self,
540        prompt: String,
541        session_id: Option<String>,
542    ) -> Result<RunStream, AgentError> {
543        let local_tools: HashMap<String, Arc<dyn ToolFunction<Ctx>>> = self.tools.clone();
544        let mut input_text = self.system.clone();
545        let mut completion_text = String::new();
546        let start_processing = Utc::now();
547        let s: RunStream = Box::pin(stream! {
548            let resolved_sid;
549            let messages: Vec<Message> = if let Some(sid) = session_id {
550                let ev = AgentEventAny::SessionInit(SessionInitEvent {
551                    session_id: sid.clone(),
552                    model: self.model.clone(),
553                    system: self.system.clone(),
554                    provider: self.provider.to_string(),
555                    init_type: SessionInitType::Resume,
556                    timestamp: Utc::now(),
557                });
558                yield Ok(ev);
559
560                let events_res = self
561                    .storage
562                    .get_session(&sid)
563                    .await
564                    .map_err(|e| AgentError::SessionLoadError(e.to_string()));
565
566                let events = match events_res {
567                    Ok(e) => e,
568                    Err(e) => {
569                        yield Err(AgentError::RunError(format!("Error while getting the session: {}", e)));
570                        return;
571                    }
572                };
573
574                resolved_sid = sid;
575
576                events
577                    .iter()
578                    .filter_map(|e| convert_event_to_message(e.clone()))
579                    .collect()
580            } else {
581                let sid = uuid::Uuid::new_v4().to_string();
582                let sint = SessionInitEvent {
583                    session_id: sid.clone(),
584                    model: self.model.clone(),
585                    system: self.system.clone(),
586                    provider: self.provider.to_string(),
587                    init_type: SessionInitType::Start,
588                    timestamp: Utc::now(),
589                };
590                resolved_sid = sid;
591                let ev = AgentEventAny::SessionInit(sint.clone());
592                match self.storage.create_session(sint).await {
593                    Ok(_) => {},
594                    Err(e) => {
595                        yield Err(AgentError::RunError(format!("An error occurred while creating the session in the storage: {}", e)));
596                        return;
597                    }
598                }
599                yield Ok(ev);
600                vec![]
601            };
602            self.history = messages;
603            self.history.insert(0, Message { role: Role::System, content: self.system.clone(), name: None, tool_calls: None, tool_call_id: None });
604            self.history.push(Message {
605                role: Role::User,
606                content: prompt.to_owned(),
607                name: None,
608                tool_calls: None,
609                tool_call_id: None,
610            });
611            input_text += &prompt;
612            let turn_id = uuid::Uuid::new_v4().to_string();
613            let user_prompt_submit = AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
614                session_id: resolved_sid.clone(),
615                turn_id: turn_id.clone(),
616                prompt,
617                timestamp: Utc::now(),
618            });
619            match self.storage.update_session(user_prompt_submit.clone()).await {
620                Ok(_) => {},
621                Err(e) => {
622                    yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
623                    return;
624                }
625            };
626            yield Ok(user_prompt_submit);
627
628            loop {
629                let mut generation = match self.generate().await {
630                    Ok(g) => g,
631                    Err(e) => {
632                        yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
633                        return;
634                    }
635                };
636                let mut text = String::new();
637                let mut tool_messages: Vec<Message> = vec![];
638                let mut tool_calls: BTreeMap<u32, (String, String, String)> = BTreeMap::new();
639                while let Some(g) = generation.next().await {
640                    match g {
641                        Ok(chunk) => {
642                            let mut deltas: Vec<(u32, Delta)> = vec![];
643                            for choice in chunk.choices {
644                                deltas.push((choice.index, choice.delta));
645                            }
646                            deltas.sort_by_key(|a| a.0);
647                            for (_, delta) in deltas {
648                                if let Some(c) = delta.content {
649                                    text += &c;
650                                    completion_text += &c;
651                                    let ev = AgentEventAny::StreamDelta(StreamDeltaEvent {
652                                        session_id: resolved_sid.clone(),
653                                        turn_id: turn_id.clone(),
654                                        delta: c,
655                                        delta_type: DeltaType::Text,
656                                        timestamp: Utc::now(),
657                                    });
658                                    match self.storage.update_session(ev.clone()).await {
659                                        Ok(_) => {},
660                                        Err(e) => {
661                                            yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
662                                            return;
663                                        }
664                                    }
665                                    yield Ok(ev);
666                                }
667                                if let Some(tcs) = delta.tool_calls {
668                                    for tc in tcs {
669                                        if let Some(func) = tc.function {
670                                            if let Some(tid) = tc.id && let Some(name) = func.name {
671                                                // First chunk with id and name: initialize entry
672                                                tool_calls.entry(tc.index).or_insert((tid, name, String::new()));
673                                            }
674                                            // Accumulate arguments regardless
675                                            if let Some(args) = func.arguments {
676                                                tool_calls.entry(tc.index).and_modify(|v| v.2 += &args);
677                                                completion_text += &args;
678                                            }
679                                        }
680                                    }
681                                }
682                            }
683                        },
684                        Err(e) => {
685                            let latency = (Utc::now() - start_processing).num_milliseconds();
686                            let stop_ev = AgentEventAny::SessionStop(SessionStopEvent { session_id: resolved_sid.clone(), success: false, result: None, error: Some(e.to_string()), timestamp: Utc::now(), usage: Usage {
687                                latency,
688                                ..Default::default()
689                            }});
690                            match self.storage.update_session(stop_ev.clone()).await {
691                                Ok(_) => {},
692                                Err(e) => {
693                                    yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
694                                    return;
695                                }
696                            }
697                            yield Ok(stop_ev);
698                            return;
699                        }
700                    }
701                }
702
703                if tool_calls.is_empty() {
704                    let latency = (Utc::now() - start_processing).num_milliseconds();
705                    let input_tokens = estimate_tokens(&input_text).unwrap_or_default();
706                    let output_tokens = estimate_tokens(&completion_text).unwrap_or_default();
707                    let ev = AgentEventAny::AssistantResponse(AssistantResponseEvent {
708                        session_id: resolved_sid.clone(),
709                        turn_id: turn_id.clone(),
710                        full_text: text.clone(),
711                        tool_calls: None,
712                        timestamp: Utc::now(),
713                    });
714                    let stop_ev = AgentEventAny::SessionStop(SessionStopEvent {
715                        session_id: resolved_sid.clone(),
716                        success: true,
717                        result: Some(text),
718                        error: None,
719                        timestamp: Utc::now(),
720                        usage: Usage {
721                            latency,
722                            output_chars: completion_text.len(),
723                            input_chars: input_text.len(),
724                            estimated_output_tokens: output_tokens,
725                            estimated_input_tokens: input_tokens,
726                        }
727                    });
728                    match self.storage.update_session(ev.clone()).await {
729                        Ok(_) => {},
730                        Err(e) => {
731                            yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
732                            return;
733                        }
734                    }
735                    match self.storage.update_session(stop_ev.clone()).await {
736                        Ok(_) => {},
737                        Err(e) => {
738                            yield Err(AgentError::RunError(format!("An error occurred while starting the generation stream: {}", e)));
739                            return;
740                        }
741                    }
742                    yield Ok(ev);
743                    yield Ok(stop_ev);
744                    return;
745                }
746
747                let mut to_pop = HashSet::new();
748                let mut to_call = JoinSet::new();
749                let tool_ctx = self.tool_context.clone();
750                let concurrency = if !self.parallel_tool_calls {
751                    1
752                } else {
753                    MAX_CONCURRENT_TOOL_CALLS
754                };
755                let semaphore = Arc::new(Semaphore::new(concurrency));
756                for (tid, name, args) in tool_calls.values() {
757                    match parse_json_fragment(args) {
758                        JsonResult::Valid(v) => {
759                            let tool = local_tools.get(name);
760                            if let Some(t) = tool {
761                                let tool_name = name.clone();
762                                let tc_ev = if tool_name != SKILLS_TOOL_NAME {
763                                    AgentEventAny::ToolCall(ToolCallEvent {
764                                        session_id: resolved_sid.clone(),
765                                        turn_id: turn_id.clone(),
766                                        name: tool_name,
767                                        input: v.clone(),
768                                        timestamp: Utc::now(),
769                                    })
770                                } else {
771                                    AgentEventAny::SkillLoad(SkillLoadEvent {
772                                        session_id: resolved_sid.clone(),
773                                        turn_id: turn_id.clone(),
774                                        skill_name: v["skill_name"].as_str().unwrap_or_default().to_string(),
775                                        timestamp: Utc::now(),
776                                    })
777                                };
778                                match self.storage.update_session(tc_ev.clone()).await {
779                                    Ok(_) => {},
780                                    Err(e) => {
781                                        yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
782                                        return;
783                                    }
784                                }
785                                yield Ok(tc_ev);
786                                let permit_res = semaphore.clone().acquire_owned().await;
787                                let permit = match permit_res {
788                                    Ok(p) => p,
789                                    Err(e) => {
790                                        yield Err(AgentError::RunError(format!("Error while acquiring semaphore: {}", e)));
791                                        return;
792                                    }
793                                };
794                                let t = t.clone();
795                                let tool_call_id = tid.clone();
796                                let ctx = tool_ctx.clone();
797                                to_call.spawn(async move {
798                                    let _permit = permit;
799                                    let result = call_tool(t, v, ctx).await;
800                                    match result {
801                                        Ok(r) => Ok((tool_call_id, r)),
802                                        Err(e) => Err(e)
803                                    }
804                                });
805                            }
806                        },
807                        JsonResult::Incomplete => {},
808                        JsonResult::Malformed => {
809                            to_pop.insert(tid.clone());
810                        }
811                    }
812                }
813                while let Some(res) = to_call.join_next().await {
814                    match res {
815                        Ok(Ok((tid, tool_result))) => {
816                            let ev = AgentEventAny::ToolResult(ToolResultEvent {
817                                session_id: resolved_sid.clone(),
818                                turn_id: turn_id.clone(),
819                                result: tool_result.clone(),
820                                tool_call_id: tid.clone(),
821                                timestamp: Utc::now(),
822                            });
823                            match self.storage.update_session(ev.clone()).await {
824                                Ok(_) => {},
825                                Err(e) => {
826                                    yield Err(AgentError::RunError(format!("An error occurred while updating the session in the storage: {}", e)));
827                                    return;
828                                }
829                            }
830                            yield Ok(ev);
831                            let content = match tool_result {
832                                ToolResult::Ok(r) => {
833                                    format!("Tool succeeded: {r}")
834                                },
835                                ToolResult::Err(r) => {
836                                    format!("Tool failed: {r}")
837                                },
838                                _ => unreachable!("ToolResult should not reach this branch")
839                            };
840                            input_text += &content;
841                            tool_messages.push(Message { role: Role::Tool, content, name: None, tool_calls: None, tool_call_id: Some(tid) });
842                        }
843                        Ok(Err(e)) => {
844                            yield Err(AgentError::RunError(format!("Tool call failed: {}", e)));
845                        }
846                        Err(e) => {
847                            yield Err(AgentError::RunError(format!("Task join failed: {}", e)));
848                        }
849                    }
850                }
851
852                self.history.push(Message {
853                    role: Role::Assistant,
854                    content: std::mem::take(&mut text),
855                    name: None,
856                    tool_calls: Some(tool_calls.iter().
857                        filter(|(_, (tid, _, _))| !to_pop.contains(tid))
858                        .map(|(_, (tid, name, args))| ToolCall {
859                        call_type: "function".into(),
860                        id: tid.clone(),
861                        function: FunctionCall {
862                            name: name.clone(),
863                            arguments: args.clone(),
864                        }
865                    }).collect()),
866                    tool_call_id: None,
867                });
868                self.history.extend(tool_messages);
869            }
870        });
871        Ok(s)
872    }
873}
874
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{
        Agent, AgentError, GenerationStream, RunStream, ToolExecutionContext, ToolFunction,
    };
    use async_stream::stream;
    use futures_util::StreamExt;
    use microagents_events::types::ToolResult;
    use serde_json::Value;
    use std::{ffi::OsString, sync::Arc};

    // ------------------------------------------------------------------
    // Test helpers
    // ------------------------------------------------------------------

    /// Temporarily overrides an environment variable.
    ///
    /// The previous state is restored when the guard is dropped. If the
    /// variable was previously unset, it is removed again rather than left
    /// as an empty string. Because restoration happens in `Drop`, it also
    /// runs when the test panics on a failed assertion.
    struct EnvVarGuard {
        key: &'static str,
        previous: Option<OsString>,
    }

    impl EnvVarGuard {
        /// Sets `key` to `value`, remembering the prior value (if any).
        fn set(key: &'static str, value: &str) -> Self {
            let previous = std::env::var_os(key);
            // SAFETY: `set_var` is unsafe because another thread may read the
            // environment concurrently. Within Rust, `std::env` serialises its
            // own reads and writes. We assume no foreign (non-Rust) code calls
            // `getenv` while these unit tests run.
            unsafe { std::env::set_var(key, value) };
            Self { key, previous }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            // SAFETY: same reasoning as in `EnvVarGuard::set`.
            unsafe {
                match self.previous.take() {
                    Some(value) => std::env::set_var(self.key, value),
                    None => std::env::remove_var(self.key),
                }
            }
        }
    }

    // ------------------------------------------------------------------
    // DummyAgent – a mock implementation of the Agent trait
    // ------------------------------------------------------------------

    /// Records which `Agent` trait methods were invoked and with what input.
    #[derive(Debug)]
    struct DummyAgent {
        pub generate_called: bool,
        pub run_called: bool,
        pub last_prompt: Option<String>,
        pub last_session_id: Option<String>,
    }

    impl DummyAgent {
        fn new() -> Self {
            Self {
                generate_called: false,
                run_called: false,
                last_prompt: None,
                last_session_id: None,
            }
        }
    }

    #[async_trait::async_trait]
    impl Agent for DummyAgent {
        /// Yields a single empty stream chunk.
        async fn generate(&mut self) -> Result<GenerationStream, AgentError> {
            self.generate_called = true;
            let s = stream! {
                yield Ok(ultrafast_models_sdk::models::StreamChunk {
                    id: "1".into(),
                    object: "chat.completion.chunk".into(),
                    created: 0,
                    model: "dummy".into(),
                    choices: vec![],
                });
            };
            Ok(Box::pin(s))
        }

        /// Yields exactly one `UserPromptSubmit` event echoing the prompt.
        /// The session id defaults to `"new"` when none is given.
        async fn run(
            mut self,
            prompt: String,
            session_id: Option<String>,
        ) -> Result<RunStream, AgentError> {
            self.run_called = true;
            self.last_prompt = Some(prompt.clone());
            self.last_session_id = session_id.clone();
            let s = stream! {
                yield Ok(AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
                    session_id: session_id.unwrap_or_else(|| "new".into()),
                    turn_id: "t1".into(),
                    prompt,
                    timestamp: Utc::now(),
                }));
            };
            Ok(Box::pin(s))
        }
    }

    // ------------------------------------------------------------------
    // A simple dummy tool for builder tests
    // ------------------------------------------------------------------

    #[derive(Debug)]
    struct DummyTool;

    #[async_trait::async_trait]
    impl ToolFunction<()> for DummyTool {
        fn name(&self) -> &'static str {
            "dummy"
        }
        fn description(&self) -> &'static str {
            "A dummy tool"
        }
        fn input_schema(&self) -> Value {
            json!({"type": "object"})
        }
        async fn execute(
            &self,
            _input: Value,
            _ctx: &Arc<ToolExecutionContext<()>>,
        ) -> Result<ToolResult, AgentError> {
            Ok(ToolResult::Ok("done".into()))
        }
    }

    // ------------------------------------------------------------------
    // Builder default tests
    // ------------------------------------------------------------------

    #[test]
    fn test_builder_default_provider_is_openai() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
        assert_eq!(builder.provider, SupportedProvider::OpenAI);
    }

    #[test]
    fn test_builder_default_model_is_empty() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
        assert!(builder.model.is_empty());
    }

    #[test]
    fn test_builder_default_skills_is_empty() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
        assert!(builder.skills.is_empty());
    }

    #[test]
    fn test_builder_default_tools_contains_skills_tool() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
        assert!(builder.tools.contains_key(SKILLS_TOOL_NAME));
        assert_eq!(builder.tools.len(), 1);
    }

    #[test]
    fn test_builder_default_parallel_tool_calls_is_false() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()));
        assert!(!builder.parallel_tool_calls);
    }

    // ------------------------------------------------------------------
    // Builder pattern tests
    // ------------------------------------------------------------------

    #[test]
    fn test_builder_provider_sets_provider() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("groq".into())
            .unwrap();
        assert_eq!(builder.provider, SupportedProvider::Groq);
    }

    #[test]
    fn test_builder_provider_invalid_returns_error() {
        let result =
            MicroAgentBuilder::new(ToolExecutionContext::new(())).provider("unknown".into());
        assert!(result.is_err());
        assert!(
            matches!(
                result.unwrap_err(),
                MicroAgentBuilderError::ProviderNotSupported(_)
            ),
            "expected ProviderNotSupported error"
        );
    }

    #[test]
    fn test_builder_model_sets_model() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(())).model("gpt-5.5".into());
        assert_eq!(builder.model, "gpt-5.5");
    }

    #[test]
    fn test_builder_parallel_tool_calls_sets_flag() {
        let builder =
            MicroAgentBuilder::new(ToolExecutionContext::new(())).parallel_tool_calls(true);
        assert!(builder.parallel_tool_calls);
    }

    #[test]
    fn test_builder_custom_instructions_sets_instructions() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .custom_instructions("Be concise".into());
        assert_eq!(builder.custom_instructions, "Be concise");
    }

    #[test]
    fn test_builder_add_tool_increments_tools() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .add_tool(Arc::new(DummyTool))
            .unwrap();
        assert_eq!(builder.tools.len(), 2);
        assert!(builder.tools.contains_key("dummy"));
    }

    // The storage tests use the Ollama provider on purpose. Building with the
    // default OpenAI provider requires `OPENAI_API_KEY`, which would make
    // these tests depend on the developer's environment. They would also race
    // with the test below that temporarily sets that variable.

    #[tokio::test]
    async fn test_builder_storage_sets_jsonl() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .storage(AgentStorageChoice::Jsonl)
            .await
            .unwrap();
        // We cannot directly inspect the dyn type, but building should succeed
        let _agent = builder.build().expect("Should be able to build the agent");
    }

    #[tokio::test]
    async fn test_builder_storage_sets_memory() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .storage(AgentStorageChoice::Memory)
            .await
            .unwrap();
        let _agent = builder.build().expect("Should be able to build the agent");
    }

    #[tokio::test]
    async fn test_builder_storage_sets_sqlite() {
        let builder = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .storage(AgentStorageChoice::Sqlite)
            .await
            .unwrap();
        let _agent = builder.build().expect("Should be able to build the agent");
    }

    // ------------------------------------------------------------------
    // Build / resolve tests
    // ------------------------------------------------------------------

    #[test]
    fn test_build_sets_empty_history() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .build()
            .expect("Should be able to build the agent");
        assert!(agent.history.is_empty());
    }

    #[test]
    fn test_build_sets_tools_on_agent() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .add_tool(Arc::new(DummyTool))
            .unwrap()
            .build()
            .expect("Should be able to build the agent");
        assert_eq!(agent.tools.len(), 2);
    }

    #[test]
    fn test_build_sets_provider_on_agent() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .build()
            .expect("Should be able to build the agent");
        assert_eq!(agent.provider, SupportedProvider::Ollama);
    }

    #[test]
    fn test_build_sets_model_on_agent() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .model("llama3.2".into())
            .build()
            .expect("Should be able to build the agent");
        assert_eq!(agent.model, "llama3.2");
    }

    #[test]
    fn test_build_sets_parallel_tool_calls_on_agent() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .parallel_tool_calls(true)
            .build()
            .expect("Should be able to build the agent");
        assert!(agent.parallel_tool_calls);
    }

    #[test]
    fn test_build_system_prompt_contains_base() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .build()
            .expect("Should be able to build the agent");
        assert!(agent.system.contains("You are MicroAgent"));
    }

    #[test]
    fn test_build_system_prompt_contains_tools() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .add_tool(Arc::new(DummyTool))
            .unwrap()
            .build()
            .expect("Should be able to build the agent");
        assert!(agent.system.contains("<tools>"));
        assert!(agent.system.contains("<name>dummy</name>"));
    }

    #[test]
    fn test_build_system_prompt_contains_default_model_when_model_empty() {
        // Restored (or removed, if it was unset) when `_key` drops, even on panic.
        let _key = EnvVarGuard::set("OPENAI_API_KEY", "test");
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .build()
            .expect("Should be able to build the agent");
        // Default provider is OpenAI -> default model is gpt-5.5
        assert!(agent.system.contains("gpt-5.5"));
    }

    #[test]
    fn test_build_system_prompt_contains_custom_model_when_set() {
        let agent = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("ollama".into())
            .unwrap()
            .model("custom-model".into())
            .build()
            .expect("Should be able to build the agent");
        assert!(agent.system.contains("custom-model"));
        // Derive the default from the provider so this negative check cannot
        // silently drift out of sync (the Ollama default is "llama3.2").
        let default_model = SupportedProvider::Ollama.default_model().to_string();
        assert!(!agent.system.contains(default_model.as_str()));
    }

    #[test]
    fn test_agent_fails_to_build_if_not_api_key() {
        let result = MicroAgentBuilder::new(ToolExecutionContext::new(()))
            .provider("groq".into())
            .unwrap()
            .build();
        assert!(result.is_err_and(|e| matches!(e, MicroAgentBuilderError::APIKeyNotFoundError(_))));
    }

    // ------------------------------------------------------------------
    // SupportedProvider tests
    // ------------------------------------------------------------------

    #[test]
    fn test_supported_provider_from_str_valid() {
        assert_eq!(
            SupportedProvider::from_str("openai").unwrap(),
            SupportedProvider::OpenAI
        );
        assert_eq!(
            SupportedProvider::from_str("openrouter").unwrap(),
            SupportedProvider::OpenRouter
        );
        assert_eq!(
            SupportedProvider::from_str("ollama").unwrap(),
            SupportedProvider::Ollama
        );
        assert_eq!(
            SupportedProvider::from_str("groq").unwrap(),
            SupportedProvider::Groq
        );
    }

    #[test]
    fn test_supported_provider_from_str_invalid() {
        assert!(SupportedProvider::from_str("azure").is_err());
    }

    #[test]
    fn test_supported_provider_display() {
        assert_eq!(SupportedProvider::OpenAI.to_string(), "openai");
        assert_eq!(SupportedProvider::OpenRouter.to_string(), "openrouter");
        assert_eq!(SupportedProvider::Ollama.to_string(), "ollama");
        assert_eq!(SupportedProvider::Groq.to_string(), "groq");
    }

    #[test]
    fn test_supported_provider_default_model() {
        assert_eq!(SupportedProvider::OpenAI.default_model(), "gpt-5.5");
        assert_eq!(SupportedProvider::Ollama.default_model(), "llama3.2");
        assert_eq!(
            SupportedProvider::Groq.default_model(),
            "llama-3.3-70b-versatile"
        );
        assert_eq!(
            SupportedProvider::OpenRouter.default_model(),
            "anthropic/claude-opus-4.7"
        );
    }

    #[test]
    fn test_supported_provider_default_is_openai() {
        let provider: SupportedProvider = Default::default();
        assert_eq!(provider, SupportedProvider::OpenAI);
    }

    // ------------------------------------------------------------------
    // DummyAgent mock tests
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_dummy_agent_generate_sets_flag() {
        let mut agent = DummyAgent::new();
        assert!(!agent.generate_called);
        let _ = agent.generate().await;
        assert!(agent.generate_called);
    }

    #[tokio::test]
    async fn test_dummy_agent_generate_returns_stream() {
        let mut agent = DummyAgent::new();
        let mut stream = agent.generate().await.unwrap();
        let item = stream.next().await;
        assert!(item.is_some());
    }

    #[tokio::test]
    async fn test_dummy_agent_run_streams_prompt() {
        let agent = DummyAgent::new();
        let mut stream = agent
            .run("hello".into(), Some("sid-123".into()))
            .await
            .unwrap();
        // We consumed self in run, so we can't check the fields directly.
        // Instead we verify via the yielded event.
        let item = stream.next().await.unwrap().unwrap();
        match item {
            AgentEventAny::UserPromptSubmit(ev) => {
                assert_eq!(ev.prompt, "hello");
                assert_eq!(ev.session_id, "sid-123");
            }
            _ => panic!("expected UserPromptSubmit"),
        }
    }

    #[tokio::test]
    async fn test_dummy_agent_run_with_none_session_id() {
        let agent = DummyAgent::new();
        let mut stream = agent.run("test".into(), None).await.unwrap();
        let item = stream.next().await.unwrap().unwrap();
        match item {
            AgentEventAny::UserPromptSubmit(ev) => {
                assert_eq!(ev.session_id, "new");
            }
            _ => panic!("expected UserPromptSubmit"),
        }
    }

    #[tokio::test]
    async fn test_dummy_agent_run_stream_yields_single_event() {
        let agent = DummyAgent::new();
        let mut stream = agent.run("prompt".into(), None).await.unwrap();
        let first = stream.next().await;
        assert!(first.is_some());
        let second = stream.next().await;
        assert!(second.is_none());
    }
}