1use crate::agent::capabilities::AgentCapabilities;
73use crate::agent::context::{AgentContext, AgentEvent};
74use crate::agent::core::{AgentLifecycle, AgentMessage, AgentMessaging, MoFAAgent};
75use crate::agent::error::{AgentError, AgentResult};
76use crate::agent::types::{AgentInput, AgentOutput, AgentState, InterruptResult};
77use std::sync::Arc;
78use tokio::sync::RwLock;
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum RunnerState {
83 Created,
85 Initializing,
87 Running,
89 Paused,
91 Stopping,
93 Stopped,
95 Error,
97}
98
99#[derive(Debug, Clone, Default)]
101pub struct RunnerStats {
102 pub total_executions: u64,
104 pub successful_executions: u64,
106 pub failed_executions: u64,
108 pub avg_execution_time_ms: f64,
110 pub last_execution_time_ms: Option<u64>,
112}
113
114pub struct AgentRunner<T: MoFAAgent> {
118 agent: T,
120 context: AgentContext,
122 state: Arc<RwLock<RunnerState>>,
124 stats: Arc<RwLock<RunnerStats>>,
126}
127
128impl<T: MoFAAgent> AgentRunner<T> {
129 pub async fn new(mut agent: T) -> AgentResult<Self> {
133 let context = AgentContext::new(agent.id().to_string());
134
135 agent
137 .initialize(&context)
138 .await
139 .map_err(|e| AgentError::InitializationFailed(e.to_string()))?;
140
141 Ok(Self {
142 agent,
143 context,
144 state: Arc::new(RwLock::new(RunnerState::Created)),
145 stats: Arc::new(RwLock::new(RunnerStats::default())),
146 })
147 }
148
149 pub async fn with_context(mut agent: T, context: AgentContext) -> AgentResult<Self> {
151 agent
152 .initialize(&context)
153 .await
154 .map_err(|e| AgentError::InitializationFailed(e.to_string()))?;
155
156 Ok(Self {
157 agent,
158 context,
159 state: Arc::new(RwLock::new(RunnerState::Created)),
160 stats: Arc::new(RwLock::new(RunnerStats::default())),
161 })
162 }
163
164 pub fn agent(&self) -> &T {
166 &self.agent
167 }
168
169 pub fn agent_mut(&mut self) -> &mut T {
171 &mut self.agent
172 }
173
174 pub fn context(&self) -> &AgentContext {
176 &self.context
177 }
178
179 pub async fn state(&self) -> RunnerState {
181 *self.state.read().await
182 }
183
184 pub async fn stats(&self) -> RunnerStats {
186 self.stats.read().await.clone()
187 }
188
189 pub async fn is_running(&self) -> bool {
191 matches!(
192 *self.state.read().await,
193 RunnerState::Running | RunnerState::Paused
194 )
195 }
196
197 pub async fn execute(&mut self, input: AgentInput) -> AgentResult<AgentOutput> {
207 let current_state = self.state().await;
209 if !matches!(
210 current_state,
211 RunnerState::Running | RunnerState::Created | RunnerState::Stopped
212 ) {
213 return Err(AgentError::ValidationFailed(format!(
214 "Cannot execute in state: {:?}",
215 current_state
216 )));
217 }
218
219 *self.state.write().await = RunnerState::Running;
221
222 let start = std::time::Instant::now();
223
224 let result = self.agent.execute(input, &self.context).await;
226
227 let duration = start.elapsed().as_millis() as u64;
228
229 let mut stats = self.stats.write().await;
231 stats.total_executions += 1;
232 stats.last_execution_time_ms = Some(duration);
233
234 match &result {
235 Ok(_) => {
236 stats.successful_executions += 1;
237 }
238 Err(_) => {
239 stats.failed_executions += 1;
240 }
241 }
242
243 let n = stats.total_executions as f64;
245 stats.avg_execution_time_ms =
246 (stats.avg_execution_time_ms * (n - 1.0) + duration as f64) / n;
247
248 result
249 }
250
251 pub async fn execute_batch(
261 &mut self,
262 inputs: Vec<AgentInput>,
263 ) -> Vec<AgentResult<AgentOutput>> {
264 let mut results = Vec::with_capacity(inputs.len());
265 for input in inputs {
266 results.push(self.execute(input).await);
267 }
268 results
269 }
270
271 pub async fn pause(&mut self) -> AgentResult<()>
275 where
276 T: AgentLifecycle,
277 {
278 *self.state.write().await = RunnerState::Stopping;
279
280 self.agent
281 .pause()
282 .await
283 .map_err(|e| AgentError::Other(format!("Pause failed: {}", e)))?;
284
285 *self.state.write().await = RunnerState::Paused;
286 Ok(())
287 }
288
289 pub async fn resume(&mut self) -> AgentResult<()>
293 where
294 T: AgentLifecycle,
295 {
296 *self.state.write().await = RunnerState::Running;
297
298 self.agent
299 .resume()
300 .await
301 .map_err(|e| AgentError::Other(format!("Resume failed: {}", e)))?;
302
303 Ok(())
304 }
305
306 pub async fn shutdown(mut self) -> AgentResult<()> {
310 *self.state.write().await = RunnerState::Stopping;
311
312 self.agent
313 .shutdown()
314 .await
315 .map_err(|e| AgentError::ShutdownFailed(e.to_string()))?;
316
317 *self.state.write().await = RunnerState::Stopped;
318 Ok(())
319 }
320
321 pub async fn interrupt(&mut self) -> AgentResult<InterruptResult> {
323 self.agent
324 .interrupt()
325 .await
326 .map_err(|e| AgentError::Other(format!("Interrupt failed: {}", e)))
327 }
328
329 pub fn into_inner(self) -> T {
331 self.agent
332 }
333
334 pub fn id(&self) -> &str {
336 self.agent.id()
337 }
338
339 pub fn name(&self) -> &str {
341 self.agent.name()
342 }
343
344 pub fn capabilities(&self) -> &AgentCapabilities {
346 self.agent.capabilities()
347 }
348
349 pub fn agent_state(&self) -> AgentState {
351 self.agent.state()
352 }
353}
354
355impl<T: MoFAAgent + AgentMessaging> AgentRunner<T> {
357 pub async fn handle_event(&mut self, event: AgentEvent) -> AgentResult<()> {
359 self.agent.handle_event(event).await
360 }
361
362 pub async fn send_message(&mut self, msg: AgentMessage) -> AgentResult<AgentMessage> {
364 self.agent.handle_message(msg).await
365 }
366}
367
368pub struct AgentRunnerBuilder<T: MoFAAgent> {
374 agent: Option<T>,
375 context: Option<AgentContext>,
376}
377
378impl<T: MoFAAgent> AgentRunnerBuilder<T> {
379 pub fn new() -> Self {
381 Self {
382 agent: None,
383 context: None,
384 }
385 }
386
387 pub fn with_agent(mut self, agent: T) -> Self {
389 self.agent = Some(agent);
390 self
391 }
392
393 pub fn with_context(mut self, context: AgentContext) -> Self {
395 self.context = Some(context);
396 self
397 }
398
399 pub async fn build(self) -> AgentResult<AgentRunner<T>> {
401 let agent = self
402 .agent
403 .ok_or_else(|| AgentError::ValidationFailed("Agent not set".to_string()))?;
404
405 if let Some(context) = self.context {
406 AgentRunner::with_context(agent, context).await
407 } else {
408 AgentRunner::new(agent).await
409 }
410 }
411}
412
413impl<T: MoFAAgent> Default for AgentRunnerBuilder<T> {
414 fn default() -> Self {
415 Self::new()
416 }
417}
418
419pub async fn run_agents<T: MoFAAgent>(
425 agent: T,
426 inputs: Vec<AgentInput>,
427) -> AgentResult<Vec<AgentOutput>> {
428 let mut runner = AgentRunner::new(agent).await?;
429 let results = runner.execute_batch(inputs).await;
430 runner.shutdown().await?;
431 results.into_iter().collect()
432}
433
434#[cfg(test)]
439mod tests {
440 use super::*;
441 use crate::agent::capabilities::AgentCapabilitiesBuilder;
442
443 struct TestAgent {
444 id: String,
445 name: String,
446 state: AgentState,
447 }
448
449 impl TestAgent {
450 fn new(id: &str, name: &str) -> Self {
451 Self {
452 id: id.to_string(),
453 name: name.to_string(),
454 state: AgentState::Created,
455 }
456 }
457 }
458
459 #[async_trait::async_trait]
460 impl MoFAAgent for TestAgent {
461 fn id(&self) -> &str {
462 &self.id
463 }
464
465 fn name(&self) -> &str {
466 &self.name
467 }
468
469 fn capabilities(&self) -> &AgentCapabilities {
470 static CAPS: std::sync::OnceLock<AgentCapabilities> = std::sync::OnceLock::new();
471 CAPS.get_or_init(|| AgentCapabilitiesBuilder::new().build())
472 }
473
474 async fn initialize(&mut self, _ctx: &AgentContext) -> AgentResult<()> {
475 self.state = AgentState::Ready;
476 Ok(())
477 }
478
479 async fn execute(
480 &mut self,
481 input: AgentInput,
482 _ctx: &AgentContext,
483 ) -> AgentResult<AgentOutput> {
484 self.state = AgentState::Executing;
485 let text = input.to_text();
486 Ok(AgentOutput::text(format!("Echo: {}", text)))
487 }
488
489 async fn shutdown(&mut self) -> AgentResult<()> {
490 self.state = AgentState::Shutdown;
491 Ok(())
492 }
493
494 fn state(&self) -> AgentState {
495 self.state.clone()
496 }
497 }
498
499 #[tokio::test]
500 async fn test_agent_runner_new() {
501 let agent = TestAgent::new("test-001", "Test Agent");
502 let runner = AgentRunner::new(agent).await.unwrap();
503
504 assert_eq!(runner.id(), "test-001");
505 assert_eq!(runner.name(), "Test Agent");
506 assert_eq!(runner.state().await, RunnerState::Created);
508 }
509
510 #[tokio::test]
511 async fn test_agent_runner_execute() {
512 let agent = TestAgent::new("test-002", "Test Agent");
513 let mut runner = AgentRunner::new(agent).await.unwrap();
514
515 let input = AgentInput::text("Hello");
516 let output = runner.execute(input).await.unwrap();
517
518 assert_eq!(output.to_text(), "Echo: Hello");
519
520 let stats = runner.stats().await;
521 assert_eq!(stats.total_executions, 1);
522 assert_eq!(stats.successful_executions, 1);
523 }
524
525 #[tokio::test]
526 async fn test_run_agents_function() {
527 let agent = TestAgent::new("test-003", "Test Agent");
528 let inputs = vec![AgentInput::text("Test")];
529 let outputs = run_agents(agent, inputs).await.unwrap();
530
531 assert_eq!(outputs[0].to_text(), "Echo: Test");
532 }
533}