use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Destination for items produced by the generation pipeline, grouped by phase.
///
/// Implementations must be `Send + Sync`; all methods take `&self`, so any
/// mutable state must use interior mutability (see `StreamPipeline`).
pub trait PhaseSink: Send + Sync {
    /// Emits one `item` of `item_type` produced during `phase`.
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError>;
    /// Signals that all items for `phase` have been emitted.
    fn phase_complete(&self, phase: &str) -> Result<(), StreamError>;
    /// Forces any buffered output to be written to the destination.
    fn flush(&self) -> Result<(), StreamError>;
    /// Returns a snapshot of cumulative emission statistics.
    fn stats(&self) -> StreamStats;
}
/// Cumulative counters describing sink activity.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of items emitted so far.
    pub items_emitted: u64,
    /// Total bytes written, including one newline byte per JSONL line.
    pub bytes_sent: u64,
    /// Number of errors observed during emission.
    pub errors: u64,
    /// Number of phases reported complete via `phase_complete`.
    pub phases_completed: u64,
}
/// Errors surfaced by streaming sinks.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// Underlying I/O failure (file creation, write, or flush).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// The item could not be serialized to JSON.
    #[error("Serialization error: {0}")]
    Serialization(String),
    /// Connection-level failure.
    /// NOTE(review): not constructed in this file — presumably used by an
    /// HTTP transport elsewhere; confirm at call sites.
    #[error("Connection error: {0}")]
    Connection(String),
    /// Emission rejected because a bounded buffer is at capacity.
    /// NOTE(review): also not constructed in this file — confirm usage.
    #[error("Backpressure: buffer full")]
    BackpressureFull,
}
/// Destination configuration for a `StreamPipeline`.
#[derive(Debug, Clone)]
pub enum StreamTarget {
    /// HTTP endpoint configuration.
    /// NOTE(review): `StreamPipeline::new` creates no writer for this
    /// variant, so HTTP delivery appears to be handled elsewhere (or is not
    /// yet implemented) — confirm.
    Http {
        /// Endpoint URL to deliver items to.
        url: String,
        /// Optional API key for authentication — scheme unconfirmed here.
        api_key: Option<String>,
        /// Number of items per delivery batch.
        batch_size: usize,
    },
    /// Write newline-delimited JSON to the file at `path`
    /// (created/truncated when the pipeline is constructed).
    File {
        path: PathBuf,
    },
    /// No destination; the pipeline becomes a no-op.
    None,
}
/// Policy for handling a full downstream buffer.
///
/// NOTE(review): declared (and defaulted) here but not consumed by either
/// pipeline in this file — confirm where call sites apply it.
#[derive(Debug, Clone, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until space is available (the default).
    #[default]
    Block,
    /// Discard the oldest buffered item to make room for the new one.
    DropOldest,
    /// Buffer up to `max_items` items before applying backpressure.
    Buffer {
        max_items: usize,
    },
}
/// `PhaseSink` that writes each item as one JSONL envelope line to a target.
pub struct StreamPipeline {
    /// Configured destination; `StreamTarget::None` makes all ops no-ops.
    target: StreamTarget,
    /// Shared counters, updated under a mutex on every emit/flush.
    stats: Arc<Mutex<StreamStats>>,
    /// Open writer for file targets; `None` for HTTP and no-op targets.
    writer: Mutex<Option<Box<dyn std::io::Write + Send>>>,
}
impl StreamPipeline {
    /// Creates a pipeline for `target`.
    ///
    /// For `StreamTarget::File` the backing file is created (truncated)
    /// eagerly and wrapped in a `BufWriter`, so configuration problems
    /// surface at construction time rather than on first emit.
    ///
    /// # Errors
    /// Returns `StreamError::Io` if the output file cannot be created.
    pub fn new(target: StreamTarget) -> Result<Self, StreamError> {
        let writer: Option<Box<dyn std::io::Write + Send>> = match &target {
            StreamTarget::File { path } => {
                let buffered = std::io::BufWriter::new(std::fs::File::create(path)?);
                Some(Box::new(buffered))
            }
            // HTTP targets get no local writer; `None` streams nowhere.
            StreamTarget::Http { .. } | StreamTarget::None => None,
        };
        Ok(Self {
            target,
            stats: Arc::new(Mutex::new(StreamStats::default())),
            writer: Mutex::new(writer),
        })
    }

