use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::{Error, Message, MessageBatch};
pub mod batch;
pub mod sql;
/// A processing stage that handles one [`Message`] at a time.
///
/// Implementors must be `Send + Sync`. Every type implementing this trait also
/// receives a [`ProcessorBatch`] implementation through the blanket impl in
/// this module.
#[async_trait]
pub trait Processor: Send + Sync {
/// Processes a single message and returns the resulting messages.
///
/// The returned vector may hold any number of messages, as far as the
/// signature is concerned.
///
/// # Errors
///
/// Returns an [`Error`] when the implementor fails to process `msg`.
async fn process(&self, msg: Message) -> Result<Vec<Message>, Error>;
/// Shuts the processor down.
///
/// NOTE(review): presumably releases resources held by the implementor;
/// the exact contract is not visible in this file.
async fn close(&self) -> Result<(), Error>;
}
/// A processing stage that handles a whole [`MessageBatch`] at a time.
///
/// Implementors must be `Send + Sync`. This is the trait object type returned
/// by [`ProcessorConfig::build`].
#[async_trait]
pub trait ProcessorBatch: Send + Sync {
/// Processes one batch and returns the resulting batches.
///
/// # Errors
///
/// Returns an [`Error`] when the implementor fails to process `msg`.
async fn process(&self, msg: MessageBatch) -> Result<Vec<MessageBatch>, Error>;
/// Shuts the processor down.
///
/// NOTE(review): presumably releases resources held by the implementor;
/// the exact contract is not visible in this file.
async fn close(&self) -> Result<(), Error>;
}
/// Blanket adapter: every per-message [`Processor`] is also a
/// [`ProcessorBatch`].
#[async_trait]
impl<T> ProcessorBatch for T
where
    T: Processor,
{
    /// Feeds each message of `msg` to [`Processor::process`] in order,
    /// awaiting one call before starting the next, and converts the messages
    /// returned for each input message into one output batch.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error produced by the wrapped
    /// processor; batches built before that point are dropped.
    async fn process(&self, msg: MessageBatch) -> Result<Vec<MessageBatch>, Error> {
        let mut batches: Vec<MessageBatch> = Vec::new();
        for message in msg.0 {
            // Fully qualified on purpose: `ProcessorBatch::process` (this very
            // method) has the same name, and a plain `self.process(..)` only
            // reaches `Processor` thanks to method-lookup priority rules.
            let produced = <T as Processor>::process(self, message).await?;
            batches.push(produced.into());
        }
        Ok(batches)
    }

    /// Delegates to [`Processor::close`] on the wrapped processor.
    async fn close(&self) -> Result<(), Error> {
        // Fully qualified so this can never turn into a self-recursive call
        // to `ProcessorBatch::close`.
        <T as Processor>::close(self).await
    }
}
/// Configuration selecting which processor to construct.
///
/// (De)serialized with serde using a `type` field as the tag; variant names
/// are lowercased, so the tag values are `batch` and `sql`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum ProcessorConfig {
/// Settings for [`batch::BatchProcessor`].
Batch(batch::BatchProcessorConfig),
/// Settings for [`sql::SqlProcessor`].
Sql(sql::SqlProcessorConfig),
}
impl ProcessorConfig {
    /// Constructs the processor described by this configuration.
    ///
    /// The concrete processor is type-erased behind an
    /// `Arc<dyn ProcessorBatch>`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the selected processor's `new`
    /// constructor.
    pub fn build(&self) -> Result<Arc<dyn ProcessorBatch>, Error> {
        // The annotation makes each arm coerce to the trait object type.
        let processor: Arc<dyn ProcessorBatch> = match self {
            Self::Batch(cfg) => Arc::new(batch::BatchProcessor::new(cfg)?),
            Self::Sql(cfg) => Arc::new(sql::SqlProcessor::new(cfg)?),
        };
        Ok(processor)
    }
}