use super::{EventStream, Runner, RunnerError};
use crate::framework::analyzers::Analyzer;
use async_trait::async_trait;
use futures::stream::select_all;
pub struct AgentRunner {
runners: Vec<Box<dyn Runner>>,
analyzers: Vec<Box<dyn Analyzer>>,
}
impl AgentRunner {
pub fn new() -> Self {
Self {
runners: Vec::new(),
analyzers: Vec::new(),
}
}
pub fn add_runner(mut self, runner: Box<dyn Runner>) -> Self {
self.runners.push(runner);
self
}
pub fn add_global_analyzer(mut self, analyzer: Box<dyn Analyzer>) -> Self {
self.analyzers.push(analyzer);
self
}
pub fn runner_count(&self) -> usize {
self.runners.len()
}
pub fn analyzer_count(&self) -> usize {
self.analyzers.len()
}
}
#[async_trait]
impl Runner for AgentRunner {
async fn run(&mut self) -> Result<EventStream, RunnerError> {
if self.runners.is_empty() {
return Err("No runners configured for AgentRunner".into());
}
let mut streams = Vec::new();
for runner in &mut self.runners {
let stream = runner.run().await?;
streams.push(stream);
}
let merged_stream = select_all(streams);
let mut final_stream = Box::pin(merged_stream) as EventStream;
for analyzer in &mut self.analyzers {
final_stream = analyzer
.process(final_stream)
.await
.map_err(|e| format!("Global analyzer error: {}", e))?;
}
Ok(final_stream)
}
fn add_analyzer(mut self, analyzer: Box<dyn Analyzer>) -> Self {
self.analyzers.push(analyzer);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::framework::analyzers::{FileLogger, HTTPParser, SSEProcessor};
use crate::framework::runners::FakeRunner;
use futures::stream::StreamExt;
use std::time::Duration;
use tempfile::NamedTempFile;
use tokio::time::timeout;
#[tokio::test]
async fn test_agent_runner_basic_composition() {
let fake_runner1 = FakeRunner::new().event_count(2).delay_ms(10);
let fake_runner2 = FakeRunner::new().event_count(3).delay_ms(15);
let mut agent = AgentRunner::new()
.add_runner(Box::new(fake_runner1))
.add_runner(Box::new(fake_runner2));
assert_eq!(agent.runner_count(), 2);
assert_eq!(agent.analyzer_count(), 0);
let stream = agent.run().await.unwrap();
let events: Vec<_> = stream.collect().await;
assert_eq!(events.len(), 10);
let ssl_events = events.iter().filter(|e| e.source == "ssl").count();
assert_eq!(ssl_events, 10); }
#[tokio::test]
async fn test_agent_runner_with_global_analyzers() {
let temp_file = NamedTempFile::new().unwrap();
let fake_runner = FakeRunner::new().event_count(2).delay_ms(10);
let mut agent = AgentRunner::new()
.add_runner(Box::new(fake_runner))
.add_global_analyzer(Box::new(FileLogger::new(temp_file.path()).unwrap()));
assert_eq!(agent.runner_count(), 1);
assert_eq!(agent.analyzer_count(), 1);
let stream = agent.run().await.unwrap();
let events: Vec<_> = stream.collect().await;
assert_eq!(events.len(), 4);
let file_size = std::fs::metadata(temp_file.path()).unwrap().len();
assert!(file_size > 0, "Log file should have content");
}
#[tokio::test]
async fn test_agent_runner_multiple_runners_with_analyzers() {
let fake_runner1 = FakeRunner::new()
.event_count(1)
.delay_ms(10)
.add_analyzer(Box::new(SSEProcessor::new_with_timeout(5000)));
let fake_runner2 = FakeRunner::new()
.event_count(1)
.delay_ms(10)
.add_analyzer(Box::new(HTTPParser::new()));
let mut agent = AgentRunner::new()
.add_runner(Box::new(fake_runner1))
.add_runner(Box::new(fake_runner2));
let stream = agent.run().await.unwrap();
let events: Vec<_> = stream.collect().await;
assert!(events.len() >= 4, "Should have at least 4 events");
let sources: std::collections::HashSet<_> = events.iter().map(|e| &e.source).collect();
assert!(
!sources.is_empty(),
"Should have events from at least one source"
);
}
#[tokio::test]
async fn test_agent_runner_empty_runners() {
let mut agent = AgentRunner::new();
assert_eq!(agent.runner_count(), 0);
let result = agent.run().await;
assert!(result.is_err(), "Should fail with no runners configured");
if let Err(e) = result {
let error_msg = format!("{}", e);
assert!(
error_msg.contains("No runners configured"),
"Should have descriptive error message"
);
}
}
#[tokio::test]
async fn test_agent_runner_streaming_behavior() {
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;
let event_timestamps = Arc::new(Mutex::new(Vec::new()));
struct TimestampRecorder {
timestamps: Arc<Mutex<Vec<Instant>>>,
}
impl TimestampRecorder {
fn new(timestamps: Arc<Mutex<Vec<Instant>>>) -> Self {
Self { timestamps }
}
}
#[async_trait]
impl Analyzer for TimestampRecorder {
async fn process(
&mut self,
stream: EventStream,
) -> Result<EventStream, Box<dyn std::error::Error + Send + Sync>> {
let timestamps = self.timestamps.clone();
let recorded_stream = stream.map(move |event| {
let timestamps_clone = timestamps.clone();
tokio::spawn(async move {
let mut guard = timestamps_clone.lock().await;
guard.push(Instant::now());
});
event
});
Ok(Box::pin(recorded_stream))
}
}
let fake_runner = FakeRunner::new().event_count(3).delay_ms(50);
let mut agent = AgentRunner::new()
.add_runner(Box::new(fake_runner))
.add_global_analyzer(Box::new(TimestampRecorder::new(Arc::clone(
&event_timestamps,
))));
let start_time = Instant::now();
let stream = agent.run().await.unwrap();
let events: Vec<_> = stream.collect().await;
let total_time = start_time.elapsed();
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(events.len(), 6);
let timestamps_guard = event_timestamps.lock().await;
assert!(
timestamps_guard.len() >= 3,
"Should have recorded multiple timestamps"
);
assert!(
total_time >= Duration::from_millis(100),
"Should demonstrate streaming behavior"
);
}
#[tokio::test]
async fn test_agent_runner_concurrent_processing() {
let mut handles = Vec::new();
for i in 0..3 {
let handle = tokio::spawn(async move {
let fake_runner = FakeRunner::new().event_count(2).delay_ms(10);
let mut agent = AgentRunner::new().add_runner(Box::new(fake_runner));
let stream = agent.run().await.unwrap();
let events: Vec<_> = stream.collect().await;
(i, events.len())
});
handles.push(handle);
}
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
assert_eq!(results.len(), 3);
for (agent_id, event_count) in results {
assert_eq!(
event_count, 4,
"Agent {} should have 4 events (2 event_count × 2 events)",
agent_id
);
}
}
#[tokio::test]
async fn test_agent_runner_error_handling() {
struct FailingRunner;
#[async_trait]
impl Runner for FailingRunner {
async fn run(&mut self) -> Result<EventStream, RunnerError> {
Err("Simulated runner failure".into())
}
fn add_analyzer(self, _analyzer: Box<dyn Analyzer>) -> Self {
self
}
}
let mut agent = AgentRunner::new().add_runner(Box::new(FailingRunner));
let result = agent.run().await;
assert!(result.is_err(), "Should propagate runner error");
if let Err(e) = result {
let error_msg = format!("{}", e);
assert!(
error_msg.contains("Simulated runner failure"),
"Should contain original error message"
);
}
}
#[tokio::test]
async fn test_agent_runner_with_timeout() {
let fake_runner = FakeRunner::new().event_count(5).delay_ms(10);
let mut agent = AgentRunner::new().add_runner(Box::new(fake_runner));
let result = timeout(Duration::from_secs(5), async {
let stream = agent.run().await.unwrap();
let events: Vec<_> = stream.collect().await;
events.len()
})
.await;
assert!(result.is_ok(), "AgentRunner should complete within timeout");
assert_eq!(
result.unwrap(),
10,
"Should process all events (5 event_count × 2 events)"
);
}
#[tokio::test]
async fn test_agent_runner_fluent_interface() {
let fake_runner1 = FakeRunner::new().event_count(1).delay_ms(10);
let fake_runner2 = FakeRunner::new().event_count(1).delay_ms(10);
let temp_file1 = NamedTempFile::new().unwrap();
let temp_file2 = NamedTempFile::new().unwrap();
let agent = AgentRunner::new()
.add_runner(Box::new(fake_runner1))
.add_runner(Box::new(fake_runner2))
.add_global_analyzer(Box::new(FileLogger::new(temp_file1.path()).unwrap()))
.add_analyzer(Box::new(FileLogger::new(temp_file2.path()).unwrap()));
assert_eq!(agent.runner_count(), 2);
assert_eq!(agent.analyzer_count(), 2); }
}