use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::{Error, MessageBatch};
pub mod file;
mod generate;
pub mod http;
pub mod kafka;
pub mod memory;
pub mod mqtt;
mod sql;
/// Acknowledgement handle handed out by [`Input::read`] together with each
/// message batch.
///
/// Implementations must be `Send + Sync` so the handle can be shared across
/// tasks behind an `Arc<dyn Ack>`.
#[async_trait]
pub trait Ack: Send + Sync {
    /// Acknowledges the batch this handle was returned with.
    ///
    /// NOTE(review): what "acknowledged" means is up to each input
    /// implementation; this trait only fixes the signature.
    async fn ack(&self);
}
/// Common interface implemented by every input component in this module.
///
/// All methods take `&self`, so an input can be driven through a shared
/// `Arc<dyn Input>` (which is what [`InputConfig::build`] returns).
#[async_trait]
pub trait Input: Send + Sync {
    /// Connects the input.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the implementation fails to connect.
    async fn connect(&self) -> Result<(), Error>;

    /// Reads the next batch of messages.
    ///
    /// On success yields the batch together with the [`Ack`] handle that
    /// belongs to it.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the implementation cannot produce a batch.
    async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error>;

    /// Closes the input.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the implementation fails to close.
    async fn close(&self) -> Result<(), Error>;
}
/// An [`Ack`] implementation whose `ack` does nothing.
///
/// The type is a stateless unit struct, so it derives the standard traits
/// that are free for such a type: it can be debug-printed, copied, and
/// created through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopAck;
#[async_trait]
impl Ack for NoopAck {
    /// Completes immediately without performing any work.
    async fn ack(&self) {
        // Intentionally empty: there is nothing to acknowledge.
    }
}
/// Configuration for one input component.
///
/// Serialized in serde's internally tagged form: the `type` field selects the
/// variant, using the snake_case variant name (for example `"type": "kafka"`
/// selects [`InputConfig::Kafka`]). The remaining fields are those of the
/// variant's own config struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InputConfig {
    /// Settings for the input defined in the `file` module.
    File(file::FileInputConfig),
    /// Settings for the input defined in the `http` module.
    Http(http::HttpInputConfig),
    /// Settings for the input defined in the `kafka` module.
    Kafka(kafka::KafkaInputConfig),
    /// Settings for the input defined in the `generate` module.
    Generate(generate::GenerateConfig),
    /// Settings for the input defined in the `memory` module.
    Memory(memory::MemoryInputConfig),
    /// Settings for the input defined in the `mqtt` module.
    Mqtt(mqtt::MqttInputConfig),
    /// Settings for the input defined in the `sql` module.
    Sql(sql::SqlConfig),
}
impl InputConfig {
    /// Builds the input component described by this configuration.
    ///
    /// Each variant is dispatched to the `new` constructor of the matching
    /// input type, and the result is returned as a shared `Arc<dyn Input>`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the selected input's constructor returns.
    pub fn build(&self) -> Result<Arc<dyn Input>, Error> {
        // The annotation makes every arm coerce to the trait object, so the
        // `Ok` wrapping happens once below instead of in each arm.
        let input: Arc<dyn Input> = match self {
            Self::File(cfg) => Arc::new(file::FileInput::new(cfg)?),
            Self::Http(cfg) => Arc::new(http::HttpInput::new(cfg)?),
            Self::Kafka(cfg) => Arc::new(kafka::KafkaInput::new(cfg)?),
            // `GenerateInput::new` is handed an owned copy of its config,
            // unlike the other constructors which receive the reference.
            Self::Generate(cfg) => Arc::new(generate::GenerateInput::new(cfg.clone())?),
            Self::Memory(cfg) => Arc::new(memory::MemoryInput::new(cfg)?),
            Self::Mqtt(cfg) => Arc::new(mqtt::MqttInput::new(cfg)?),
            Self::Sql(cfg) => Arc::new(sql::SqlInput::new(cfg)?),
        };
        Ok(input)
    }
}