use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use crate::ListenAddr;
use crate::services::router::pipeline_handle::PipelineRef;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum ConnectionState {
Active,
Terminating,
}
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub(crate) struct ConnectionRef {
pub(crate) pipeline_ref: Arc<PipelineRef>,
pub(crate) address: ListenAddr,
pub(crate) state: ConnectionState,
}
pub(crate) struct ConnectionHandle {
pub(crate) connection_ref: ConnectionRef,
}
static CONNECTION_COUNTS: OnceLock<Mutex<HashMap<ConnectionRef, u64>>> = OnceLock::new();
pub(crate) fn connection_counts() -> MutexGuard<'static, HashMap<ConnectionRef, u64>> {
CONNECTION_COUNTS.get_or_init(Default::default).lock()
}
impl ConnectionHandle {
pub(crate) fn new(pipeline_ref: Arc<PipelineRef>, address: ListenAddr) -> Self {
let connection_ref = ConnectionRef {
pipeline_ref,
address,
state: ConnectionState::Active,
};
Self::increment(&mut connection_counts(), &connection_ref);
ConnectionHandle { connection_ref }
}
pub(crate) fn shutdown(&mut self) {
let mut connections = connection_counts();
Self::decrement(&mut connections, &self.connection_ref);
self.connection_ref.state = ConnectionState::Terminating;
Self::increment(&mut connections, &self.connection_ref);
}
fn increment(
connections: &mut MutexGuard<HashMap<ConnectionRef, u64>>,
connection_ref: &ConnectionRef,
) {
connections
.entry(connection_ref.clone())
.and_modify(|p| *p += 1)
.or_insert(1);
}
fn decrement(
connections: &mut MutexGuard<HashMap<ConnectionRef, u64>>,
connection_ref: &ConnectionRef,
) {
let value = connections
.get_mut(connection_ref)
.expect("connection_ref MUST be greater than zero");
*value -= 1;
if *value == 0 {
connections.remove(connection_ref);
}
}
}
impl Drop for ConnectionHandle {
fn drop(&mut self) {
Self::decrement(&mut connection_counts(), &self.connection_ref);
}
}
pub(crate) const OPEN_CONNECTIONS_METRIC: &str = "apollo.router.open_connections";