use crate::agent::OutputLine;
use crate::events::ExecutionEvent as ParallelEvent;
use crate::execution::apply::ApplyEventHandler;
use tokio::sync::mpsc;
use tracing::{debug, error, info};
#[derive(Clone)]
pub struct ParallelApplyEventHandler {
#[allow(dead_code)]
change_id: String,
event_tx: Option<mpsc::Sender<ParallelEvent>>,
}
impl ParallelApplyEventHandler {
pub fn new(change_id: String, event_tx: Option<mpsc::Sender<ParallelEvent>>) -> Self {
Self {
change_id,
event_tx,
}
}
}
impl ApplyEventHandler for ParallelApplyEventHandler {
fn on_apply_started(&self, change_id: &str, command: &str) {
info!("Apply started for {}: {}", change_id, command);
if let Some(ref tx) = self.event_tx {
let tx = tx.clone();
let event = ParallelEvent::ApplyStarted {
change_id: change_id.to_string(),
command: command.to_string(),
};
tokio::spawn(async move {
let _ = tx.send(event).await;
});
}
}
fn on_progress_updated(&self, change_id: &str, completed: u32, total: u32) {
debug!(
"Progress updated for {}: {}/{}",
change_id, completed, total
);
if let Some(ref tx) = self.event_tx {
let tx = tx.clone();
let event = ParallelEvent::ProgressUpdated {
change_id: change_id.to_string(),
completed,
total,
};
tokio::spawn(async move {
let _ = tx.send(event).await;
});
}
}
fn on_hook_started(&self, change_id: &str, hook_type: &str) {
debug!("Hook started for {}: {}", change_id, hook_type);
if let Some(ref tx) = self.event_tx {
let tx = tx.clone();
let event = ParallelEvent::HookStarted {
change_id: change_id.to_string(),
hook_type: hook_type.to_string(),
};
tokio::spawn(async move {
let _ = tx.send(event).await;
});
}
}
fn on_hook_completed(&self, change_id: &str, hook_type: &str) {
debug!("Hook completed for {}: {}", change_id, hook_type);
if let Some(ref tx) = self.event_tx {
let tx = tx.clone();
let event = ParallelEvent::HookCompleted {
change_id: change_id.to_string(),
hook_type: hook_type.to_string(),
};
tokio::spawn(async move {
let _ = tx.send(event).await;
});
}
}
fn on_hook_failed(&self, change_id: &str, hook_type: &str, error: &str) {
error!("Hook failed for {} ({}): {}", change_id, hook_type, error);
if let Some(ref tx) = self.event_tx {
let tx = tx.clone();
let event = ParallelEvent::HookFailed {
change_id: change_id.to_string(),
hook_type: hook_type.to_string(),
error: error.to_string(),
};
tokio::spawn(async move {
let _ = tx.send(event).await;
});
}
}
fn on_apply_output(&self, change_id: &str, line: &OutputLine, iteration: u32) {
let output = match line {
OutputLine::Stdout(s) | OutputLine::Stderr(s) => s.clone(),
};
if let Some(ref tx) = self.event_tx {
let tx = tx.clone();
let event = ParallelEvent::ApplyOutput {
change_id: change_id.to_string(),
output,
iteration: Some(iteration),
};
tokio::spawn(async move {
let _ = tx.send(event).await;
});
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn test_parallel_apply_event_handler_with_channel() {
let (tx, mut rx) = mpsc::channel(10);
let handler = ParallelApplyEventHandler::new("test-change".to_string(), Some(tx));
handler.on_apply_started("test-change", "test command");
handler.on_progress_updated("test-change", 5, 10);
handler.on_hook_started("test-change", "pre_apply");
handler.on_hook_completed("test-change", "pre_apply");
for _ in 0..4 {
timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout waiting for event")
.expect("channel closed unexpectedly");
}
}
#[tokio::test]
async fn test_parallel_apply_event_handler_without_channel() {
let handler = ParallelApplyEventHandler::new("test-change".to_string(), None);
handler.on_apply_started("test-change", "test command");
handler.on_progress_updated("test-change", 5, 10);
handler.on_hook_started("test-change", "pre_apply");
handler.on_hook_completed("test-change", "pre_apply");
handler.on_hook_failed("test-change", "pre_apply", "test error");
}
}