use crate::agent::capabilities::AgentCapabilities;
use crate::agent::context::{AgentContext, AgentEvent};
use crate::agent::core::{AgentLifecycle, AgentMessage, AgentMessaging, MoFAAgent};
use crate::agent::error::{AgentError, AgentResult};
use crate::agent::types::{AgentInput, AgentOutput, AgentState, InterruptResult};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Lifecycle state tracked by an `AgentRunner`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunnerState {
/// State a runner holds right after construction.
Created,
/// Not assigned anywhere in this file.
/// NOTE(review): presumably reserved for an in-progress initialization phase — confirm.
Initializing,
/// Set when an execution starts and when the runner is resumed.
Running,
/// Set after the agent has been paused successfully.
Paused,
/// Transitional state written before the agent's `pause` or `shutdown` is called.
Stopping,
/// Set after the agent has shut down successfully.
Stopped,
/// Not assigned anywhere in this file.
/// NOTE(review): presumably meant to flag a failed runner — confirm.
Error,
}
/// Execution counters and timings accumulated by `AgentRunner::execute`.
///
/// All counters start at zero and `last_execution_time_ms` at `None` (derived `Default`).
#[derive(Debug, Clone, Default)]
pub struct RunnerStats {
/// Number of executions that reached the agent, successful or not.
pub total_executions: u64,
/// Number of executions whose result was `Ok`.
pub successful_executions: u64,
/// Number of executions whose result was `Err`.
pub failed_executions: u64,
/// Running mean of the execution duration, in milliseconds.
pub avg_execution_time_ms: f64,
/// Duration of the most recent execution in milliseconds; `None` until the first one.
pub last_execution_time_ms: Option<u64>,
}
/// Wraps a `MoFAAgent` together with its context, a runner state and execution statistics.
pub struct AgentRunner<T: MoFAAgent> {
// The wrapped agent; every operation is forwarded to it.
agent: T,
// Context handed to the agent on `initialize` and on every `execute`.
context: AgentContext,
// Current runner state, stored behind an async read/write lock.
state: Arc<RwLock<RunnerState>>,
// Accumulated execution statistics, stored behind an async read/write lock.
stats: Arc<RwLock<RunnerStats>>,
}
impl<T: MoFAAgent> AgentRunner<T> {
pub async fn new(mut agent: T) -> AgentResult<Self> {
let context = AgentContext::new(agent.id().to_string());
agent
.initialize(&context)
.await
.map_err(|e| AgentError::InitializationFailed(e.to_string()))?;
Ok(Self {
agent,
context,
state: Arc::new(RwLock::new(RunnerState::Created)),
stats: Arc::new(RwLock::new(RunnerStats::default())),
})
}
/// Builds a runner around `agent` using the caller-supplied `context`.
///
/// The agent is initialized against `context` before the runner is returned;
/// the runner starts in `RunnerState::Created` with default statistics.
///
/// # Errors
///
/// Returns `AgentError::InitializationFailed` (carrying the stringified cause)
/// when the agent's `initialize` fails.
pub async fn with_context(mut agent: T, context: AgentContext) -> AgentResult<Self> {
    match agent.initialize(&context).await {
        Ok(()) => {}
        Err(cause) => return Err(AgentError::InitializationFailed(cause.to_string())),
    }

    let state = Arc::new(RwLock::new(RunnerState::Created));
    let stats = Arc::new(RwLock::new(RunnerStats::default()));
    Ok(Self {
        agent,
        context,
        state,
        stats,
    })
}
pub fn agent(&self) -> &T {
&self.agent
}
pub fn agent_mut(&mut self) -> &mut T {
&mut self.agent
}
pub fn context(&self) -> &AgentContext {
&self.context
}
/// Returns a copy of the current runner state.
pub async fn state(&self) -> RunnerState {
    let guard = self.state.read().await;
    *guard
}
pub async fn stats(&self) -> RunnerStats {
self.stats.read().await.clone()
}
/// Reports whether the runner is in `RunnerState::Running` or `RunnerState::Paused`.
pub async fn is_running(&self) -> bool {
    let current = *self.state.read().await;
    current == RunnerState::Running || current == RunnerState::Paused
}
/// Runs the agent once on `input` and records the outcome in the statistics.
///
/// Execution is only allowed from `Running`, `Created` or `Stopped`; the runner
/// is switched to `Running` before the agent is invoked. Whether the agent
/// succeeds or fails, the call is counted, its duration is stored as the last
/// execution time and folded into the running average.
///
/// # Errors
///
/// * `AgentError::ValidationFailed` when the runner is in any other state
///   (the agent is not invoked and the statistics are untouched).
/// * Whatever error the agent's `execute` returns, passed through unchanged.
pub async fn execute(&mut self, input: AgentInput) -> AgentResult<AgentOutput> {
    let current_state = self.state().await;
    let may_execute = matches!(
        current_state,
        RunnerState::Running | RunnerState::Created | RunnerState::Stopped
    );
    if !may_execute {
        return Err(AgentError::ValidationFailed(format!(
            "Cannot execute in state: {:?}",
            current_state
        )));
    }
    *self.state.write().await = RunnerState::Running;

    let started = std::time::Instant::now();
    let result = self.agent.execute(input, &self.context).await;
    // `as_millis` yields a u128; saturate instead of silently truncating with `as`.
    let duration = started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;

    let mut stats = self.stats.write().await;
    stats.total_executions += 1;
    stats.last_execution_time_ms = Some(duration);
    if result.is_ok() {
        stats.successful_executions += 1;
    } else {
        stats.failed_executions += 1;
    }

    // Incremental mean: previous average weighted by the previous sample count.
    let n = stats.total_executions as f64;
    let previous_avg = stats.avg_execution_time_ms;
    stats.avg_execution_time_ms = (previous_avg * (n - 1.0) + duration as f64) / n;

    result
}
/// Executes every input one after another, in order, via `execute`.
///
/// Each individual result is kept, so a failing input does not stop the
/// remaining inputs from being attempted. The returned vector has one entry
/// per input, in the same order.
pub async fn execute_batch(
    &mut self,
    inputs: Vec<AgentInput>,
) -> Vec<AgentResult<AgentOutput>> {
    let mut outcomes = Vec::with_capacity(inputs.len());
    for item in inputs {
        let outcome = self.execute(item).await;
        outcomes.push(outcome);
    }
    outcomes
}
/// Pauses the wrapped agent and marks the runner as `RunnerState::Paused`.
///
/// The runner is put into the transitional `Stopping` state while the agent's
/// `pause` runs. If the agent refuses to pause, the state the runner had
/// before the call is restored so it is not left stuck in `Stopping` (a state
/// from which `execute` is rejected).
///
/// # Errors
///
/// Returns `AgentError::Other` with a `"Pause failed: …"` message when the
/// agent's `pause` fails.
pub async fn pause(&mut self) -> AgentResult<()>
where
    T: AgentLifecycle,
{
    let previous = self.state().await;
    *self.state.write().await = RunnerState::Stopping;

    if let Err(cause) = self.agent.pause().await {
        // Roll back the transitional state: the agent was not paused.
        *self.state.write().await = previous;
        return Err(AgentError::Other(format!("Pause failed: {}", cause)));
    }

    *self.state.write().await = RunnerState::Paused;
    Ok(())
}
/// Resumes the wrapped agent and marks the runner as `RunnerState::Running`.
///
/// The state is switched only after the agent's `resume` has succeeded, so a
/// failed resume leaves the runner state untouched instead of reporting
/// `Running` for an agent that did not resume.
///
/// # Errors
///
/// Returns `AgentError::Other` with a `"Resume failed: …"` message when the
/// agent's `resume` fails.
pub async fn resume(&mut self) -> AgentResult<()>
where
    T: AgentLifecycle,
{
    self.agent
        .resume()
        .await
        .map_err(|e| AgentError::Other(format!("Resume failed: {}", e)))?;
    *self.state.write().await = RunnerState::Running;
    Ok(())
}
/// Shuts the wrapped agent down, consuming the runner.
///
/// The state is set to `Stopping` before the agent's `shutdown` is called and
/// to `Stopped` once it has succeeded.
///
/// # Errors
///
/// Returns `AgentError::ShutdownFailed` (carrying the stringified cause) when
/// the agent's `shutdown` fails; the state is then left at `Stopping`.
pub async fn shutdown(mut self) -> AgentResult<()> {
    {
        let mut state = self.state.write().await;
        *state = RunnerState::Stopping;
    }

    match self.agent.shutdown().await {
        Ok(()) => {
            let mut state = self.state.write().await;
            *state = RunnerState::Stopped;
            Ok(())
        }
        Err(cause) => Err(AgentError::ShutdownFailed(cause.to_string())),
    }
}
/// Forwards an interrupt request to the wrapped agent.
///
/// The runner state is not modified by this call.
///
/// # Errors
///
/// Returns `AgentError::Other` with an `"Interrupt failed: …"` message when
/// the agent's `interrupt` fails.
pub async fn interrupt(&mut self) -> AgentResult<InterruptResult> {
    match self.agent.interrupt().await {
        Ok(outcome) => Ok(outcome),
        Err(cause) => Err(AgentError::Other(format!("Interrupt failed: {}", cause))),
    }
}
pub fn into_inner(self) -> T {
self.agent
}
pub fn id(&self) -> &str {
self.agent.id()
}
pub fn name(&self) -> &str {
self.agent.name()
}
pub fn capabilities(&self) -> &AgentCapabilities {
self.agent.capabilities()
}
pub fn agent_state(&self) -> AgentState {
self.agent.state()
}
}
/// Messaging helpers, available when the wrapped agent also implements `AgentMessaging`.
impl<T: MoFAAgent + AgentMessaging> AgentRunner<T> {
    /// Forwards `event` to the wrapped agent and returns its result unchanged.
    pub async fn handle_event(&mut self, event: AgentEvent) -> AgentResult<()> {
        let agent = &mut self.agent;
        agent.handle_event(event).await
    }

