use super::{H2Driver, Role, StreamEntry, send::SendCursor};
use crate::h2::{H2ErrorCode, transport::StreamState};
use futures_lite::io::{AsyncRead, AsyncWrite};
use std::{
io,
sync::{Arc, atomic::Ordering},
};
impl<T> H2Driver<T>
where
T: AsyncRead + AsyncWrite + Unpin + Send,
{
pub(super) fn service_handler_signals(&mut self) {
self.pick_up_new_client_streams();
for opaque in self.connection.drain_pending_ping_outbound() {
self.queue_active_ping(opaque);
}
let mut stream_updates: Vec<(u32, u32)> = Vec::new();
let mut connection_credit: u64 = 0;
let mut resets: Vec<(u32, H2ErrorCode)> = Vec::new();
let mut releases: Vec<u32> = Vec::new();
let max_stream_recv_window = self.config.max_stream_recv_window_size();
for (&id, entry) in &mut self.streams {
if !entry.shared.needs_servicing.swap(false, Ordering::AcqRel) {
continue;
}
if entry.peer_recv_window <= 0 && entry.shared.recv.is_reading.load(Ordering::Acquire) {
stream_updates.push((id, max_stream_recv_window));
entry.peer_recv_window += i64::from(max_stream_recv_window);
}
let consumed = entry.shared.recv.bytes_consumed.swap(0, Ordering::AcqRel);
if consumed > 0 {
let credit = u32::try_from(consumed).unwrap_or(u32::MAX);
stream_updates.push((id, credit));
entry.peer_recv_window += i64::from(credit);
connection_credit = connection_credit.saturating_add(u64::from(credit));
}
if entry.send.is_none() {
let submission = entry
.shared
.send
.submission
.lock()
.expect("send submission mutex poisoned")
.take();
if let Some(submission) = submission {
log::trace!("h2 stream {id}: driver picked up submission");
entry.send = Some(SendCursor::new(submission, &mut self.hpack_encoder));
}
}
if let Some(code) = entry
.shared
.pending_reset
.lock()
.expect("pending_reset mutex poisoned")
.take()
{
resets.push((id, code));
}
if entry.shared.pending_release.swap(false, Ordering::AcqRel) {
releases.push(id);
}
}
for (stream_id, increment) in stream_updates {
self.queue_window_update(stream_id, increment);
}
if connection_credit > 0 {
let credit = u32::try_from(connection_credit).unwrap_or(u32::MAX);
self.queue_window_update(0, credit);
self.connection_recv_window += i64::from(credit);
}
for (stream_id, code) in resets {
log::debug!("h2 stream {stream_id}: conn-task-requested RST_STREAM({code:?})");
self.queue_rst_stream(stream_id, code);
self.complete_and_remove_stream(
stream_id,
Err(io::Error::other(format!(
"stream reset requested by conn task: {code:?}"
))),
);
}
for stream_id in releases {
log::trace!("h2 stream {stream_id}: application released held stream — removing");
self.remove_from_stream_maps(stream_id);
}
}
pub(super) fn has_pending_handler_signals(&self) -> bool {
if self.role == Role::Client {
let shared = self.connection.streams_lock();
if shared.keys().any(|id| !self.streams.contains_key(id)) {
return true;
}
}
self.streams
.values()
.any(|e| e.shared.needs_servicing.load(Ordering::Acquire))
}
fn pick_up_new_client_streams(&mut self) {
if self.role != Role::Client {
return;
}
let new_streams: Vec<(u32, Arc<StreamState>)> = {
let shared = self.connection.streams_lock();
shared
.iter()
.filter(|(id, _)| !self.streams.contains_key(*id))
.map(|(&id, s)| (id, Arc::clone(s)))
.collect()
};
if new_streams.is_empty() {
return;
}
let send_window = i64::from(
self.connection
.current_peer_settings()
.effective_initial_window_size(),
);
let peer_recv_window = i64::from(self.config.initial_stream_window_size());
for (id, shared) in new_streams {
log::trace!("h2 client: driver picked up new client-opened stream {id}");
self.streams
.insert(id, StreamEntry::new(shared, send_window, peer_recv_window));
}
}
}