mod peer_settings_wait;
mod ping;
mod response;
mod submit;
#[cfg(feature = "unstable")]
use super::H2Initiator;
use super::{H2Driver, H2Settings, role::Role, transport::StreamState};
use crate::{Conn, HttpContext};
use atomic_waker::AtomicWaker;
use event_listener::Event;
use futures_lite::io::{AsyncRead, AsyncWrite};
use ping::PendingPing;
use std::{
collections::{HashMap, VecDeque},
future::Future,
io,
sync::{
Arc, Mutex, MutexGuard,
atomic::{AtomicBool, Ordering},
},
};
use swansong::{ShutdownCompletion, Swansong};
#[cfg(feature = "unstable")]
#[allow(unused_imports)]
pub use {response::ResponseHeaders, submit::SubmitSend};
/// Shared, lock-protected state for a single h2 connection.
///
/// Instances are created through [`H2Connection::new`], which always hands
/// them out inside an [`Arc`]. Every field is either immutable after
/// construction or wrapped in a mutex / atomic, so all methods take `&self`.
#[derive(Debug)]
pub struct H2Connection {
/// The [`HttpContext`] passed to [`H2Connection::new`].
pub(super) context: Arc<HttpContext>,
/// Child of the context's swansong, created in [`H2Connection::new`].
pub(super) swansong: Swansong,
/// Woken by `release_stream` and `stream_error` after they flag a stream as
/// needing servicing.
// NOTE(review): presumably registered by the driver's outbound half — confirm.
pub(super) outbound_waker: AtomicWaker,
/// Per-stream state, keyed by `u32` stream id.
pub(super) streams: Mutex<HashMap<u32, Arc<StreamState>>>,
/// Settings attributed to the peer; starts out as `H2Settings::default()`.
// NOTE(review): presumably overwritten when the peer's settings arrive — confirm.
pub(super) peer_settings: Mutex<H2Settings>,
/// Starts `false`.
// NOTE(review): presumably flipped once the peer's settings have been seen — confirm.
pub(super) peer_settings_received: AtomicBool,
/// Event associated with `peer_settings_received`.
// NOTE(review): presumably notified when that flag is set — confirm against
// the `peer_settings_wait` module.
pub(super) peer_settings_event: Event,
/// Stream-id counter consulted by `can_open_stream`; starts at `1`.
// NOTE(review): the code that advances it is not visible in this file.
#[cfg(feature = "unstable")]
pub(super) next_client_stream_id: std::sync::atomic::AtomicU32,
/// In-flight pings, keyed by an 8-byte array.
// NOTE(review): the key is presumably the opaque ping payload — confirm.
pub(super) pending_pings: Mutex<HashMap<[u8; 8], PendingPing>>,
/// Queue of 8-byte values associated with pending pings.
// NOTE(review): presumably payloads still waiting to be written out — confirm.
pub(super) pending_ping_outbound: Mutex<VecDeque<[u8; 8]>>,
}
impl H2Connection {
pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
let swansong = context.swansong().child();
Arc::new(Self {
context,
swansong,
outbound_waker: AtomicWaker::new(),
streams: Mutex::new(HashMap::new()),
peer_settings: Mutex::new(H2Settings::default()),
peer_settings_received: AtomicBool::new(false),
peer_settings_event: Event::new(),
#[cfg(feature = "unstable")]
next_client_stream_id: std::sync::atomic::AtomicU32::new(1),
pending_pings: Mutex::new(HashMap::new()),
pending_ping_outbound: Mutex::new(VecDeque::new()),
})
}
/// Returns another handle to the [`HttpContext`] this connection was built
/// with. Only the `Arc` is cloned, not the context itself.
pub fn context(&self) -> Arc<HttpContext> {
    Arc::clone(&self.context)
}
pub fn swansong(&self) -> &Swansong {
&self.swansong
}
/// Calls `shut_down` on this connection's [`Swansong`] and returns the
/// resulting [`ShutdownCompletion`].
pub fn shut_down(&self) -> ShutdownCompletion {
    self.swansong().shut_down()
}
/// Reports whether another stream may be opened on this connection right now.
///
/// Returns `false` when any of the following holds:
///
/// * the connection's swansong is no longer running,
/// * the next client stream id has reached `2^31`,
/// * the number of streams that are not yet finished in both directions is
///   at or above the peer's effective concurrent-stream limit.
///
/// The answer is a snapshot: the streams lock and the peer-settings lock are
/// each taken and released separately, never held together.
#[cfg(feature = "unstable")]
pub fn can_open_stream(&self) -> bool {
    // Ids at or beyond this value are treated as exhausted.
    const STREAM_ID_LIMIT: u32 = 1 << 31;

    let accepting = self.swansong.state().is_running()
        && self.next_client_stream_id.load(Ordering::Relaxed) < STREAM_ID_LIMIT;
    if !accepting {
        return false;
    }

    // A stream counts as in flight until its send side is completed AND its
    // receive side has hit EOF. The streams guard lives for the whole loop.
    let mut open_streams: usize = 0;
    for stream in self.streams_lock().values() {
        let finished =
            stream.send.completed.load(Ordering::Acquire) && stream.recv.eof.load(Ordering::Acquire);
        if !finished {
            open_streams += 1;
        }
    }
    // Saturate rather than truncate if the count does not fit in a u32.
    let open_streams = u32::try_from(open_streams).unwrap_or(u32::MAX);

    let limit = self
        .current_peer_settings()
        .effective_max_concurrent_streams();
    open_streams < limit
}
pub(super) fn outbound_waker(&self) -> &AtomicWaker {
&self.outbound_waker
}
/// Locks the stream table and returns the guard.
///
/// # Panics
///
/// Panics if the streams mutex is poisoned.
pub(super) fn streams_lock(&self) -> MutexGuard<'_, HashMap<u32, Arc<StreamState>>> {
    let locked = self.streams.lock();
    locked.expect("connection streams mutex poisoned")
}
/// Locks the peer settings and returns the guard.
///
/// # Panics
///
/// Panics if the peer-settings mutex is poisoned.
pub(super) fn current_peer_settings(&self) -> MutexGuard<'_, H2Settings> {
    let locked = self.peer_settings.lock();
    locked.expect("peer_settings mutex poisoned")
}
/// Flags the stream with id `stream_id` as pending release and in need of
/// servicing, then wakes the outbound waker.
///
/// Does nothing when no stream with that id is in the stream table.
pub(crate) fn release_stream(&self, stream_id: u32) {
    // The streams guard is a temporary of this statement, so the table is
    // already unlocked by the time the flags are written below.
    let Some(stream) = self.streams_lock().get(&stream_id).cloned() else {
        return;
    };
    stream.pending_release.store(true, Ordering::Release);
    stream.needs_servicing.store(true, Ordering::Release);
    self.outbound_waker.wake();
}
pub(crate) fn stream_error(&self, stream_id: u32, code: super::H2ErrorCode) {
let Some(stream) = self.streams_lock().get(&stream_id).cloned() else {
return;
};
let mut slot = stream
.pending_reset
.lock()
.expect("pending_reset mutex poisoned");
if slot.is_none() {
*slot = Some(code);
drop(slot);
stream.needs_servicing.store(true, Ordering::Release);
self.outbound_waker.wake();
}
}
/// Consumes this handle and builds an [`H2Driver`] over `transport` with the
/// server role.
pub fn run<T: AsyncRead + AsyncWrite + Unpin + Send>(self: Arc<Self>, transport: T) -> H2Driver<T> {
    let role = Role::Server;
    H2Driver::new(self, transport, role)
}
/// Consumes this handle, builds an [`H2Driver`] over `transport` with the
/// client role, and wraps it in an [`H2Initiator`].
#[cfg(feature = "unstable")]
pub fn run_client<T: AsyncRead + AsyncWrite + Unpin + Send>(
    self: Arc<Self>,
    transport: T,
) -> H2Initiator<T> {
    let driver = H2Driver::new(self, transport, Role::Client);
    H2Initiator::new(driver)
}
/// Passes `conn` to `handler`, awaits the [`Conn`] it produces, then calls
/// `send_h2` on that conn and returns the outcome.
///
/// # Errors
///
/// Returns whatever [`io::Error`] `send_h2` reports.
pub async fn process_inbound<Transport, Handler, Fut>(
    conn: Conn<Transport>,
    handler: Handler,
) -> io::Result<Conn<Transport>>
where
    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
    Handler: FnOnce(Conn<Transport>) -> Fut,
    Fut: Future<Output = Conn<Transport>>,
{
    let handled = handler(conn).await;
    handled.send_h2().await
}
}