    /// Forwards `msg` to the wrapped agent's `handle_message` and returns its reply unchanged.
    pub async fn send_message(&mut self, msg: AgentMessage) -> AgentResult<AgentMessage> {
        let agent = &mut self.agent;
        agent.handle_message(msg).await
    }
}
/// Step-by-step builder for an `AgentRunner`.
pub struct AgentRunnerBuilder<T: MoFAAgent> {
// Agent to wrap; must be set before `build` is called.
agent: Option<T>,
// Optional explicit context; when absent `build` falls back to `AgentRunner::new`.
context: Option<AgentContext>,
}
impl<T: MoFAAgent> AgentRunnerBuilder<T> {
    /// Creates an empty builder with neither agent nor context set.
    pub fn new() -> Self {
        let agent = None;
        let context = None;
        Self { agent, context }
    }

    /// Sets (or replaces) the agent the runner will wrap.
    pub fn with_agent(self, agent: T) -> Self {
        Self {
            agent: Some(agent),
            ..self
        }
    }

    /// Sets (or replaces) the context the agent will be initialized with.
    pub fn with_context(self, context: AgentContext) -> Self {
        Self {
            context: Some(context),
            ..self
        }
    }

    /// Builds the runner, initializing the agent in the process.
    ///
    /// Uses `AgentRunner::with_context` when a context was supplied and
    /// `AgentRunner::new` otherwise.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::ValidationFailed("Agent not set")` when no agent
    /// was provided, or whatever error the chosen constructor returns.
    pub async fn build(self) -> AgentResult<AgentRunner<T>> {
        let Self { agent, context } = self;

        let agent = match agent {
            Some(agent) => agent,
            None => return Err(AgentError::ValidationFailed("Agent not set".to_string())),
        };

        match context {
            Some(context) => AgentRunner::with_context(agent, context).await,
            None => AgentRunner::new(agent).await,
        }
    }
}
impl<T: MoFAAgent> Default for AgentRunnerBuilder<T> {
fn default() -> Self {
Self::new()
}
}
/// Convenience helper: wraps `agent` in a runner, executes all `inputs` in
/// order, shuts the runner down and returns the outputs.
///
/// Every input is executed even if an earlier one failed; the runner is shut
/// down before the results are inspected.
///
/// # Errors
///
/// Returns, in order of precedence: the initialization error, the shutdown
/// error, or the first execution error encountered in input order.
pub async fn run_agents<T: MoFAAgent>(
    agent: T,
    inputs: Vec<AgentInput>,
) -> AgentResult<Vec<AgentOutput>> {
    let mut runner = AgentRunner::new(agent).await?;
    let outcomes = runner.execute_batch(inputs).await;
    runner.shutdown().await?;

    let mut outputs = Vec::with_capacity(outcomes.len());
    for outcome in outcomes {
        outputs.push(outcome?);
    }
    Ok(outputs)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::capabilities::AgentCapabilitiesBuilder;

    /// Minimal agent that echoes its input back; used to drive the runner.
    struct TestAgent {
        id: String,
        name: String,
        state: AgentState,
    }

    impl TestAgent {
        /// Creates an agent in `AgentState::Created` with the given id and name.
        fn new(id: &str, name: &str) -> Self {
            let id = id.to_string();
            let name = name.to_string();
            Self {
                id,
                name,
                state: AgentState::Created,
            }
        }
    }

    #[async_trait::async_trait]
    impl MoFAAgent for TestAgent {
        fn id(&self) -> &str {
            self.id.as_str()
        }

        fn name(&self) -> &str {
            self.name.as_str()
        }

        fn capabilities(&self) -> &AgentCapabilities {
            // Built once and shared by every test agent.
            static CAPS: std::sync::OnceLock<AgentCapabilities> = std::sync::OnceLock::new();
            CAPS.get_or_init(|| AgentCapabilitiesBuilder::new().build())
        }

        async fn initialize(&mut self, _ctx: &AgentContext) -> AgentResult<()> {
            self.state = AgentState::Ready;
            Ok(())
        }

        async fn execute(
            &mut self,
            input: AgentInput,
            _ctx: &AgentContext,
        ) -> AgentResult<AgentOutput> {
            self.state = AgentState::Executing;
            let echoed = format!("Echo: {}", input.to_text());
            Ok(AgentOutput::text(echoed))
        }

        async fn shutdown(&mut self) -> AgentResult<()> {
            self.state = AgentState::Shutdown;
            Ok(())
        }

        fn state(&self) -> AgentState {
            self.state.clone()
        }
    }

    /// A freshly built runner exposes the agent's identity and starts in `Created`.
    #[tokio::test]
    async fn test_agent_runner_new() {
        let runner = AgentRunner::new(TestAgent::new("test-001", "Test Agent"))
            .await
            .unwrap();

        assert_eq!(runner.id(), "test-001");
        assert_eq!(runner.name(), "Test Agent");
        assert_eq!(runner.state().await, RunnerState::Created);
    }

    /// A single execution returns the echoed text and is counted as a success.
    #[tokio::test]
    async fn test_agent_runner_execute() {
        let mut runner = AgentRunner::new(TestAgent::new("test-002", "Test Agent"))
            .await
            .unwrap();

        let output = runner.execute(AgentInput::text("Hello")).await.unwrap();
        assert_eq!(output.to_text(), "Echo: Hello");

        let snapshot = runner.stats().await;
        assert_eq!(snapshot.total_executions, 1);
        assert_eq!(snapshot.successful_executions, 1);
    }

    /// `run_agents` runs the whole batch and returns the outputs in order.
    #[tokio::test]
    async fn test_run_agents_function() {
        let agent = TestAgent::new("test-003", "Test Agent");
        let batch = vec![AgentInput::text("Test")];

        let outputs = run_agents(agent, batch).await.unwrap();
        assert_eq!(outputs[0].to_text(), "Echo: Test");
    }
}