autoagents_core/agent/
actor.rs1#[cfg(not(target_arch = "wasm32"))]
2use crate::actor::Topic;
3use crate::agent::base::AgentType;
4use crate::agent::error::{AgentBuildError, RunnableAgentError};
5use crate::agent::hooks::AgentHooks;
6use crate::agent::state::AgentState;
7use crate::agent::task::Task;
8use crate::agent::{AgentBuilder, AgentDeriveT, AgentExecutor, BaseAgent, HookOutcome};
9use crate::channel::Sender;
10use crate::error::Error;
11#[cfg(not(target_arch = "wasm32"))]
12use crate::runtime::TypedRuntime;
13use async_trait::async_trait;
14use autoagents_protocol::Event;
15#[cfg(target_arch = "wasm32")]
16use futures::SinkExt;
17use futures::Stream;
18#[cfg(not(target_arch = "wasm32"))]
19use ractor::Actor;
20#[cfg(not(target_arch = "wasm32"))]
21use ractor::{ActorProcessingErr, ActorRef};
22use serde_json::Value;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26pub struct ActorAgent {}
31
32impl AgentType for ActorAgent {
33 fn type_name() -> &'static str {
34 "protocol_agent"
35 }
36}
37
38#[cfg(not(target_arch = "wasm32"))]
42#[derive(Clone)]
43pub struct ActorAgentHandle<T: AgentDeriveT + AgentExecutor + AgentHooks + Send + Sync> {
44 pub agent: Arc<BaseAgent<T, ActorAgent>>,
45 pub actor_ref: ActorRef<Task>,
46}
47
48#[cfg(not(target_arch = "wasm32"))]
49impl<T: AgentDeriveT + AgentExecutor + AgentHooks> ActorAgentHandle<T> {
50 pub fn addr(&self) -> ActorRef<Task> {
52 self.actor_ref.clone()
53 }
54
55 pub fn agent(&self) -> Arc<BaseAgent<T, ActorAgent>> {
58 self.agent.clone()
59 }
60}
61
62#[cfg(not(target_arch = "wasm32"))]
63impl<T: AgentDeriveT + AgentExecutor + AgentHooks> Debug for ActorAgentHandle<T> {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("AgentHandle")
66 .field("agent", &self.agent)
67 .finish()
68 }
69}
70
71#[cfg(not(target_arch = "wasm32"))]
72#[derive(Debug)]
73pub struct AgentActor<T: AgentDeriveT + AgentExecutor + AgentHooks>(
74 pub Arc<BaseAgent<T, ActorAgent>>,
75);
76
77#[cfg(not(target_arch = "wasm32"))]
78impl<T: AgentDeriveT + AgentExecutor + AgentHooks> AgentActor<T> {}
79
80#[cfg(not(target_arch = "wasm32"))]
81impl<T: AgentDeriveT + AgentExecutor + AgentHooks> AgentBuilder<T, ActorAgent>
82where
83 T: Send + Sync + 'static,
84 serde_json::Value: From<<T as AgentExecutor>::Output>,
85 <T as AgentDeriveT>::Output: From<<T as AgentExecutor>::Output>,
86{
87 pub async fn build(self) -> Result<ActorAgentHandle<T>, Error> {
89 let llm = self.llm.ok_or(AgentBuildError::BuildFailure(
90 "LLM provider is required".to_string(),
91 ))?;
92 let runtime = self.runtime.ok_or(AgentBuildError::BuildFailure(
93 "Runtime should be defined".into(),
94 ))?;
95 let tx = runtime.tx();
96
97 let agent: Arc<BaseAgent<T, ActorAgent>> = Arc::new(
98 BaseAgent::<T, ActorAgent>::new(self.inner, llm, self.memory, tx, self.stream).await?,
99 );
100
101 let agent_actor = AgentActor(agent.clone());
103 let actor_ref = Actor::spawn(Some(agent_actor.0.name().into()), agent_actor, ())
104 .await
105 .map_err(AgentBuildError::SpawnError)?
106 .0;
107
108 for topic in self.subscribed_topics {
110 runtime.subscribe(&topic, actor_ref.clone()).await?;
111 }
112
113 Ok(ActorAgentHandle { agent, actor_ref })
114 }
115
116 pub fn subscribe(mut self, topic: Topic<Task>) -> Self {
117 self.subscribed_topics.push(topic);
118 self
119 }
120}
121
122#[cfg(not(target_arch = "wasm32"))]
123impl<T: AgentDeriveT + AgentExecutor + AgentHooks> BaseAgent<T, ActorAgent> {
124 pub fn tx(&self) -> Result<Sender<Event>, RunnableAgentError> {
125 self.tx.clone().ok_or(RunnableAgentError::EmptyTx)
126 }
127
128 pub async fn run(
129 self: Arc<Self>,
130 task: Task,
131 ) -> Result<<T as AgentDeriveT>::Output, RunnableAgentError>
132 where
133 Value: From<<T as AgentExecutor>::Output>,
134 <T as AgentDeriveT>::Output: From<<T as AgentExecutor>::Output>,
135 {
136 let submission_id = task.submission_id;
137 let tx = self.tx().map_err(|_| RunnableAgentError::EmptyTx)?;
138
139 let context = self.create_context();
140
141 let hook_outcome = self.inner.on_run_start(&task, &context).await;
143 match hook_outcome {
144 HookOutcome::Abort => return Err(RunnableAgentError::Abort),
145 HookOutcome::Continue => {}
146 }
147
148 match self.inner().execute(&task, context.clone()).await {
150 Ok(output) => {
151 let value: Value = output.clone().into();
152
153 #[cfg(not(target_arch = "wasm32"))]
154 tx.send(Event::TaskComplete {
155 sub_id: submission_id,
156 actor_id: self.id,
157 actor_name: self.name().to_string(),
158 result: serde_json::to_string_pretty(&value)
159 .map_err(|e| RunnableAgentError::ExecutorError(e.to_string()))?,
160 })
161 .await
162 .map_err(|e| RunnableAgentError::ExecutorError(e.to_string()))?;
163
164 let agent_out: <T as AgentDeriveT>::Output = output.into();
166
167 self.inner
169 .on_run_complete(&task, &agent_out, &context)
170 .await;
171
172 Ok(agent_out)
173 }
174 Err(e) => {
175 #[cfg(not(target_arch = "wasm32"))]
176 tx.send(Event::TaskError {
177 sub_id: submission_id,
178 actor_id: self.id,
179 error: e.to_string(),
180 })
181 .await
182 .map_err(|e| RunnableAgentError::ExecutorError(e.to_string()))?;
183 Err(RunnableAgentError::ExecutorError(e.to_string()))
184 }
185 }
186 }
187
188 pub async fn run_stream(
189 self: Arc<Self>,
190 task: Task,
191 ) -> Result<
192 std::pin::Pin<
193 Box<dyn Stream<Item = Result<<T as AgentDeriveT>::Output, RunnableAgentError>> + Send>,
194 >,
195 RunnableAgentError,
196 >
197 where
198 <T as AgentDeriveT>::Output: From<<T as AgentExecutor>::Output>,
199 {
200 let context = self.create_context();
202
203 match self.inner().execute_stream(&task, context).await {
205 Ok(stream) => {
206 use futures::StreamExt;
207 let transformed_stream = stream.map(move |result| {
209 match result {
210 Ok(output) => Ok(output.into()),
211 Err(e) => {
212 let error_msg = e.to_string();
214 Err(RunnableAgentError::ExecutorError(error_msg))
215 }
216 }
217 });
218
219 Ok(Box::pin(transformed_stream))
220 }
221 Err(e) => {
222 Err(RunnableAgentError::ExecutorError(e.to_string()))
224 }
225 }
226 }
227}
228
229#[cfg(not(target_arch = "wasm32"))]
230#[async_trait]
231impl<T: AgentDeriveT + AgentExecutor + AgentHooks> Actor for AgentActor<T>
232where
233 T: Send + Sync + 'static,
234 serde_json::Value: From<<T as AgentExecutor>::Output>,
235 <T as AgentDeriveT>::Output: From<<T as AgentExecutor>::Output>,
236{
237 type Msg = Task;
238 type State = AgentState;
239 type Arguments = ();
240
241 async fn pre_start(
242 &self,
243 _myself: ActorRef<Self::Msg>,
244 _args: Self::Arguments,
245 ) -> Result<Self::State, ActorProcessingErr> {
246 Ok(AgentState::new())
247 }
248
249 async fn post_stop(
250 &self,
251 _myself: ActorRef<Self::Msg>,
252 _state: &mut Self::State,
253 ) -> Result<(), ActorProcessingErr> {
254 self.0.inner().on_agent_shutdown().await;
256 Ok(())
257 }
258
259 async fn handle(
260 &self,
261 _myself: ActorRef<Self::Msg>,
262 message: Self::Msg,
263 _state: &mut Self::State,
264 ) -> Result<(), ActorProcessingErr> {
265 let agent = self.0.clone();
266 let task = message;
267
268 if agent.stream() {
270 let _ = agent.run_stream(task).await?;
271 Ok(())
272 } else {
273 let _ = agent.run(task).await?;
274 Ok(())
275 }
276 }
277}