openwire 0.1.1

OkHttp-inspired async HTTP client for Rust built on hyper and tower
Documentation
use std::sync::{Arc, Mutex};
use std::time::Instant;

use openwire_core::{next_connection_id, CoalescingInfo, ConnectionId};

use super::{Address, ConnectionPermit, Route};
use crate::sync_util::lock_mutex;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConnectionProtocol {
    Http1,
    Http2,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConnectionHealth {
    Healthy,
    Unhealthy,
    Closed,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum ConnectionAllocationState {
    Idle,
    InUse { allocations: usize },
    Closed,
}

#[derive(Clone, Debug)]
pub(crate) struct RealConnectionSnapshot {
    pub(crate) id: ConnectionId,
    pub(crate) protocol: ConnectionProtocol,
    pub(crate) health: ConnectionHealth,
    pub(crate) allocation: ConnectionAllocationState,
    pub(crate) idle_since: Option<Instant>,
    pub(crate) completed_exchanges: usize,
}

#[derive(Clone, Debug)]
pub(crate) struct RealConnection {
    inner: Arc<RealConnectionInner>,
}

#[derive(Debug)]
struct RealConnectionInner {
    id: ConnectionId,
    address: Address,
    route: Route,
    protocol: ConnectionProtocol,
    coalescing: CoalescingInfo,
    _permit: Option<ConnectionPermit>,
    state: Mutex<RealConnectionState>,
}

#[derive(Debug)]
struct RealConnectionState {
    health: ConnectionHealth,
    allocations: usize,
    idle_since: Option<Instant>,
    completed_exchanges: usize,
}

impl RealConnection {
    pub(crate) fn new(route: Route, protocol: ConnectionProtocol) -> Self {
        Self::with_id(next_connection_id(), route, protocol)
    }

    pub(crate) fn with_id(id: ConnectionId, route: Route, protocol: ConnectionProtocol) -> Self {
        Self::with_id_and_coalescing(id, route, protocol, CoalescingInfo::default())
    }

    pub(crate) fn with_id_and_coalescing(
        id: ConnectionId,
        route: Route,
        protocol: ConnectionProtocol,
        coalescing: CoalescingInfo,
    ) -> Self {
        Self::with_id_permit_and_coalescing(id, route, protocol, None, coalescing)
    }

    pub(crate) fn with_id_permit_and_coalescing(
        id: ConnectionId,
        route: Route,
        protocol: ConnectionProtocol,
        permit: Option<ConnectionPermit>,
        coalescing: CoalescingInfo,
    ) -> Self {
        Self {
            inner: Arc::new(RealConnectionInner {
                id,
                address: route.address().clone(),
                route,
                protocol,
                coalescing,
                _permit: permit,
                state: Mutex::new(RealConnectionState {
                    health: ConnectionHealth::Healthy,
                    allocations: 0,
                    idle_since: Some(Instant::now()),
                    completed_exchanges: 0,
                }),
            }),
        }
    }

    pub(crate) fn id(&self) -> ConnectionId {
        self.inner.id
    }

    pub(crate) fn address(&self) -> &Address {
        &self.inner.address
    }

    pub(crate) fn route(&self) -> &Route {
        &self.inner.route
    }

    pub(crate) fn protocol(&self) -> ConnectionProtocol {
        self.inner.protocol
    }

    pub(crate) fn coalescing(&self) -> &CoalescingInfo {
        &self.inner.coalescing
    }

    pub(crate) fn snapshot(&self) -> RealConnectionSnapshot {
        let state = lock_mutex(&self.inner.state);
        RealConnectionSnapshot {
            id: self.inner.id,
            protocol: self.inner.protocol,
            health: state.health,
            allocation: allocation_state(&state),
            idle_since: state.idle_since,
            completed_exchanges: state.completed_exchanges,
        }
    }

    pub(crate) fn try_acquire(&self) -> bool {
        let mut state = lock_mutex(&self.inner.state);
        if state.health != ConnectionHealth::Healthy {
            return false;
        }

        match self.inner.protocol {
            ConnectionProtocol::Http1 if state.allocations > 0 => return false,
            ConnectionProtocol::Http1 | ConnectionProtocol::Http2 => {}
        }

        state.allocations += 1;
        state.idle_since = None;
        true
    }

    pub(crate) fn release(&self) -> bool {
        let mut state = lock_mutex(&self.inner.state);
        if state.health == ConnectionHealth::Closed || state.allocations == 0 {
            return false;
        }

        state.allocations -= 1;
        state.completed_exchanges += 1;
        if state.allocations == 0 {
            state.idle_since = Some(Instant::now());
        }
        true
    }

    pub(crate) fn mark_unhealthy(&self) {
        let mut state = lock_mutex(&self.inner.state);
        if state.health != ConnectionHealth::Closed {
            state.health = ConnectionHealth::Unhealthy;
        }
    }

    pub(crate) fn is_healthy(&self) -> bool {
        let state = lock_mutex(&self.inner.state);
        state.health == ConnectionHealth::Healthy
    }

    pub(crate) fn close(&self) {
        let mut state = lock_mutex(&self.inner.state);
        state.health = ConnectionHealth::Closed;
        state.allocations = 0;
        state.idle_since = None;
    }

    pub(crate) fn is_closed(&self) -> bool {
        let state = lock_mutex(&self.inner.state);
        state.health == ConnectionHealth::Closed
    }

    #[cfg(test)]
    pub(crate) fn set_idle_since_for_test(&self, idle_since: Option<Instant>) {
        let mut state = lock_mutex(&self.inner.state);
        state.idle_since = idle_since;
    }
}

fn allocation_state(state: &RealConnectionState) -> ConnectionAllocationState {
    if state.health == ConnectionHealth::Closed {
        ConnectionAllocationState::Closed
    } else if state.allocations == 0 {
        ConnectionAllocationState::Idle
    } else {
        ConnectionAllocationState::InUse {
            allocations: state.allocations,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, SocketAddr};
    use std::panic::{self, AssertUnwindSafe};

    use super::{ConnectionAllocationState, ConnectionHealth, ConnectionProtocol, RealConnection};
    use crate::connection::{Address, AuthorityKey, DnsPolicy, ProtocolPolicy, Route, UriScheme};

    fn test_connection(protocol: ConnectionProtocol) -> RealConnection {
        let address = Address::new(
            UriScheme::Http,
            AuthorityKey::new("example.com", 80),
            None,
            None,
            ProtocolPolicy::Http1Only,
            DnsPolicy::System,
        );
        let route = Route::direct(
            address,
            SocketAddr::from((Ipv4Addr::new(192, 0, 2, 10), 80)),
        );
        RealConnection::new(route, protocol)
    }

    #[test]
    fn real_connection_transitions_between_idle_in_use_and_closed() {
        let connection = test_connection(ConnectionProtocol::Http1);

        let snapshot = connection.snapshot();
        assert_eq!(snapshot.protocol, ConnectionProtocol::Http1);
        assert_eq!(snapshot.health, ConnectionHealth::Healthy);
        assert_eq!(snapshot.allocation, ConnectionAllocationState::Idle);
        assert!(snapshot.idle_since.is_some());

        assert!(connection.try_acquire());
        let snapshot = connection.snapshot();
        assert_eq!(
            snapshot.allocation,
            ConnectionAllocationState::InUse { allocations: 1 }
        );
        assert!(snapshot.idle_since.is_none());

        assert!(connection.release());
        let snapshot = connection.snapshot();
        assert_eq!(snapshot.allocation, ConnectionAllocationState::Idle);
        assert_eq!(snapshot.completed_exchanges, 1);
        assert!(snapshot.idle_since.is_some());

        connection.mark_unhealthy();
        assert_eq!(connection.snapshot().health, ConnectionHealth::Unhealthy);
        assert!(!connection.is_healthy());

        connection.close();
        let snapshot = connection.snapshot();
        assert_eq!(snapshot.health, ConnectionHealth::Closed);
        assert_eq!(snapshot.allocation, ConnectionAllocationState::Closed);
        assert!(!connection.is_healthy());
        assert!(snapshot.idle_since.is_none());
    }

    #[test]
    fn http2_connection_tracks_parallel_allocations() {
        let connection = test_connection(ConnectionProtocol::Http2);

        assert!(connection.try_acquire());
        assert!(connection.try_acquire());
        assert_eq!(
            connection.snapshot().allocation,
            ConnectionAllocationState::InUse { allocations: 2 }
        );

        assert!(connection.release());
        let snapshot = connection.snapshot();
        assert_eq!(
            snapshot.allocation,
            ConnectionAllocationState::InUse { allocations: 1 }
        );
        assert_eq!(snapshot.completed_exchanges, 1);
        assert!(snapshot.idle_since.is_none());

        assert!(connection.release());
        let snapshot = connection.snapshot();
        assert_eq!(snapshot.allocation, ConnectionAllocationState::Idle);
        assert_eq!(snapshot.completed_exchanges, 2);
        assert!(snapshot.idle_since.is_some());
    }

    #[test]
    fn http2_connection_does_not_apply_a_local_stream_cap() {
        let connection = test_connection(ConnectionProtocol::Http2);

        for _ in 0..128 {
            assert!(connection.try_acquire());
        }
        assert_eq!(
            connection.snapshot().allocation,
            ConnectionAllocationState::InUse { allocations: 128 }
        );

        assert!(connection.release());
        assert!(connection.try_acquire());
    }

    #[test]
    fn real_connection_recovers_after_state_mutex_poisoning() {
        let connection = test_connection(ConnectionProtocol::Http1);

        let _ = panic::catch_unwind(AssertUnwindSafe(|| {
            let _guard = connection
                .inner
                .state
                .lock()
                .expect("poison real connection lock for test");
            panic!("poison real connection state");
        }));

        assert!(connection.try_acquire());
        assert_eq!(
            connection.snapshot().allocation,
            ConnectionAllocationState::InUse { allocations: 1 }
        );
        assert!(connection.release());
    }
}