    /// Convenience constructor for a disabled (no-op) pipeline.
    pub fn none() -> Self {
        Self {
            target: StreamTarget::None,
            stats: Arc::new(Mutex::new(StreamStats::default())),
            writer: Mutex::new(None),
        }
    }

    /// True when the pipeline has a real destination to stream to.
    pub fn is_active(&self) -> bool {
        !matches!(self.target, StreamTarget::None)
    }
}
impl PhaseSink for StreamPipeline {
    /// Wraps `item` in a `{phase, item_type, data}` envelope and writes it
    /// as a single JSONL line to the configured writer.
    ///
    /// Inactive (`StreamTarget::None`) pipelines return `Ok(())` without
    /// doing anything. On success `items_emitted`/`bytes_sent` advance; on
    /// serialization or write failure `errors` is incremented before the
    /// error propagates (previously the `errors` counter was never updated).
    fn emit(
        &self,
        phase: &str,
        item_type: &str,
        item: &serde_json::Value,
    ) -> Result<(), StreamError> {
        if !self.is_active() {
            return Ok(());
        }
        let envelope = serde_json::json!({
            "phase": phase,
            "item_type": item_type,
            "data": item,
        });
        let json = match serde_json::to_string(&envelope) {
            Ok(json) => json,
            Err(e) => {
                if let Ok(mut stats) = self.stats.lock() {
                    stats.errors += 1;
                }
                return Err(StreamError::Serialization(e.to_string()));
            }
        };
        // +1 accounts for the trailing newline appended by `writeln!`.
        let bytes = json.len() as u64 + 1;
        // Poisoned lock or absent writer is treated as "nothing to write",
        // matching the original silent-skip behavior for those cases.
        let write_result: Result<(), StreamError> = match self.writer.lock() {
            Ok(mut writer_guard) => match writer_guard.as_mut() {
                Some(writer) => {
                    use std::io::Write;
                    writeln!(writer, "{json}").map_err(StreamError::from)
                }
                None => Ok(()),
            },
            Err(_) => Ok(()),
        };
        if let Ok(mut stats) = self.stats.lock() {
            if write_result.is_ok() {
                stats.items_emitted += 1;
                stats.bytes_sent += bytes;
            } else {
                stats.errors += 1;
            }
        }
        write_result
    }

    /// Counts the phase as completed, then flushes buffered output.
    fn phase_complete(&self, _phase: &str) -> Result<(), StreamError> {
        if let Ok(mut stats) = self.stats.lock() {
            stats.phases_completed += 1;
        }
        self.flush()
    }

    /// Flushes the underlying writer, counting any failure in `errors`.
    fn flush(&self) -> Result<(), StreamError> {
        let result: Result<(), StreamError> = match self.writer.lock() {
            Ok(mut writer_guard) => match writer_guard.as_mut() {
                Some(writer) => {
                    use std::io::Write;
                    writer.flush().map_err(StreamError::from)
                }
                None => Ok(()),
            },
            Err(_) => Ok(()),
        };
        if result.is_err() {
            if let Ok(mut stats) = self.stats.lock() {
                stats.errors += 1;
            }
        }
        result
    }

