// turn-server 4.0.1
//
// A pure Rust implementation of a TURN server.
// Documentation
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

use ahash::HashMap;
use parking_lot::RwLock;

use crate::{server::transport::Transport, service::session::Identifier};

/// The type of information passed in the statistics channel
///
/// Every variant carries an increment (a delta, not a running total) that
/// [`Counts::add`] adds to the counter of the matching name.
#[derive(Debug, Clone, Copy)]
pub enum Stats {
    /// Increment for the `received_bytes` counter.
    ReceivedBytes(usize),
    /// Increment for the `send_bytes` counter.
    SendBytes(usize),
    /// Increment for the `received_pkts` counter.
    ReceivedPkts(usize),
    /// Increment for the `send_pkts` counter.
    SendPkts(usize),
    /// Increment for the `error_pkts` counter.
    ErrorPkts(usize),
}

/// Minimal interface of a counter that can be increased through a shared
/// reference and read back.
pub trait Number {
    /// Increase the counter by `value`.
    fn add(&self, value: usize);

    /// Read the current value of the counter.
    fn get(&self) -> usize;
}

/// Counter backed by an [`AtomicUsize`]; `Default` starts it at zero.
#[derive(Default)]
pub struct Count(AtomicUsize);

impl Number for Count {
    fn get(&self) -> usize {
        let Self(inner) = self;
        inner.load(Ordering::Relaxed)
    }

    fn add(&self, value: usize) {
        let Self(inner) = self;
        // `fetch_add` hands back the previous value, which is of no interest
        // here; on overflow the atomic wraps around.
        let _previous = inner.fetch_add(value, Ordering::Relaxed);
    }
}

/// Worker independent statistics
///
/// A set of five counters. `T` is the counter representation: the shared
/// table stores `Counts<Count>`, while `Statistics::get` returns a
/// `Counts<usize>` snapshot of the same fields.
pub struct Counts<T> {
    /// Accumulates `Stats::ReceivedBytes` reports.
    pub received_bytes: T,
    /// Accumulates `Stats::SendBytes` reports.
    pub send_bytes: T,
    /// Accumulates `Stats::ReceivedPkts` reports.
    pub received_pkts: T,
    /// Accumulates `Stats::SendPkts` reports.
    pub send_pkts: T,
    /// Accumulates `Stats::ErrorPkts` reports.
    pub error_pkts: T,
}

impl<T: Number> Counts<T> {
    /// Apply a single report to the counter it belongs to.
    ///
    /// The variant of `payload` selects the field, and the value it carries
    /// is passed to that field's [`Number::add`]. The other fields are left
    /// untouched.
    ///
    /// # Example
    ///
    /// ```
    /// use turn_server::statistics::*;
    ///
    /// let counts = Counts {
    ///     received_bytes: Count::default(),
    ///     send_bytes: Count::default(),
    ///     received_pkts: Count::default(),
    ///     send_pkts: Count::default(),
    ///     error_pkts: Count::default(),
    /// };
    ///
    /// counts.add(&Stats::ReceivedBytes(1));
    /// assert_eq!(counts.received_bytes.get(), 1);
    ///
    /// counts.add(&Stats::ReceivedPkts(1));
    /// assert_eq!(counts.received_pkts.get(), 1);
    ///
    /// counts.add(&Stats::SendBytes(1));
    /// assert_eq!(counts.send_bytes.get(), 1);
    ///
    /// counts.add(&Stats::SendPkts(1));
    /// assert_eq!(counts.send_pkts.get(), 1);
    ///
    /// counts.add(&Stats::ErrorPkts(1));
    /// assert_eq!(counts.error_pkts.get(), 1);
    /// ```
    pub fn add(&self, payload: &Stats) {
        // Pick the target counter first, then apply the increment in one place.
        let (counter, amount) = match *payload {
            Stats::ReceivedBytes(n) => (&self.received_bytes, n),
            Stats::ReceivedPkts(n) => (&self.received_pkts, n),
            Stats::SendBytes(n) => (&self.send_bytes, n),
            Stats::SendPkts(n) => (&self.send_pkts, n),
            Stats::ErrorPkts(n) => (&self.error_pkts, n),
        };

        counter.add(amount);
    }
}

/// worker cluster statistics
///
/// A handle around a shared, lock-protected table that maps each session
/// [`Identifier`] to its [`Counts`]. Cloning the handle clones the `Arc`, so
/// all clones operate on the same table.
#[derive(Clone)]
pub struct Statistics(Arc<RwLock<HashMap<Identifier, Counts<Count>>>>);

impl Default for Statistics {
    #[cfg(feature = "api")]
    fn default() -> Self {
        use ahash::HashMapExt;

        Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
    }

    // There's no need to take up so much memory when you don't have stats enabled.
    #[cfg(not(feature = "api"))]
    fn default() -> Self {
        Self(Default::default())
    }
}

