use std::path::PathBuf;
use std::sync::{Arc, Mutex};
/// Sink that receives generated items as they are produced, phase by phase.
///
/// Implementors must be `Send + Sync` so that emission can happen from
/// multiple threads sharing one sink.
pub trait PhaseSink: Send + Sync {
    /// Deliver a single `item` of `item_type` produced during `phase`.
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError>;
    /// Signal that all items for `phase` have been emitted.
    fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;
    /// Force any buffered output to the underlying destination.
    fn flush(&self) -> Result<(), StreamError>;
    /// Snapshot of the counters accumulated so far.
    fn stats(&self) -> StreamStats;
}
/// Running counters describing what a sink has processed so far.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of items accepted by `emit`.
    pub items_emitted: u64,
    /// Bytes accounted for emitted records (serialized JSON length + newline).
    pub bytes_sent: u64,
    /// Number of errors recorded by the sink.
    pub errors: u64,
    /// Number of phases reported finished via `phase_complete`.
    pub phases_completed: u64,
}
/// Errors that can occur while streaming items to a sink.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// Underlying I/O failure (file creation, write, flush).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// The item envelope could not be serialized to JSON.
    #[error("Serialization error: {0}")]
    Serialization(String),
    /// Transport-level failure when talking to a remote endpoint.
    #[error("Connection error: {0}")]
    Connection(String),
    /// The sink's buffer is full and the configured strategy rejects input.
    #[error("Backpressure: buffer full")]
    BackpressureFull,
}
/// Destination for streamed output.
#[derive(Debug, Clone)]
pub enum StreamTarget {
    /// Stream to a remote HTTP endpoint.
    ///
    /// NOTE(review): no HTTP transport is constructed in this file —
    /// `StreamPipeline::new` leaves the writer empty for this target, so
    /// `api_key` and `batch_size` are not consumed here; confirm against the
    /// actual HTTP sender.
    Http {
        url: String,
        api_key: Option<String>,
        batch_size: usize,
    },
    /// Append JSONL records to a local file (truncated on creation).
    File {
        path: PathBuf,
    },
    /// Discard everything; the pipeline is inactive.
    None,
}
/// Policy for handling a full output buffer.
///
/// NOTE(review): nothing in this file consumes this type yet —
/// `StreamPipeline` writes synchronously; verify where the strategy is
/// applied before relying on these semantics.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until there is room (default).
    #[default]
    Block,
    /// Drop the oldest buffered item to make room for the new one.
    DropOldest,
    /// Buffer up to `max_items` items before applying backpressure.
    Buffer {
        max_items: usize,
    },
}
/// Default `PhaseSink` implementation that writes JSONL to the configured
/// `StreamTarget` and keeps running `StreamStats`.
pub struct StreamPipeline {
    // Where output goes; `StreamTarget::None` makes the pipeline a no-op.
    target: StreamTarget,
    // Shared counters; `Arc` so snapshots can outlive borrows of `self`.
    stats: Arc<Mutex<StreamStats>>,
    // Lazily-absent writer: `Some` only for file targets (buffered).
    writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
}
impl StreamPipeline {
    /// Build a pipeline for `target`.
    ///
    /// A `File` target creates (truncating) the file and wraps it in a
    /// `BufWriter`; `Http` and `None` targets start with no writer.
    ///
    /// # Errors
    /// Returns `StreamError::Io` when the output file cannot be created.
    pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
        let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
            StreamTarget::File { path } => {
                let buffered = std::io::BufWriter::new(std::fs::File::create(path)?);
                Some(Box::new(buffered))
            }
            StreamTarget::Http { .. } | StreamTarget::None => None,
        };
        Ok(Self {
            writer: Mutex::new(writer),
            stats: Arc::new(Mutex::new(StreamStats::default())),
            target,
        })
    }

    /// Convenience constructor for a disabled pipeline that drops everything.
    pub fn none() -> Self {
        Self {
            writer: Mutex::new(None),
            stats: Arc::new(Mutex::new(StreamStats::default())),
            target: StreamTarget::None,
        }
    }

    /// Whether this pipeline forwards items anywhere at all.
    pub fn is_active(&self) -> bool {
        match self.target {
            StreamTarget::Http { .. } | StreamTarget::File { .. } => true,
            StreamTarget::None => false,
        }
    }
}
impl PhaseSink for StreamPipeline {
    /// Serialize `item` into a `{phase, item_type, data}` envelope and write
    /// it as one JSON line to the configured writer, updating the stats.
    ///
    /// No-op (returns `Ok`) when the target is `StreamTarget::None`.
    ///
    /// # Errors
    /// Returns `StreamError::Serialization` if the envelope cannot be
    /// serialized and `StreamError::Io` if the write fails. Both paths now
    /// increment `StreamStats::errors` before returning (previously the
    /// counter was declared but never updated).
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError> {
        if !self.is_active() {
            return Ok(());
        }
        let envelope = serde_json::json!({
            "phase": phase,
            "item_type": item_type,
            "data": item,
        });
        let json = match serde_json::to_string(&envelope) {
            Ok(json) => json,
            Err(e) => {
                if let Ok(mut stats) = self.stats.lock() {
                    stats.errors += 1;
                }
                return Err(StreamError::Serialization(e.to_string()));
            }
        };
        // +1 accounts for the trailing newline appended by `writeln!`.
        let bytes = json.len() as u64 + 1;
        // A poisoned lock silently skips the write (preserved behavior).
        if let Ok(mut writer_guard) = self.writer.lock() {
            if let Some(writer) = writer_guard.as_mut() {
                use std::io::Write;
                if let Err(e) = writeln!(writer, "{json}") {
                    // Release the writer lock before taking the stats lock.
                    drop(writer_guard);
                    if let Ok(mut stats) = self.stats.lock() {
                        stats.errors += 1;
                    }
                    return Err(StreamError::Io(e));
                }
            }
        }
        if let Ok(mut stats) = self.stats.lock() {
            stats.items_emitted += 1;
            stats.bytes_sent += bytes;
        }
        Ok(())
    }

