use std::sync::Arc;
use serde::Serialize;
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NodeInfo {
pub id: String,
pub node_type: String,
}
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ProgressOutputFile {
pub name: String,
#[serde(skip)]
pub data: crate::processor::FileData,
pub mime_type: String,
}
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum PipelineEvent {
#[serde(rename_all = "camelCase")]
PipelineStarted {
total_nodes: usize,
total_files: usize,
nodes: Vec<NodeInfo>,
},
#[serde(rename_all = "camelCase")]
NodeStarted {
node_id: String,
node_index: usize,
total_nodes: usize,
node_type: String,
parent_node_id: Option<String>,
},
#[serde(rename_all = "camelCase")]
IterationStarted {
node_id: String,
iteration: usize,
total_iterations: usize,
},
#[serde(rename_all = "camelCase")]
IterationCompleted {
node_id: String,
iteration: usize,
total_iterations: usize,
duration_ms: u64,
files_produced: usize,
#[serde(skip)]
output_files: Vec<ProgressOutputFile>,
},
#[serde(rename_all = "camelCase")]
IterationFailed {
node_id: String,
iteration: usize,
total_iterations: usize,
error: String,
},
#[serde(rename_all = "camelCase")]
FileProgress {
node_id: String,
file_index: usize,
total_files: usize,
percent: u32,
message: String,
},
#[serde(rename_all = "camelCase")]
NodeCompleted {
node_id: String,
duration_ms: u64,
files_processed: usize,
parent_node_id: Option<String>,
},
#[serde(rename_all = "camelCase")]
NodeFailed {
node_id: String,
error: String,
parent_node_id: Option<String>,
},
#[serde(rename_all = "camelCase")]
PipelineCompleted {
duration_ms: u64,
total_files_processed: usize,
},
#[serde(rename_all = "camelCase")]
PipelineFailed {
node_id: String,
error: String,
},
#[serde(rename_all = "camelCase")]
CommandOutput {
node_id: String,
line: String,
},
}
pub struct PipelineReporter {
callback: Option<Arc<dyn Fn(PipelineEvent)>>,
}
impl Clone for PipelineReporter {
fn clone(&self) -> Self {
Self {
callback: self.callback.clone(),
}
}
}
impl PipelineReporter {
pub fn new(callback: impl Fn(PipelineEvent) + 'static) -> Self {
Self {
callback: Some(Arc::new(callback)),
}
}
pub fn new_noop() -> Self {
Self { callback: None }
}
pub fn emit(&self, event: PipelineEvent) {
if let Some(cb) = &self.callback {
cb(event);
}
}
}
#[cfg(test)]
pub struct RecordingReporter {
events: std::sync::Arc<std::sync::Mutex<Vec<PipelineEvent>>>,
}
#[cfg(test)]
impl Default for RecordingReporter {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
impl RecordingReporter {
pub fn new() -> Self {
Self {
events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
}
}
pub fn reporter(&self) -> PipelineReporter {
let events = std::sync::Arc::clone(&self.events);
PipelineReporter::new(move |event| {
events.lock().unwrap().push(event);
})
}
pub fn events(&self) -> Vec<PipelineEvent> {
self.events.lock().unwrap().clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pipeline_started_serializes_correctly() {
let event = PipelineEvent::PipelineStarted {
total_nodes: 3,
total_files: 10,
nodes: vec![
NodeInfo {
id: "n1".into(),
node_type: "image-compress".into(),
},
NodeInfo {
id: "n2".into(),
node_type: "file-rename".into(),
},
NodeInfo {
id: "n3".into(),
node_type: "image-resize".into(),
},
],
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "PipelineStarted");
assert_eq!(json["totalNodes"], 3);
assert_eq!(json["totalFiles"], 10);
let nodes = json["nodes"].as_array().unwrap();
assert_eq!(nodes.len(), 3);
assert_eq!(nodes[0]["id"], "n1");
assert_eq!(nodes[0]["nodeType"], "image-compress");
}
#[test]
fn test_node_started_serializes_correctly() {
let event = PipelineEvent::NodeStarted {
node_id: "node-1".to_string(),
node_index: 0,
total_nodes: 3,
node_type: "image-compress".to_string(),
parent_node_id: None,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "NodeStarted");
assert_eq!(json["nodeId"], "node-1");
assert_eq!(json["nodeIndex"], 0);
assert_eq!(json["totalNodes"], 3);
assert_eq!(json["nodeType"], "image-compress");
assert_eq!(json["parentNodeId"], serde_json::Value::Null);
}
#[test]
fn test_node_started_with_parent_serializes_correctly() {
let event = PipelineEvent::NodeStarted {
node_id: "child-1".to_string(),
node_index: 0,
total_nodes: 1,
node_type: "shell-command".to_string(),
parent_node_id: Some("loop-1".to_string()),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "NodeStarted");
assert_eq!(json["parentNodeId"], "loop-1");
}
#[test]
fn test_iteration_started_serializes_correctly() {
let event = PipelineEvent::IterationStarted {
node_id: "loop-1".to_string(),
iteration: 2,
total_iterations: 15,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "IterationStarted");
assert_eq!(json["nodeId"], "loop-1");
assert_eq!(json["iteration"], 2);
assert_eq!(json["totalIterations"], 15);
}
#[test]
fn test_iteration_completed_serializes_correctly() {
let event = PipelineEvent::IterationCompleted {
node_id: "loop-1".to_string(),
iteration: 2,
total_iterations: 7,
duration_ms: 1500,
files_produced: 1,
output_files: Vec::new(),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "IterationCompleted");
assert_eq!(json["nodeId"], "loop-1");
assert_eq!(json["iteration"], 2);
assert_eq!(json["totalIterations"], 7);
assert_eq!(json["durationMs"], 1500);
assert_eq!(json["filesProduced"], 1);
}
#[test]
fn test_iteration_completed_skips_output_files_in_json() {
let event = PipelineEvent::IterationCompleted {
node_id: "loop-1".to_string(),
iteration: 0,
total_iterations: 1,
duration_ms: 100,
files_produced: 1,
output_files: vec![ProgressOutputFile {
name: "video.mp4".to_string(),
data: crate::processor::FileData::Bytes(vec![0u8; 100]),
mime_type: "video/mp4".to_string(),
}],
};
let json = serde_json::to_value(&event).unwrap();
assert!(json.get("outputFiles").is_none());
assert_eq!(json["filesProduced"], 1);
}
#[test]
fn test_progress_output_file_serializes_without_data() {
let file = ProgressOutputFile {
name: "video.mp4".to_string(),
data: crate::processor::FileData::Bytes(vec![0u8; 1024]),
mime_type: "video/mp4".to_string(),
};
let json = serde_json::to_value(&file).unwrap();
assert_eq!(json["name"], "video.mp4");
assert_eq!(json["mimeType"], "video/mp4");
assert!(json.get("data").is_none());
}
#[test]
fn test_iteration_failed_serializes_correctly() {
let event = PipelineEvent::IterationFailed {
node_id: "loop-1".to_string(),
iteration: 3,
total_iterations: 10,
error: "Download timed out".to_string(),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "IterationFailed");
assert_eq!(json["nodeId"], "loop-1");
assert_eq!(json["iteration"], 3);
assert_eq!(json["totalIterations"], 10);
assert_eq!(json["error"], "Download timed out");
}
#[test]
fn test_file_progress_serializes_correctly() {
let event = PipelineEvent::FileProgress {
node_id: "node-2".to_string(),
file_index: 2,
total_files: 5,
percent: 75,
message: "Compressing photo.jpg...".to_string(),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "FileProgress");
assert_eq!(json["nodeId"], "node-2");
assert_eq!(json["fileIndex"], 2);
assert_eq!(json["totalFiles"], 5);
assert_eq!(json["percent"], 75);
assert_eq!(json["message"], "Compressing photo.jpg...");
}
#[test]
fn test_node_completed_serializes_correctly() {
let event = PipelineEvent::NodeCompleted {
node_id: "node-1".to_string(),
duration_ms: 1234,
files_processed: 5,
parent_node_id: None,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "NodeCompleted");
assert_eq!(json["nodeId"], "node-1");
assert_eq!(json["durationMs"], 1234);
assert_eq!(json["filesProcessed"], 5);
}
#[test]
fn test_node_failed_serializes_correctly() {
let event = PipelineEvent::NodeFailed {
node_id: "node-3".to_string(),
error: "Unsupported format: BMP".to_string(),
parent_node_id: None,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "NodeFailed");
assert_eq!(json["nodeId"], "node-3");
assert_eq!(json["error"], "Unsupported format: BMP");
}
#[test]
fn test_pipeline_completed_serializes_correctly() {
let event = PipelineEvent::PipelineCompleted {
duration_ms: 5678,
total_files_processed: 10,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "PipelineCompleted");
assert_eq!(json["durationMs"], 5678);
assert_eq!(json["totalFilesProcessed"], 10);
}
#[test]
fn test_pipeline_failed_serializes_correctly() {
let event = PipelineEvent::PipelineFailed {
node_id: "node-2".to_string(),
error: "Processing failed: out of memory".to_string(),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "PipelineFailed");
assert_eq!(json["nodeId"], "node-2");
assert_eq!(json["error"], "Processing failed: out of memory");
}
#[test]
fn test_command_output_serializes_correctly() {
let event = PipelineEvent::CommandOutput {
node_id: "node-0".to_string(),
line: "[download] 34.2% of ~150MiB".to_string(),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "CommandOutput");
assert_eq!(json["nodeId"], "node-0");
assert_eq!(json["line"], "[download] 34.2% of ~150MiB");
}
#[test]
fn test_pipeline_reporter_is_cloneable() {
let received = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let received_clone = std::sync::Arc::clone(&received);
let reporter = PipelineReporter::new(move |event| {
received_clone.lock().unwrap().push(event);
});
let reporter2 = reporter.clone();
reporter.emit(PipelineEvent::PipelineStarted {
total_nodes: 1,
total_files: 1,
nodes: vec![],
});
reporter2.emit(PipelineEvent::CommandOutput {
node_id: "n".to_string(),
line: "hello".to_string(),
});
let events = received.lock().unwrap();
assert_eq!(events.len(), 2);
}
#[test]
fn test_noop_reporter_doesnt_panic() {
let reporter = PipelineReporter::new_noop();
reporter.emit(PipelineEvent::PipelineStarted {
total_nodes: 1,
total_files: 1,
nodes: vec![],
});
reporter.emit(PipelineEvent::PipelineCompleted {
duration_ms: 100,
total_files_processed: 1,
});
}
#[test]
fn test_recording_reporter_captures_events() {
let recorder = RecordingReporter::new();
let reporter = recorder.reporter();
reporter.emit(PipelineEvent::PipelineStarted {
total_nodes: 2,
total_files: 3,
nodes: vec![],
});
reporter.emit(PipelineEvent::NodeStarted {
node_id: "n1".to_string(),
node_index: 0,
total_nodes: 2,
node_type: "image-compress".to_string(),
parent_node_id: None,
});
reporter.emit(PipelineEvent::PipelineCompleted {
duration_ms: 500,
total_files_processed: 3,
});
let events = recorder.events();
assert_eq!(events.len(), 3);
assert!(matches!(events[0], PipelineEvent::PipelineStarted { .. }));
assert!(matches!(events[1], PipelineEvent::NodeStarted { .. }));
assert!(matches!(events[2], PipelineEvent::PipelineCompleted { .. }));
}
#[test]
fn test_reporter_calls_callback() {
let received = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let received_clone = std::sync::Arc::clone(&received);
let reporter = PipelineReporter::new(move |event| {
received_clone.lock().unwrap().push(event);
});
reporter.emit(PipelineEvent::PipelineStarted {
total_nodes: 1,
total_files: 1,
nodes: vec![],
});
let events = received.lock().unwrap();
assert_eq!(events.len(), 1);
if let PipelineEvent::PipelineStarted {
total_nodes,
total_files,
..
} = &events[0]
{
assert_eq!(*total_nodes, 1);
assert_eq!(*total_files, 1);
} else {
panic!("Expected PipelineStarted event");
}
}
}