use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::{Error, Message, MessageBatch};
pub mod file;
pub mod http;
pub mod kafka;
pub mod memory;
/// A message source that hands out one [`Message`] per read.
///
/// Implementors must be `Send + Sync`, and every method takes `&self`. Any type
/// implementing this trait also implements [`InputBatch`] through the blanket
/// impl in this module.
#[async_trait]
pub trait Input: Send + Sync {
/// Prepares the input for use.
///
/// NOTE(review): presumably opens the underlying connection or resource; the
/// exact contract is not defined in this file — confirm against implementors.
async fn connect(&self) -> Result<(), Error>;
/// Produces the next [`Message`], or an [`Error`] if the read fails.
///
/// NOTE(review): how end-of-input is signalled is not visible here — verify
/// against `Error` and the implementors.
async fn read(&self) -> Result<Message, Error>;
/// Acknowledges a single message.
///
/// NOTE(review): presumably confirms that `msg` was handled successfully —
/// confirm what each implementor does with it.
async fn acknowledge(&self, msg: &Message) -> Result<(), Error>;
/// Shuts the input down.
///
/// NOTE(review): presumably releases whatever `connect` acquired — confirm.
async fn close(&self) -> Result<(), Error>;
}
/// A message source that hands out a [`MessageBatch`] per read.
///
/// This is the batch-oriented counterpart of [`Input`], with the same four
/// operations. Implementors must be `Send + Sync`, and every method takes
/// `&self`. Every [`Input`] implements this trait automatically through the
/// blanket impl in this module.
#[async_trait]
pub trait InputBatch: Send + Sync {
/// Prepares the input for use.
///
/// NOTE(review): presumably opens the underlying connection or resource; the
/// exact contract is not defined in this file — confirm against implementors.
async fn connect(&self) -> Result<(), Error>;
/// Produces the next [`MessageBatch`], or an [`Error`] if the read fails.
///
/// NOTE(review): how end-of-input is signalled is not visible here — verify
/// against `Error` and the implementors.
async fn read(&self) -> Result<MessageBatch, Error>;
/// Acknowledges a slice of messages.
///
/// NOTE(review): presumably these are the messages of a previously read
/// batch — confirm with callers.
async fn acknowledge(&self, msg: &[Message]) -> Result<(), Error>;
/// Shuts the input down.
///
/// NOTE(review): presumably releases whatever `connect` acquired — confirm.
async fn close(&self) -> Result<(), Error>;
}
/// Blanket adapter: every single-message [`Input`] is also an [`InputBatch`].
///
/// Each method forwards to the `Input` method of the same name. The calls are
/// written in fully qualified form (`<T as Input>::...`) so it is obvious they
/// target `Input` and not the `InputBatch` method currently being defined.
#[async_trait]
impl<T> InputBatch for T
where
    T: Input,
{
    /// Forwards to [`Input::connect`].
    async fn connect(&self) -> Result<(), Error> {
        <T as Input>::connect(self).await
    }

    /// Reads one message via [`Input::read`] and wraps it with
    /// `MessageBatch::new_single`.
    async fn read(&self) -> Result<MessageBatch, Error> {
        let message = <T as Input>::read(self).await?;
        Ok(MessageBatch::new_single(message))
    }

    /// Acknowledges the messages one at a time, in slice order, via
    /// [`Input::acknowledge`].
    ///
    /// Stops at the first failure and returns that error; messages after the
    /// failing one are not acknowledged.
    async fn acknowledge(&self, msg: &[Message]) -> Result<(), Error> {
        for message in msg.iter() {
            <T as Input>::acknowledge(self, message).await?;
        }
        Ok(())
    }

    /// Forwards to [`Input::close`].
    async fn close(&self) -> Result<(), Error> {
        <T as Input>::close(self).await
    }
}
/// Serializable selection of an input implementation together with its settings.
///
/// With the serde attributes below the enum is internally tagged: a `type`
/// field names the variant in lowercase (`file`, `http` or `memory`), and the
/// variant's own config is (de)serialized alongside that tag.
///
/// NOTE(review): the `kafka` module is declared in this file but has no
/// variant here — confirm whether that omission is intentional.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum InputConfig {
/// Settings for the input defined in the `file` module.
File(file::FileInputConfig),
/// Settings for the input defined in the `http` module.
Http(http::HttpInputConfig),
/// Settings for the input defined in the `memory` module.
Memory(memory::MemoryInputConfig),
}
impl InputConfig {
    /// Instantiates the input described by this configuration.
    ///
    /// The selected variant's config is handed to the matching constructor
    /// (`file::FileInput::new`, `http::HttpInput::new` or
    /// `memory::MemoryInput::new`) and the result is returned as a shared
    /// [`InputBatch`] trait object.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the selected constructor.
    pub fn build(&self) -> Result<Arc<dyn InputBatch>, Error> {
        // The match yields the trait object directly; `Ok` is applied once below.
        let input: Arc<dyn InputBatch> = match self {
            Self::File(file_cfg) => Arc::new(file::FileInput::new(file_cfg)?),
            Self::Http(http_cfg) => Arc::new(http::HttpInput::new(http_cfg)?),
            Self::Memory(memory_cfg) => Arc::new(memory::MemoryInput::new(memory_cfg)?),
        };
        Ok(input)
    }
}