autoagents_core/agent/
direct.rs

1use crate::agent::base::AgentType;
2use crate::agent::error::{AgentBuildError, RunnableAgentError};
3use crate::agent::task::Task;
4use crate::agent::{AgentBuilder, AgentDeriveT, AgentExecutor, AgentHooks, BaseAgent, HookOutcome};
5use crate::error::Error;
6use crate::protocol::Event;
7use futures::Stream;
8
9use crate::agent::constants::DEFAULT_CHANNEL_BUFFER;
10
11use crate::channel::{channel, Receiver, Sender};
12
13use crate::utils::{receiver_into_stream, BoxEventStream};
14
15/// Marker type for direct (non-actor) agents.
16///
17/// Direct agents execute immediately within the caller's task without
18/// requiring a runtime or event wiring. Use this for simple one-shot
19/// invocations and unit tests.
20pub struct DirectAgent {}
21
22impl AgentType for DirectAgent {
23    fn type_name() -> &'static str {
24        "direct_agent"
25    }
26}
27
28/// Handle for a direct agent containing the agent instance and an event stream
29/// receiver. Use `agent.run(...)` for one-shot calls or `agent.run_stream(...)`
30/// to receive streaming outputs.
31pub struct DirectAgentHandle<T: AgentDeriveT + AgentExecutor + AgentHooks + Send + Sync> {
32    pub agent: BaseAgent<T, DirectAgent>,
33    pub rx: BoxEventStream<Event>,
34}
35
36impl<T: AgentDeriveT + AgentExecutor + AgentHooks> DirectAgentHandle<T> {
37    pub fn new(agent: BaseAgent<T, DirectAgent>, rx: BoxEventStream<Event>) -> Self {
38        Self { agent, rx }
39    }
40}
41
42impl<T: AgentDeriveT + AgentExecutor + AgentHooks> AgentBuilder<T, DirectAgent> {
43    /// Build the BaseAgent and return a wrapper
44    #[allow(clippy::result_large_err)]
45    pub async fn build(self) -> Result<DirectAgentHandle<T>, Error> {
46        let llm = self.llm.ok_or(AgentBuildError::BuildFailure(
47            "LLM provider is required".to_string(),
48        ))?;
49        let (tx, rx): (Sender<Event>, Receiver<Event>) = channel(DEFAULT_CHANNEL_BUFFER);
50        let agent: BaseAgent<T, DirectAgent> =
51            BaseAgent::<T, DirectAgent>::new(self.inner, llm, self.memory, tx, self.stream).await?;
52        let stream = receiver_into_stream(rx);
53        Ok(DirectAgentHandle::new(agent, stream))
54    }
55}
56
57impl<T: AgentDeriveT + AgentExecutor + AgentHooks> BaseAgent<T, DirectAgent> {
58    /// Execute the agent for a single task and return the final agent output.
59    pub async fn run(&self, task: Task) -> Result<<T as AgentDeriveT>::Output, RunnableAgentError>
60    where
61        <T as AgentDeriveT>::Output: From<<T as AgentExecutor>::Output>,
62    {
63        let context = self.create_context();
64
65        //Run Hook
66        let hook_outcome = self.inner.on_run_start(&task, &context).await;
67        match hook_outcome {
68            HookOutcome::Abort => return Err(RunnableAgentError::Abort),
69            HookOutcome::Continue => {}
70        }
71
72        // Execute the agent's logic using the executor
73        match self.inner().execute(&task, context.clone()).await {
74            Ok(output) => {
75                let output: <T as AgentExecutor>::Output = output;
76
77                //Extract Agent output into the desired type
78                let agent_out: <T as AgentDeriveT>::Output = output.into();
79
80                //Run On complete Hook
81                self.inner
82                    .on_run_complete(&task, &agent_out, &context)
83                    .await;
84                Ok(agent_out)
85            }
86            Err(e) => {
87                // Send error event
88                Err(RunnableAgentError::ExecutorError(e.to_string()))
89            }
90        }
91    }
92
93    /// Execute the agent with streaming enabled and receive a stream of
94    /// partial outputs which culminate in a final chunk with `done=true`.
95    pub async fn run_stream(
96        &self,
97        task: Task,
98    ) -> Result<
99        std::pin::Pin<Box<dyn Stream<Item = Result<<T as AgentDeriveT>::Output, Error>> + Send>>,
100        RunnableAgentError,
101    >
102    where
103        <T as AgentDeriveT>::Output: From<<T as AgentExecutor>::Output>,
104    {
105        let context = self.create_context();
106
107        //Run Hook
108        let hook_outcome = self.inner.on_run_start(&task, &context).await;
109        match hook_outcome {
110            HookOutcome::Abort => return Err(RunnableAgentError::Abort),
111            HookOutcome::Continue => {}
112        }
113
114        // Execute the agent's streaming logic using the executor
115        match self.inner().execute_stream(&task, context.clone()).await {
116            Ok(stream) => {
117                use futures::StreamExt;
118                // Convert the stream output
119                let transformed_stream = stream.map(move |result| match result {
120                    Ok(output) => Ok(output.into()),
121                    Err(e) => {
122                        let error_msg = e.to_string();
123                        Err(RunnableAgentError::ExecutorError(error_msg).into())
124                    }
125                });
126
127                Ok(Box::pin(transformed_stream))
128            }
129            Err(e) => {
130                // Send error event for stream creation failure
131                Err(RunnableAgentError::ExecutorError(e.to_string()))
132            }
133        }
134    }
135}