shotover 0.7.2

Shotover API for building custom transforms
Documentation
//! Sources used to listen for connections and send/receive with the client.

use crate::hot_reload::protocol::{GradualShutdownRequest, HotReloadListenerRequest};
#[cfg(feature = "cassandra")]
use crate::sources::cassandra::CassandraConfig;
#[cfg(feature = "kafka")]
use crate::sources::kafka::KafkaConfig;
#[cfg(feature = "opensearch")]
use crate::sources::opensearch::OpenSearchConfig;
#[cfg(feature = "valkey")]
use crate::sources::valkey::ValkeyConfig;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::net::TcpListener;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch;
use tokio::task::{JoinError, JoinHandle};

#[cfg(feature = "cassandra")]
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "valkey")]
pub mod valkey;

/// Transport kinds, serializable and deserializable via serde.
///
/// NOTE(review): presumably selected per source in the configuration to choose how
/// client connections are carried — confirm against the individual source configs.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[serde(deny_unknown_fields)]
pub enum Transport {
    /// The `Tcp` transport.
    Tcp,
    /// The `WebSocket` transport.
    WebSocket,
}

/// A built source: the handle of its listener task together with the channels used
/// to send it hot-reload and gradual-shutdown requests, plus its name.
#[derive(Debug)]
pub struct Source {
    /// Handle of the listener task; awaited by [`Source::join`].
    pub listener_task: JoinHandle<()>,
    /// Sender for [`HotReloadListenerRequest`] messages.
    ///
    /// This value must remain alive for as long as the Source is in use.
    pub hot_reload_tx: UnboundedSender<HotReloadListenerRequest>,
    /// Sender for [`GradualShutdownRequest`] messages.
    pub gradual_shutdown_tx: UnboundedSender<GradualShutdownRequest>,
    /// Name of this source, as returned by [`Source::name`].
    pub name: String,
}

impl Source {
    pub fn new(
        join_handle: JoinHandle<()>,
        hot_reload_tx: UnboundedSender<HotReloadListenerRequest>,
        gradual_shutdown_tx: UnboundedSender<GradualShutdownRequest>,
        name: String,
    ) -> Self {
        Self {
            listener_task: join_handle,
            hot_reload_tx,
            gradual_shutdown_tx,
            name,
        }
    }

    pub async fn join(self) -> Result<(), JoinError> {
        self.listener_task.await?;
        // explicitly drop hot_reload_tx here, to show that it occurs after the listener_task has shutdown.
        std::mem::drop(self.hot_reload_tx);
        Ok(())
    }

    pub fn get_hot_reload_tx(&self) -> UnboundedSender<HotReloadListenerRequest> {
        self.hot_reload_tx.clone()
    }
    pub fn get_gradual_shutdown_tx(&self) -> UnboundedSender<GradualShutdownRequest> {
        self.gradual_shutdown_tx.clone()
    }
    pub fn name(&self) -> &str {
        &self.name
    }
}

/// Configuration for a single source, one variant per supported protocol.
///
/// Each variant is only compiled in when the cargo feature named in its `#[cfg]`
/// attribute is enabled.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub enum SourceConfig {
    /// Cassandra source configuration (requires the `cassandra` feature).
    #[cfg(feature = "cassandra")]
    Cassandra(CassandraConfig),
    /// Valkey source configuration (requires the `valkey` feature).
    #[cfg(feature = "valkey")]
    Valkey(ValkeyConfig),
    /// Kafka source configuration (requires the `kafka` feature).
    #[cfg(feature = "kafka")]
    Kafka(KafkaConfig),
    /// OpenSearch source configuration (requires the `opensearch` feature).
    #[cfg(feature = "opensearch")]
    OpenSearch(OpenSearchConfig),
}

impl SourceConfig {
    /// Builds the [`Source`] described by this configuration.
    ///
    /// Both arguments are forwarded unchanged to the `build` method of whichever
    /// protocol-specific config this variant wraps, and its result is returned as is.
    ///
    /// NOTE(review): `hot_reload_listeners` is presumably keyed by port, and the
    /// `Vec<String>` error presumably carries human-readable messages — confirm
    /// against the per-protocol `build` implementations.
    pub(crate) async fn build(
        &self,
        trigger_shutdown_rx: watch::Receiver<bool>,
        hot_reload_listeners: &mut HashMap<u16, TcpListener>,
    ) -> Result<Source, Vec<String>> {
        match self {
            #[cfg(feature = "cassandra")]
            Self::Cassandra(config) => {
                config
                    .build(trigger_shutdown_rx, hot_reload_listeners)
                    .await
            }
            #[cfg(feature = "valkey")]
            Self::Valkey(config) => {
                config
                    .build(trigger_shutdown_rx, hot_reload_listeners)
                    .await
            }
            #[cfg(feature = "kafka")]
            Self::Kafka(config) => {
                config
                    .build(trigger_shutdown_rx, hot_reload_listeners)
                    .await
            }
            #[cfg(feature = "opensearch")]
            Self::OpenSearch(config) => {
                config
                    .build(trigger_shutdown_rx, hot_reload_listeners)
                    .await
            }
        }
    }

    /// Returns the `name` field of the wrapped protocol-specific config.
    pub(crate) fn get_name(&self) -> &str {
        match self {
            #[cfg(feature = "cassandra")]
            Self::Cassandra(config) => &config.name,
            #[cfg(feature = "valkey")]
            Self::Valkey(config) => &config.name,
            #[cfg(feature = "kafka")]
            Self::Kafka(config) => &config.name,
            #[cfg(feature = "opensearch")]
            Self::OpenSearch(config) => &config.name,
        }
    }
}