rustfs-kafka 1.2.0

Rust client for Apache Kafka
Documentation
use crate::error::Result;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tracing::{debug, warn};

use super::Pooled;
#[cfg(feature = "security")]
use super::SecurityConfig;
use super::connection::KafkaConnection;

#[derive(Debug)]
pub struct PoolConfig {
    rw_timeout: Option<Duration>,
    idle_timeout: Duration,
    #[cfg(feature = "security")]
    security_config: Option<SecurityConfig>,
}

impl PoolConfig {
    #[cfg(not(feature = "security"))]
    fn new_conn(&self, id: u32, host: &str) -> Result<KafkaConnection> {
        KafkaConnection::new(id, host, self.rw_timeout).map(|c| {
            debug!("Established: {:?}", c);
            c
        })
    }

    #[cfg(feature = "security")]
    fn new_conn(&self, id: u32, host: &str) -> Result<KafkaConnection> {
        KafkaConnection::new(id, host, self.rw_timeout, self.security_config.as_ref()).map(|c| {
            debug!("Established: {:?}", c);
            c
        })
    }
}

#[derive(Debug)]
struct State {
    num_conns: u32,
}

impl State {
    fn new() -> State {
        State { num_conns: 0 }
    }

    fn next_conn_id(&mut self) -> u32 {
        let c = self.num_conns;
        self.num_conns = self.num_conns.wrapping_add(1);
        c
    }
}

#[derive(Debug)]
pub struct Connections {
    conns: Vec<Pooled<KafkaConnection>>,
    host_index: HashMap<String, usize>,
    free_indices: Vec<usize>,
    state: State,
    config: PoolConfig,
}

impl Connections {
    #[cfg(not(feature = "security"))]
    pub fn new(rw_timeout: Option<Duration>, idle_timeout: Duration) -> Connections {
        Connections {
            conns: Vec::new(),
            host_index: HashMap::new(),
            free_indices: Vec::new(),
            state: State::new(),
            config: PoolConfig {
                rw_timeout,
                idle_timeout,
            },
        }
    }

    #[cfg(feature = "security")]
    pub fn new(rw_timeout: Option<Duration>, idle_timeout: Duration) -> Connections {
        Self::new_with_security(rw_timeout, idle_timeout, None)
    }

    #[cfg(feature = "security")]
    pub fn new_with_security(
        rw_timeout: Option<Duration>,
        idle_timeout: Duration,
        security: Option<SecurityConfig>,
    ) -> Connections {
        Connections {
            conns: Vec::new(),
            host_index: HashMap::new(),
            free_indices: Vec::new(),
            state: State::new(),
            config: PoolConfig {
                rw_timeout,
                idle_timeout,
                security_config: security,
            },
        }
    }

    pub fn set_idle_timeout(&mut self, idle_timeout: Duration) {
        self.config.idle_timeout = idle_timeout;
    }

    pub fn idle_timeout(&self) -> Duration {
        self.config.idle_timeout
    }

    fn allocate_slot(&mut self, host: &str, now: Instant) -> Result<usize> {
        let cid = self.state.next_conn_id();
        let conn = Pooled::new(now, self.config.new_conn(cid, host)?);

        if let Some(idx) = self.free_indices.pop() {
            self.conns[idx] = conn;
            self.host_index.insert(host.to_owned(), idx);
            Ok(idx)
        } else {
            let idx = self.conns.len();
            self.conns.push(conn);
            self.host_index.insert(host.to_owned(), idx);
            Ok(idx)
        }
    }

    fn ensure_connected(&mut self, idx: usize, host: &str, now: Instant) -> Result<()> {
        let conn = &mut self.conns[idx];
        let needs_reconnect = now.duration_since(conn.last_checkout) >= self.config.idle_timeout
            || conn.item.is_terminated();
        if needs_reconnect {
            let reason = if conn.item.is_terminated() {
                "connection terminated"
            } else {
                "idle timeout"
            };
            debug!("Reconnecting ({}) to: {:?}", reason, conn.item);
            let new_conn = self.config.new_conn(self.state.next_conn_id(), host)?;
            let _ = conn.item.shutdown();
            conn.item = new_conn;
        }
        conn.last_checkout = now;
        Ok(())
    }

    #[tracing::instrument(skip(self, now), fields(broker = %host))]
    pub fn get_conn(&mut self, host: &str, now: Instant) -> Result<&mut KafkaConnection> {
        let idx = if let Some(&idx) = self.host_index.get(host) {
            idx
        } else {
            self.allocate_slot(host, now)?
        };

        let result = self.ensure_connected(idx, host, now);

        #[cfg(feature = "metrics")]
        {
            crate::metrics::update_connection_count(self.conns.len());
            if let Err(ref e) = result {
                crate::metrics::record_connection_error(host, &e.to_string());
            }
        }

        result?;
        Ok(&mut self.conns[idx].item)
    }

    pub fn get_conn_any(&mut self, now: Instant) -> Option<&mut KafkaConnection> {
        let mut best_idx: Option<usize> = None;
        let mut best_checkout: Option<Instant> = None;

        let hosts: Vec<String> = self.host_index.keys().cloned().collect();
        for host in &hosts {
            let idx = self.host_index[host];
            if let Err(e) = self.ensure_connected(idx, host, now) {
                warn!("Failed to reconnect to {}: {:?}", host, e);
                continue;
            }
            let conn = &self.conns[idx];
            if best_checkout.is_none() || conn.last_checkout < best_checkout.unwrap() {
                best_idx = Some(idx);
                best_checkout = Some(conn.last_checkout);
            }
        }

        best_idx.map(|idx| &mut self.conns[idx].item)
    }
}