impl Statistics {
    /// get signal sender
    ///
    /// The signal sender can notify the statistics instance to update
    /// internal statistics.
    ///
    /// # Example
    ///
    /// ```
    /// use turn_server::statistics::*;
    /// use turn_server::service::session::Identifier;
    /// use turn_server::server::transport::Transport;
    ///
    /// let statistics = Statistics::default();
    /// let sender = statistics.get_reporter(Transport::Tcp);
    ///
    /// let identifier = Identifier::new(
    ///     "127.0.0.1:8080".parse().unwrap(),
    ///     "127.0.0.1:3478".parse().unwrap(),
    /// );
    ///
    /// sender.send(&identifier, &[Stats::ReceivedBytes(100)]);
    /// ```
    pub fn get_reporter(&self, transport: Transport) -> StatisticsReporter {
        StatisticsReporter {
            table: self.0.clone(),
            transport,
        }
    }

    /// Add an address to the watch list
    ///
    /// # Example
    ///
    /// ```
    /// use turn_server::statistics::*;
    /// use turn_server::service::session::Identifier;
    ///
    /// let statistics = Statistics::default();
    ///
    /// let identifier = Identifier::new(
    ///     "127.0.0.1:8080".parse().unwrap(),
    ///     "127.0.0.1:3478".parse().unwrap(),
    /// );
    ///
    /// statistics.register(identifier.clone());
    /// assert_eq!(statistics.get(&identifier).is_some(), true);
    /// ```
    pub fn register(&self, identifier: Identifier) {
        #[cfg(feature = "prometheus")]
        {
            crate::prometheus::METRICS.allocated.inc();
        }

        self.0.write().insert(
            identifier,
            Counts {
                received_bytes: Count::default(),
                send_bytes: Count::default(),
                received_pkts: Count::default(),
                send_pkts: Count::default(),
                error_pkts: Count::default(),
            },
        );
    }

    /// Remove an address from the watch list
    ///
    /// # Example
    ///
    /// ```
    /// use turn_server::statistics::*;
    /// use turn_server::service::session::Identifier;
    ///
    /// let statistics = Statistics::default();
    ///
    /// let identifier = Identifier::new(
    ///     "127.0.0.1:8080".parse().unwrap(),
    ///     "127.0.0.1:3478".parse().unwrap(),
    /// );
    ///
    /// statistics.register(identifier.clone());
    /// assert_eq!(statistics.get(&identifier).is_some(), true);
    ///
    /// statistics.unregister(&identifier);
    /// assert_eq!(statistics.get(&identifier).is_some(), false);
    /// ```
    pub fn unregister(&self, identifier: &Identifier) {
        #[cfg(feature = "prometheus")]
        {
            crate::prometheus::METRICS.allocated.dec();
        }

        self.0.write().remove(identifier);
    }

    /// Obtain a list of statistics from statistics
    ///
    /// The obtained list is in the same order as it was added.
    ///
    /// # Example
    ///
    /// ```
    /// use turn_server::statistics::*;
    /// use turn_server::service::session::Identifier;
    ///
    /// let statistics = Statistics::default();
    ///
    /// let identifier = Identifier::new(
    ///     "127.0.0.1:8080".parse().unwrap(),
    ///     "127.0.0.1:3478".parse().unwrap(),
    /// );
    ///
    /// statistics.register(identifier.clone());
    /// assert_eq!(statistics.get(&identifier).is_some(), true);
    /// ```
    pub fn get(&self, identifier: &Identifier) -> Option<Counts<usize>> {
        self.0.read().get(identifier).map(|counts| Counts {
            received_bytes: counts.received_bytes.get(),
            received_pkts: counts.received_pkts.get(),
            send_bytes: counts.send_bytes.get(),
            send_pkts: counts.send_pkts.get(),
            error_pkts: counts.error_pkts.get(),
        })
    }
}

/// statistics reporter
///
/// It is held by each worker, and status information can be sent to the
/// statistics instance through this instance to update the internal
/// statistical information of the statistics.
///
/// Cloning a reporter clones the `Arc`, so every clone updates the same
/// table.
#[derive(Clone)]
// Both fields are only read inside feature-gated code in `send` (`table`
// under `api`, `transport` under `prometheus`); presumably this allow is here
// to silence the unused-field warning when those features are disabled.
#[allow(unused)]
pub struct StatisticsReporter {
    /// Shared session table, the same one held by the `Statistics` that
    /// created this reporter.
    table: Arc<RwLock<HashMap<Identifier, Counts<Count>>>>,
    /// Transport handed to the prometheus metrics together with each report.
    transport: Transport,
}

impl StatisticsReporter {
    /// Record a batch of reports for the session `identifier`.
    ///
    /// Without the `api` feature this does nothing. With it, every report is
    /// added to the session's counters if the identifier is present in the
    /// table; reports for an unknown identifier are not counted per session.
    /// When the `prometheus` feature is enabled as well, every report is
    /// first forwarded to the prometheus metrics together with this
    /// reporter's transport, whether or not the identifier is registered.
    // The parameters are unused when the `api` feature is disabled.
    #[allow(unused_variables)]
    pub fn send(&self, identifier: &Identifier, reports: &[Stats]) {
        #[cfg(feature = "api")]
        {
            #[cfg(feature = "prometheus")]
            {
                for report in reports.iter() {
                    crate::prometheus::METRICS.add(self.transport, report);
                }
            }

            // Hold the read lock while the session's counters are updated.
            let table = self.table.read();
            let Some(counts) = table.get(identifier) else {
                return;
            };

            reports.iter().for_each(|report| counts.add(report));
        }
    }
}