    /// Snapshot of the shared counters; defaults if the lock is poisoned.
    fn stats(&self) -> StreamStats {
        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
    }
}
/// Decorator around any `PhaseSink` that throttles emission rate and
/// periodically interleaves synthetic `_progress` events.
pub struct RateLimitedPipeline {
    /// The wrapped sink that receives every item (and progress events).
    inner: Box<dyn PhaseSink>,
    /// Rate limiter from `datasynth_core`; locked once per emit.
    /// Presumably token-bucket given `burst_size` — confirm in that crate.
    limiter: Mutex<datasynth_core::rate_limit::RateLimiter>,
    /// Monotonic count of real items emitted (progress events excluded).
    sequence: std::sync::atomic::AtomicU64,
    /// Emit a progress event every N items; 0 disables progress reporting.
    progress_interval: u64,
    /// Construction time, used to compute the observed emission rate.
    start_time: Instant,
}
impl RateLimitedPipeline {
    /// Wraps `inner` with rate limiting.
    ///
    /// A non-positive `events_per_second` disables limiting entirely;
    /// otherwise a blocking limiter is configured with the given rate and
    /// `burst_size`. `progress_interval` of 0 disables progress events.
    pub fn new(
        inner: Box<dyn PhaseSink>,
        events_per_second: f64,
        burst_size: u32,
        progress_interval: u64,
    ) -> Self {
        use datasynth_core::rate_limit::{RateLimitBackpressure, RateLimitConfig, RateLimiter};
        let config = if events_per_second > 0.0 {
            RateLimitConfig {
                entities_per_second: events_per_second,
                burst_size,
                backpressure: RateLimitBackpressure::Block,
                enabled: true,
            }
        } else {
            // Zero or negative rate means "unlimited".
            RateLimitConfig {
                enabled: false,
                ..Default::default()
            }
        };
        Self {
            inner,
            limiter: Mutex::new(RateLimiter::new(config)),
            sequence: std::sync::atomic::AtomicU64::new(0),
            progress_interval,
            start_time: Instant::now(),
        }
    }