    /// Record the phase as complete, then flush buffered output.
    fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
        if let Ok(mut stats) = self.stats.lock() {
            stats.phases_completed += 1;
        }
        self.flush()
    }

    /// Flush any buffered bytes to the underlying destination.
    ///
    /// # Errors
    /// Returns `StreamError::Io` (and increments `errors`) when the
    /// underlying flush fails.
    fn flush(&self) -> Result<(), StreamError> {
        if let Ok(mut writer_guard) = self.writer.lock() {
            if let Some(writer) = writer_guard.as_mut() {
                use std::io::Write;
                if let Err(e) = writer.flush() {
                    drop(writer_guard);
                    if let Ok(mut stats) = self.stats.lock() {
                        stats.errors += 1;
                    }
                    return Err(StreamError::Io(e));
                }
            }
        }
        Ok(())
    }

    /// Snapshot the counters; returns defaults if the lock is poisoned.
    fn stats(&self) -> StreamStats {
        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A `None`-target pipeline must report itself inactive.
    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    /// Emitting through a disabled pipeline must not touch the counters.
    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 0);
    }

    /// A file-target pipeline writes the envelope fields to disk as JSONL.
    #[test]
    fn test_file_pipeline_writes_jsonl() {
        // Each test uses a distinct temp filename so parallel test runs
        // don't collide on the same file.
        let tmp = std::env::temp_dir().join("test_stream_pipeline_writes.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
        let _ = std::fs::remove_file(&tmp);
    }

    /// `items_emitted`, `phases_completed`, and `bytes_sent` all advance.
    #[test]
    fn test_stats_increment() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_stats.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        let _ = std::fs::remove_file(&tmp);
    }

    /// Completing several phases accumulates both counters independently.
    #[test]
    fn test_multiple_phases() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_phases.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        let _ = std::fs::remove_file(&tmp);
    }

    /// Every line of the output file must parse as a complete envelope.
    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_valid_jsonl.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        for line in content.lines() {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
        }
        let _ = std::fs::remove_file(&tmp);
    }

    /// The `#[default]` attribute should make `Block` the default strategy.
    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    /// In-memory `PhaseSink` that records every call for later inspection.
    pub struct MockPhaseSink {
        // (phase, item_type, item) triple per `emit` call, in order.
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        // Phase names passed to `phase_complete`, in order.
        pub completed_phases: Mutex<Vec<String>>,
        // Set to true once `flush` has been called.
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        /// Create an empty mock with no recorded calls.
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        /// Record the emission; never fails.
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }
        /// Record the completed phase name; never fails.
        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }
        /// Mark the sink as flushed; never fails.
        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }
        /// Derive stats from the recorded calls (bytes/errors stay zero).
        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    /// The mock captures emissions and phase completions verbatim.
    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();
        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");
        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    /// Multi-phase traffic through the mock updates all observable state.
    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});
        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();
        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());
        let items = mock.items.lock().unwrap();
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }
}