use crate::hot_reload::protocol::{GradualShutdownRequest, HotReloadListenerRequest};
#[cfg(feature = "cassandra")]
use crate::sources::cassandra::CassandraConfig;
#[cfg(feature = "kafka")]
use crate::sources::kafka::KafkaConfig;
#[cfg(feature = "opensearch")]
use crate::sources::opensearch::OpenSearchConfig;
#[cfg(feature = "valkey")]
use crate::sources::valkey::ValkeyConfig;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::net::TcpListener;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch;
use tokio::task::{JoinError, JoinHandle};
#[cfg(feature = "cassandra")]
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "valkey")]
pub mod valkey;
/// Transport kinds that can be named in (de)serialized configuration.
///
/// The enum is plain data: it is `Copy`, and unknown fields are rejected
/// during deserialization via `deny_unknown_fields`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub enum Transport {
    /// The `Tcp` transport.
    Tcp,
    /// The `WebSocket` transport.
    WebSocket,
}
/// Handle to a built source: the spawned task plus the channels used to talk to it.
#[derive(Debug)]
pub struct Source {
    /// Join handle of the spawned task; awaited by [`Source::join`].
    pub listener_task: JoinHandle<()>,
    /// Sending half of the channel carrying [`HotReloadListenerRequest`]s.
    pub hot_reload_tx: UnboundedSender<HotReloadListenerRequest>,
    /// Sending half of the channel carrying [`GradualShutdownRequest`]s.
    pub gradual_shutdown_tx: UnboundedSender<GradualShutdownRequest>,
    /// Name given to this source at construction time.
    pub name: String,
}
impl Source {
pub fn new(
join_handle: JoinHandle<()>,
hot_reload_tx: UnboundedSender<HotReloadListenerRequest>,
gradual_shutdown_tx: UnboundedSender<GradualShutdownRequest>,
name: String,
) -> Self {
Self {
listener_task: join_handle,
hot_reload_tx,
gradual_shutdown_tx,
name,
}
}
pub async fn join(self) -> Result<(), JoinError> {
self.listener_task.await?;
std::mem::drop(self.hot_reload_tx);
Ok(())
}
pub fn get_hot_reload_tx(&self) -> UnboundedSender<HotReloadListenerRequest> {
self.hot_reload_tx.clone()
}
pub fn get_gradual_shutdown_tx(&self) -> UnboundedSender<GradualShutdownRequest> {
self.gradual_shutdown_tx.clone()
}
pub fn name(&self) -> &str {
&self.name
}
}
/// Configuration for one source, tagged by the backend it wraps.
///
/// Each variant exists only when the cargo feature of the same (lowercase)
/// name is enabled, and carries that backend's own config type.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub enum SourceConfig {
    /// Cassandra source configuration (feature `cassandra`).
    #[cfg(feature = "cassandra")]
    Cassandra(CassandraConfig),
    /// Valkey source configuration (feature `valkey`).
    #[cfg(feature = "valkey")]
    Valkey(ValkeyConfig),
    /// Kafka source configuration (feature `kafka`).
    #[cfg(feature = "kafka")]
    Kafka(KafkaConfig),
    /// OpenSearch source configuration (feature `opensearch`).
    #[cfg(feature = "opensearch")]
    OpenSearch(OpenSearchConfig),
}
impl SourceConfig {
    /// Builds the [`Source`] described by this configuration.
    ///
    /// This is a pure dispatcher: it forwards `trigger_shutdown_rx` and
    /// `hot_reload_listeners` unchanged to the `build` method of whichever
    /// backend config this variant wraps and awaits the result.
    ///
    /// `hot_reload_listeners` is presumably keyed by port number; verify
    /// against the backend `build` implementations.
    ///
    /// # Errors
    ///
    /// Propagates the `Vec<String>` error returned by the backend's `build`.
    pub(crate) async fn build(
        &self,
        trigger_shutdown_rx: watch::Receiver<bool>,
        hot_reload_listeners: &mut HashMap<u16, TcpListener>,
    ) -> Result<Source, Vec<String>> {
        match self {
            #[cfg(feature = "cassandra")]
            Self::Cassandra(cfg) => {
                cfg.build(trigger_shutdown_rx, hot_reload_listeners).await
            }
            #[cfg(feature = "valkey")]
            Self::Valkey(cfg) => {
                cfg.build(trigger_shutdown_rx, hot_reload_listeners).await
            }
            #[cfg(feature = "kafka")]
            Self::Kafka(cfg) => {
                cfg.build(trigger_shutdown_rx, hot_reload_listeners).await
            }
            #[cfg(feature = "opensearch")]
            Self::OpenSearch(cfg) => {
                cfg.build(trigger_shutdown_rx, hot_reload_listeners).await
            }
        }
    }

    /// Borrows the `name` field of the wrapped backend configuration.
    pub(crate) fn get_name(&self) -> &str {
        match self {
            #[cfg(feature = "cassandra")]
            Self::Cassandra(cfg) => &cfg.name,
            #[cfg(feature = "valkey")]
            Self::Valkey(cfg) => &cfg.name,
            #[cfg(feature = "kafka")]
            Self::Kafka(cfg) => &cfg.name,
            #[cfg(feature = "opensearch")]
            Self::OpenSearch(cfg) => &cfg.name,
        }
    }
}