use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{Mutex as AsyncMutex, mpsc, watch};
use tokio::task::JoinSet;
use tracing::{debug, trace};
use crate::config::Config;
use crate::error::ErrorCode;
use crate::protocol::Frame;
use crate::stream::StreamInner;
use crate::util::id::{Role, StreamIdAllocator};
use super::manager::StreamRegistry;
pub(crate) struct SessionInner {
pub(crate) config: Arc<Config>,
pub(crate) role: Role,
pub(crate) id_alloc: StreamIdAllocator,
pub(crate) registry: StreamRegistry,
pub(crate) out_tx: mpsc::UnboundedSender<Frame>,
pub(crate) closer_tx: mpsc::UnboundedSender<u32>,
pub(crate) accept_tx: mpsc::UnboundedSender<Arc<StreamInner>>,
pub(crate) accept_rx: AsyncMutex<mpsc::UnboundedReceiver<Arc<StreamInner>>>,
pub(crate) shutdown_tx: watch::Sender<bool>,
pub(crate) is_closing: AtomicBool,
pub(crate) peer_gone: AtomicBool,
pub(crate) tasks: AsyncMutex<Option<JoinSet<()>>>,
}
impl SessionInner {
pub(crate) fn shutdown_rx(&self) -> watch::Receiver<bool> {
self.shutdown_tx.subscribe()
}
pub(crate) fn is_closed(&self) -> bool {
self.is_closing.load(Ordering::Acquire)
}
pub(crate) fn initiate_shutdown(&self, code: ErrorCode) {
if self.is_closing.swap(true, Ordering::AcqRel) {
return;
}
debug!(code = %code, "initiating session shutdown");
let _ = self.out_tx.send(Frame::go_away(code));
for stream in self.registry.drain() {
stream.force_close();
}
let _ = self.shutdown_tx.send(true);
}
pub(crate) fn note_peer_gone(&self, code: ErrorCode) {
if !self.peer_gone.swap(true, Ordering::AcqRel) {
trace!(code = %code, "peer GoAway observed");
}
}
}