use std::sync::Arc;
use flume::RecvError;
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use crate::{buffer::Buffer, input::Input, output::Output, pipeline::Pipeline, Error, Message, MessageBatch};
use crate::input::InputBatch;
use crate::output::OutputBatch;
/// A running data flow: batches are read from an input, transformed by a
/// pipeline on a set of worker tasks, and written to an output.
pub struct Stream {
    /// Source of message batches; it is connected, read, acknowledged and
    /// closed by the stream.
    input: Arc<dyn InputBatch>,
    /// Processing pipeline, shared between all worker tasks.
    pipeline: Arc<Pipeline>,
    /// Destination that processed batches are written to.
    output: Arc<dyn OutputBatch>,
    /// Optional buffer. In the code visible here it is only closed by
    /// `close`; `run` does not route messages through it.
    buffer: Option<Arc<dyn Buffer>>,
    /// Requested number of pipeline worker tasks spawned by `run`.
    thread_num: i32,
}
impl Stream {
pub fn new(
input: Arc<dyn InputBatch>,
pipeline: Pipeline,
output: Arc<dyn OutputBatch>,
buffer: Option<Arc<dyn Buffer>>,
thread_num: i32,
) -> Self {
Self {
input,
pipeline: Arc::new(pipeline),
output,
buffer,
thread_num,
}
}
pub async fn run(&mut self) -> Result<(), Error> {
self.input.connect().await?;
self.output.connect().await?;;
let (input_sender, input_receiver) = flume::bounded::<MessageBatch>(1000);
let (output_sender, output_receiver) = flume::bounded::<MessageBatch>(1000);
let input = Arc::clone(&self.input);
tokio::spawn(async move {
loop {
match input.read().await {
Ok(msg) => {
if let Err(e) = input_sender.send_async(msg).await {
error!("Failed to send input message: {}", e);
break;
}
}
Err(e) => {
match e {
Error::Done => {
drop(input_sender);
return;
}
_ => {
error!("1 {}", e);
return;
}
};
}
};
}
});
for i in 0..self.thread_num {
let pipeline = self.pipeline.clone();
let input_receiver = input_receiver.clone();
let output_sender = output_sender.clone();
tokio::spawn(async move {
let i = i + 1;
info!("Worker {} started", i);
loop {
match input_receiver.recv_async().await {
Ok(msg) => {
let processed = pipeline.process(msg).await;
match processed {
Ok(msgs) => {
for msg in msgs {
if let Err(e) = output_sender.send_async(msg).await {
error!("Failed to send processed message: {}", e);
break;
}
}
}
Err(e) => {
error!("2 {}", e)
}
}
}
Err(e) => {
break;
}
}
}
drop(output_sender);
info!("Worker {} stopped", i);
});
}
drop(output_sender);
loop {
let msg = match output_receiver.recv_async().await {
Ok(msg) => msg,
Err(_) => {
return Ok(());
}
};
if let Ok(_) = self.output.write(&msg).await {
self.input.acknowledge(&msg).await?
};
}
}
pub async fn close(&mut self) -> Result<(), Error> {
self.input.close().await?;
self.pipeline.close().await?;
if let Some(buffer) = &mut self.buffer {
buffer.close().await?;
}
self.output.close().await?;
Ok(())
}
}
/// Serializable description of a stream, from which `StreamConfig::build`
/// constructs a `Stream`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Configuration of the input component.
    pub input: crate::input::InputConfig,
    /// Configuration of the pipeline; building it also yields the worker
    /// count that is passed to the stream.
    pub pipeline: crate::pipeline::PipelineConfig,
    /// Configuration of the output component.
    pub output: crate::output::OutputConfig,
    /// Configuration of the optional buffer; `None` builds a stream without
    /// one.
    pub buffer: Option<crate::buffer::BufferConfig>,
}
impl StreamConfig {
    /// Builds a [`Stream`] from this configuration.
    ///
    /// Components are built in the order input, pipeline, output, buffer.
    /// Building the pipeline also produces the worker count handed to
    /// [`Stream::new`]. The buffer is built only when one is configured.
    ///
    /// # Errors
    ///
    /// Returns the error of the first component that fails to build.
    pub fn build(&self) -> Result<Stream, Error> {
        let source = self.input.build()?;
        let (processors, workers) = self.pipeline.build()?;
        let sink = self.output.build()?;
        // Option<Result<..>> -> Result<Option<..>>, so `?` can surface a
        // buffer build failure while `None` stays `None`.
        let staging = self
            .buffer
            .as_ref()
            .map(|buffer_config| buffer_config.build())
            .transpose()?;

        Ok(Stream::new(source, processors, sink, staging, workers))
    }
}