    /// Replaces the limiter with one configured for `events_per_second`;
    /// a non-positive rate disables limiting.
    ///
    /// NOTE(review): the burst size is reset to a fixed 100 here rather
    /// than preserving the value passed to `new` — confirm intent.
    pub fn set_rate(&self, events_per_second: f64) {
        use datasynth_core::rate_limit::{RateLimitBackpressure, RateLimitConfig, RateLimiter};
        let Ok(mut limiter) = self.limiter.lock() else {
            return;
        };
        *limiter = RateLimiter::new(RateLimitConfig {
            entities_per_second: events_per_second,
            burst_size: 100,
            backpressure: RateLimitBackpressure::Block,
            enabled: events_per_second > 0.0,
        });
    }
}
impl PhaseSink for RateLimitedPipeline {
fn emit(
&self,
phase: &str,
item_type: &str,
item: &serde_json::Value,
) -> Result<(), StreamError> {
if let Ok(mut limiter) = self.limiter.lock() {
limiter.acquire();
}
let seq = self
.sequence
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.inner.emit(phase, item_type, item)?;
if self.progress_interval > 0 && seq > 0 && seq.is_multiple_of(self.progress_interval) {
let elapsed = self.start_time.elapsed();
let rate = if elapsed.as_secs_f64() > 0.0 {
seq as f64 / elapsed.as_secs_f64()
} else {
0.0
};
let progress = serde_json::json!({
"type": "_progress",
"items_emitted": seq,
"rate_actual": (rate * 100.0).round() / 100.0,
"elapsed_ms": elapsed.as_millis() as u64,
});
self.inner.emit("_progress", "StreamProgress", &progress)?;
}
Ok(())
}
fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
self.inner.phase_complete(phase)
}
fn flush(&self) -> Result<(), StreamError> {
self.inner.flush()
}
fn stats(&self) -> StreamStats {
let mut stats = self.inner.stats();
stats.items_emitted = self.sequence.load(std::sync::atomic::Ordering::Relaxed);
stats
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A `None`-target pipeline reports itself inactive.
    #[test]
    fn test_none_pipeline_is_inactive() {
        let pipeline = StreamPipeline::none();
        assert!(!pipeline.is_active());
    }

    /// Emitting through an inactive pipeline is a no-op and records no stats.
    #[test]
    fn test_none_pipeline_emit_is_noop() {
        let pipeline = StreamPipeline::none();
        let item = serde_json::json!({"id": "noop"});
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 0);
    }

    /// A file target writes JSONL containing the envelope fields.
    #[test]
    fn test_file_pipeline_writes_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_writes.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        assert!(pipeline.is_active());
        let item = serde_json::json!({"id": "test-001", "amount": 100.0});
        pipeline
            .emit("journal_entries", "JournalEntry", &item)
            .unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        assert!(content.contains("test-001"));
        assert!(content.contains("journal_entries"));
        assert!(content.contains("JournalEntry"));
        let _ = std::fs::remove_file(&tmp);
    }

    /// Items, completed phases, and bytes all advance their counters.
    #[test]
    fn test_stats_increment() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_stats.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.emit("phase1", "Item", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 1);
        assert!(stats.bytes_sent > 0);
        let _ = std::fs::remove_file(&tmp);
    }

    /// Counters accumulate correctly across multiple phases.
    #[test]
    fn test_multiple_phases() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_phases.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item = serde_json::json!({"id": 1});
        pipeline.emit("phase1", "A", &item).unwrap();
        pipeline.phase_complete("phase1").unwrap();
        pipeline.emit("phase2", "B", &item).unwrap();
        pipeline.phase_complete("phase2").unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        let _ = std::fs::remove_file(&tmp);
    }

    /// Every output line parses as JSON and carries the envelope keys.
    #[test]
    fn test_file_output_is_valid_jsonl() {
        let tmp = std::env::temp_dir().join("test_stream_pipeline_valid_jsonl.jsonl");
        let pipeline = StreamPipeline::new(StreamTarget::File { path: tmp.clone() }).unwrap();
        let item1 = serde_json::json!({"id": "a"});
        let item2 = serde_json::json!({"id": "b"});
        pipeline.emit("p", "T", &item1).unwrap();
        pipeline.emit("p", "T", &item2).unwrap();
        pipeline.flush().unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        for line in content.lines() {
            let parsed: serde_json::Value =
                serde_json::from_str(line).expect("each line should be valid JSON");
            assert!(parsed.get("phase").is_some());
            assert!(parsed.get("item_type").is_some());
            assert!(parsed.get("data").is_some());
        }
        let _ = std::fs::remove_file(&tmp);
    }

    /// The default backpressure strategy is `Block`.
    #[test]
    fn test_backpressure_strategy_default() {
        let strategy = BackpressureStrategy::default();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    /// In-memory `PhaseSink` that records every call for inspection.
    pub struct MockPhaseSink {
        // (phase, item_type, item) triples in emission order.
        pub items: Mutex<Vec<(String, String, serde_json::Value)>>,
        // Phase names in completion order.
        pub completed_phases: Mutex<Vec<String>>,
        // Set to true once `flush` has been called.
        pub flushed: Mutex<bool>,
    }

    impl MockPhaseSink {
        /// Creates an empty mock sink.
        pub fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
                completed_phases: Mutex::new(Vec::new()),
                flushed: Mutex::new(false),
            }
        }
    }

    impl PhaseSink for MockPhaseSink {
        fn emit(
            &self,
            phase: &str,
            item_type: &str,
            item: &serde_json::Value,
        ) -> Result<(), StreamError> {
            self.items.lock().unwrap().push((
                phase.to_string(),
                item_type.to_string(),
                item.clone(),
            ));
            Ok(())
        }
        fn phase_complete(&self, phase: &str) -> Result<(), StreamError> {
            self.completed_phases
                .lock()
                .unwrap()
                .push(phase.to_string());
            Ok(())
        }
        fn flush(&self) -> Result<(), StreamError> {
            *self.flushed.lock().unwrap() = true;
            Ok(())
        }
        fn stats(&self) -> StreamStats {
            let items = self.items.lock().unwrap();
            let phases = self.completed_phases.lock().unwrap();
            StreamStats {
                items_emitted: items.len() as u64,
                phases_completed: phases.len() as u64,
                bytes_sent: 0,
                errors: 0,
            }
        }
    }

    /// The mock captures phases, types, and payloads of every emission.
    #[test]
    fn test_mock_phase_sink_records_emissions() {
        let mock = MockPhaseSink::new();
        let item1 = serde_json::json!({"id": "V001", "name": "Acme Corp"});
        let item2 = serde_json::json!({"id": "V002", "name": "Global Parts"});
        mock.emit("master_data", "Vendor", &item1).unwrap();
        mock.emit("master_data", "Vendor", &item2).unwrap();
        mock.phase_complete("master_data").unwrap();
        let items = mock.items.lock().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].0, "master_data");
        assert_eq!(items[0].1, "Vendor");
        assert_eq!(items[1].2["name"], "Global Parts");
        let phases = mock.completed_phases.lock().unwrap();
        assert_eq!(phases.len(), 1);
        assert_eq!(phases[0], "master_data");
    }

    /// Multi-phase emission plus flush are all observable through the mock.
    #[test]
    fn test_mock_phase_sink_multi_phase_emission() {
        let mock = MockPhaseSink::new();
        let je = serde_json::json!({"entry_id": "JE-001"});
        let anomaly = serde_json::json!({"label": "DuplicateEntry"});
        mock.emit("journal_entries", "JournalEntry", &je).unwrap();
        mock.phase_complete("journal_entries").unwrap();
        mock.emit("anomaly_injection", "LabeledAnomaly", &anomaly)
            .unwrap();
        mock.phase_complete("anomaly_injection").unwrap();
        mock.flush().unwrap();
        let stats = mock.stats();
        assert_eq!(stats.items_emitted, 2);
        assert_eq!(stats.phases_completed, 2);
        assert!(*mock.flushed.lock().unwrap());
        let items = mock.items.lock().unwrap();
        assert_eq!(items[0].0, "journal_entries");
        assert_eq!(items[1].0, "anomaly_injection");
    }

    /// With limiting disabled (rate 0.0), emissions pass through and the
    /// wrapper's sequence counter backs `stats().items_emitted`.
    #[test]
    fn test_rate_limited_pipeline_emits_and_tracks_sequence() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0,
            100,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.emit("phase", "Type", &item).unwrap();
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 3);
    }

    /// Progress events do not inflate the wrapper's item count.
    #[test]
    fn test_rate_limited_pipeline_emits_progress() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0,
            100,
            5,
        );
        let item = serde_json::json!({"id": "test"});
        for _ in 0..10 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }
        let stats = pipeline.stats();
        assert_eq!(stats.items_emitted, 10);
    }

    /// 15 items at 100/s with burst 10 should take at least ~50 ms; the
    /// assertion allows slack at 30 ms. NOTE(review): timing-sensitive —
    /// may be flaky on heavily loaded CI.
    #[test]
    fn test_rate_limited_pipeline_respects_rate() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            100.0,
            10,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        let start = Instant::now();
        for _ in 0..15 {
            pipeline.emit("phase", "Type", &item).unwrap();
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() >= 30,
            "expected rate limiting, got {:?}",
            elapsed
        );
    }

    /// `set_rate` can be applied mid-stream without losing emissions.
    #[test]
    fn test_rate_limited_pipeline_dynamic_rate_change() {
        let mock = MockPhaseSink::new();
        let pipeline = RateLimitedPipeline::new(
            Box::new(mock),
            0.0,
            100,
            0,
        );
        let item = serde_json::json!({"id": "test"});
        pipeline.emit("phase", "Type", &item).unwrap();
        pipeline.set_rate(50.0);
        pipeline.emit("phase", "Type", &item).unwrap();
        assert_eq!(pipeline.stats().items_emitted, 2);
    }
}