use core::convert::Infallible;
use core::fmt::{self, Write};
use core::future::Future;
use core::num::NonZeroU16;
use core::pin::Pin;
use core::task::{Context, Poll};
use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::string::String;
use alloc::vec::Vec;
use bytes::Bytes;
use musli::alloc::Global;
use musli::mode::Binary;
use musli::reader::SliceReader;
use musli::storage;
use musli::{Decode, Encode};
use rand::prelude::*;
use rand::rngs::SmallRng;
use tokio::task::JoinSet;
use tokio::time::{Duration, Instant, Sleep};
use crate::Buf;
use crate::api::{
Broadcast, ChannelId, ErrorMessage, Event, Id, MessageId, RequestHeader, ResponseHeader,
};
use crate::buf::{BufPool, InvalidFrame};
/// Default value of the server's `max_capacity` setting: 2^20.
const MAX_CAPACITY: usize = 1 << 20;
/// Close code sent together with the "connection timed out" close message.
const CLOSE_NORMAL: u16 = 1_000;
/// Close code sent when the peer sends a text message or an undecodable
/// request header.
const CLOSE_PROTOCOL_ERROR: u16 = 1_002;
/// Delay after which the close timer fires; the timer is re-armed whenever a
/// pong matching the outstanding ping arrives.
const CLOSE_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay between pings sent to the peer.
const PING_TIMEOUT: Duration = Duration::from_secs(10);
/// Seed used for the ping payload generator unless overridden.
const DEFAULT_SEED: u64 = 0xdead_beef;
/// A message received from the underlying socket, as produced by
/// `SocketImpl::poll_next`.
#[derive(Debug)]
pub(crate) enum Message {
/// A text message. No payload is kept; the server answers these by closing
/// the connection with a protocol error.
Text,
/// A binary message carrying an encoded request.
Binary(Bytes),
/// A ping from the peer; the payload is echoed back in a pong.
Ping(Bytes),
/// A pong from the peer; the payload is compared with the last ping sent.
Pong(Bytes),
/// The peer asked to close the connection.
Close,
}
/// Holds the sealing trait required by `SocketImpl`.
pub(crate) mod socket_sealed {
/// Marker supertrait; a type must implement it before it can implement
/// `SocketImpl`.
pub trait Sealed {}
}
/// Abstraction over the socket driven by the server: a source of incoming
/// [`Message`]s and a sink for outgoing `Self::Message`s, both poll-based.
pub(crate) trait SocketImpl
where
Self: self::socket_sealed::Sealed,
{
/// The outgoing message type accepted by `start_send`.
#[doc(hidden)]
type Message;
/// The error type reported by every socket operation.
#[doc(hidden)]
type Error: fmt::Debug;
/// Poll for the next incoming message. `None` signals that no more
/// messages will arrive.
#[doc(hidden)]
fn poll_next(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Option<Result<Message, Self::Error>>>;
/// Poll until the socket can accept a message through `start_send`.
#[doc(hidden)]
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Hand one message to the socket. The server only calls this after
/// `poll_ready` has reported readiness.
#[doc(hidden)]
fn start_send(self: Pin<&mut Self>, item: Self::Message) -> Result<(), Self::Error>;
/// Poll until previously started sends have been flushed.
#[doc(hidden)]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
/// Holds the sealing trait required by `ServerImpl`.
pub(crate) mod server_sealed {
/// Marker supertrait; a type must implement it before it can implement
/// `ServerImpl`.
pub trait Sealed {}
}
/// Describes a concrete websocket backend: its socket type and how to build
/// each kind of outgoing message for it.
///
/// The trait is sealed through `server_sealed::Sealed`.
pub trait ServerImpl
where
Self: self::server_sealed::Sealed,
{
/// The error type of the backend's socket.
#[doc(hidden)]
type Error: fmt::Debug;
/// The outgoing message type of the backend's socket.
#[doc(hidden)]
type Message;
/// The socket type, sharing this backend's message and error types.
#[doc(hidden)]
#[allow(private_bounds)]
type Socket: SocketImpl<Message = Self::Message, Error = Self::Error>;
/// Build a ping message with the given payload.
#[doc(hidden)]
fn ping(data: Bytes) -> Self::Message;
/// Build a pong message with the given payload.
#[doc(hidden)]
fn pong(data: Bytes) -> Self::Message;
/// Build a binary message from the given frame bytes.
#[doc(hidden)]
fn binary(data: &[u8]) -> Self::Message;
/// Build a close message with the given close code and reason.
#[doc(hidden)]
fn close(code: u16, reason: &str) -> Self::Message;
}
/// The private cause behind an [`Error`].
#[derive(Debug)]
enum ErrorKind {
/// An error raised by axum-core (only with the `axum-core05` feature).
#[cfg(feature = "axum-core05")]
AxumCore05 {
error: axum_core05::Error,
},
/// Writing an error message into the scratch string failed.
FormatError,
/// A frame read from an outgoing buffer was invalid.
InvalidFrame {
error: InvalidFrame,
},
/// A handler failed to decode part of an incoming request.
Incoming {
error: storage::Error,
},
/// A handler failed to encode its response.
Outgoing {
error: storage::Error,
},
/// Encoding the header of a broadcast (or of the hello message) failed.
EncodeBroadcastHeader {
error: storage::Error,
},
/// Encoding the body of a broadcast failed.
EncodeBroadcast {
error: storage::Error,
},
/// Encoding the response header for a connect request failed.
EncodeConnectHeader {
error: storage::Error,
},
/// Encoding the header of an error response failed.
ErrorMessageHeader {
error: storage::Error,
},
/// Encoding the body of an error response failed.
ErrorMessage {
error: storage::Error,
},
/// The request body offset lies outside of the received message.
OutOfBounds {
offset: usize,
len: usize,
},
}
/// The error type produced by the websocket server.
///
/// The concrete cause is kept private in `kind`; it is exposed through the
/// `Display` and `core::error::Error` implementations.
#[derive(Debug)]
pub struct Error {
kind: ErrorKind,
}
impl Error {
#[inline]
const fn new(kind: ErrorKind) -> Self {
Self { kind }
}
pub(crate) fn incoming(error: storage::Error) -> Self {
Self::new(ErrorKind::Incoming { error })
}
pub(crate) fn outgoing(error: storage::Error) -> Self {
Self::new(ErrorKind::Outgoing { error })
}
pub(crate) fn encode_broadcast_header(error: storage::Error) -> Self {
Self::new(ErrorKind::EncodeBroadcastHeader { error })
}
pub(crate) fn encode_broadcast(error: storage::Error) -> Self {
Self::new(ErrorKind::EncodeBroadcast { error })
}
pub(crate) fn encode_connect_header(error: storage::Error) -> Self {
Self::new(ErrorKind::EncodeConnectHeader { error })
}
pub(crate) fn encode_error_message_header(error: storage::Error) -> Self {
Self::new(ErrorKind::ErrorMessageHeader { error })
}
pub(crate) fn encode_error_message(error: storage::Error) -> Self {
Self::new(ErrorKind::ErrorMessage { error })
}
}
impl fmt::Display for Error {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.kind {
#[cfg(feature = "axum-core05")]
ErrorKind::AxumCore05 { .. } => write!(f, "Error in axum-core"),
ErrorKind::FormatError => write!(f, "Error formatting error response"),
ErrorKind::InvalidFrame { error } => error.fmt(f),
ErrorKind::Incoming { .. } => {
write!(f, "Encoding error when decoding incoming message")
}
ErrorKind::Outgoing { .. } => {
write!(f, "Encoding error when encoding outgoing message")
}
ErrorKind::EncodeBroadcastHeader { .. } => {
write!(f, "Encoding error when encoding broadcast header")
}
ErrorKind::EncodeBroadcast { .. } => {
write!(f, "Encoding error when broadcasting message")
}
ErrorKind::EncodeConnectHeader { .. } => {
write!(f, "Encoding error when encoding connect header")
}
ErrorKind::ErrorMessageHeader { .. } => {
write!(f, "Encoding error when encoding error message header")
}
ErrorKind::ErrorMessage { .. } => {
write!(f, "Encoding error when encoding error message")
}
ErrorKind::OutOfBounds { offset, len } => {
write!(
f,
"Error when reading message: offset {} is out of bounds for length {}",
offset, len
)
}
}
}
}
impl core::error::Error for Error {
#[inline]
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
match &self.kind {
#[cfg(feature = "axum-core05")]
ErrorKind::AxumCore05 { error } => Some(error),
ErrorKind::Incoming { error } => Some(error),
ErrorKind::Outgoing { error } => Some(error),
ErrorKind::EncodeBroadcastHeader { error } => Some(error),
ErrorKind::EncodeBroadcast { error } => Some(error),
ErrorKind::EncodeConnectHeader { error } => Some(error),
ErrorKind::ErrorMessageHeader { error } => Some(error),
ErrorKind::ErrorMessage { error } => Some(error),
_ => None,
}
}
}
#[cfg(feature = "axum-core05")]
impl From<axum_core05::Error> for Error {
    /// Wrap an axum-core error so that it can be propagated with `?`.
    #[inline]
    fn from(error: axum_core05::Error) -> Self {
        Error {
            kind: ErrorKind::AxumCore05 { error },
        }
    }
}
impl From<ErrorKind> for Error {
#[inline]
fn from(kind: ErrorKind) -> Self {
Self::new(kind)
}
}
impl From<InvalidFrame> for Error {
#[inline]
fn from(error: InvalidFrame) -> Self {
Self::new(ErrorKind::InvalidFrame { error })
}
}
/// Module-local result alias whose error type defaults to this module's [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
/// The outcome of a handler invocation, produced through [`IntoResponse`].
pub struct Response {
// Whether the handler recognized the request. When false the server answers
// with a "No support for request" error.
handled: bool,
}
/// Conversion of a handler's return value into a [`Response`].
///
/// Implemented in this module for `()`, `bool`, `Result<T, E>` and `Option<T>`.
pub trait IntoResponse
where
Self: 'static + Send,
{
/// The error produced when the value represents a failure.
type Error;
/// Convert the value into a response or an error.
fn into_response(self) -> Result<Response, Self::Error>;
}
impl IntoResponse for () {
    type Error = Infallible;

    /// A unit return value always counts as a handled request.
    #[inline]
    fn into_response(self) -> Result<Response, Self::Error> {
        let response = Response { handled: true };
        Ok(response)
    }
}
impl IntoResponse for bool {
    type Error = Infallible;

    /// The boolean states directly whether the request was handled.
    #[inline]
    fn into_response(self) -> Result<Response, Self::Error> {
        let handled = self;
        Ok(Response { handled })
    }
}
impl<T, E> IntoResponse for Result<T, E>
where
    T: IntoResponse<Error = Infallible>,
    E: 'static + Send + fmt::Display,
{
    type Error = E;

    /// Forward `Err` unchanged and convert the `Ok` value, whose own
    /// conversion cannot fail.
    #[inline]
    fn into_response(self) -> Result<Response, E> {
        let value = self?;

        match value.into_response() {
            Ok(response) => Ok(response),
            // `Infallible` has no values, so this arm can never be reached.
            Err(never) => match never {},
        }
    }
}
impl<T> IntoResponse for Option<T>
where
    T: IntoResponse,
{
    type Error = T::Error;

    /// `None` means the request was not handled; `Some` defers to the inner
    /// value.
    #[inline]
    fn into_response(self) -> Result<Response, Self::Error> {
        let Some(inner) = self else {
            return Ok(Response { handled: false });
        };

        inner.into_response()
    }
}
/// User-supplied request handler driven by [`Server`].
///
/// The server clones the handler for every request and runs `handle` on the
/// clone inside a spawned task, hence the `'static + Send + Clone` bounds.
pub trait Handler
where
Self: 'static + Send + Clone,
{
/// The identifier type that requests are dispatched on.
type Id: Id;
/// The value returned by `handle`, convertible into a [`Response`].
type Response: IntoResponse;
/// Called when a connect request has been assigned `channel`.
///
/// The default implementation does nothing.
fn open_channel<'this>(
&'this self,
channel: ChannelId,
) -> impl Future<Output = ()> + Send + 'this {
async {
_ = channel;
}
}
/// Called when a disconnect request for `channel` is received.
///
/// The default implementation does nothing.
fn close_channel<'this>(
&'this self,
channel: ChannelId,
) -> impl Future<Output = ()> + Send + 'this {
async {
_ = channel;
}
}
/// Handle the request identified by `id`, reading its body from `incoming`
/// and writing the reply to `outgoing`.
fn handle<'this>(
&'this self,
id: Self::Id,
incoming: &'this mut Incoming<'_>,
outgoing: &'this mut Outgoing<'_>,
) -> impl Future<Output = Self::Response> + Send + 'this;
}
/// The parts of the server that are polled through pinned references: the
/// socket and the two timers. The server keeps this behind `Pin<Box<..>>`.
struct Pinned<S> {
// The underlying socket.
socket: S,
// Fires when the connection should be closed because of a timeout.
close_sleep: Sleep,
// Fires when the next ping should be sent.
ping_sleep: Sleep,
}
impl<S> Pinned<S> {
#[inline]
fn project(self: Pin<&mut Self>) -> (Pin<&mut Sleep>, Pin<&mut Sleep>, Pin<&mut S>) {
unsafe {
let this = self.get_unchecked_mut();
(
Pin::new_unchecked(&mut this.close_sleep),
Pin::new_unchecked(&mut this.ping_sleep),
Pin::new_unchecked(&mut this.socket),
)
}
}
}
/// What a spawned handler task yields: the handler's result, the header of
/// the request it served, and the buffer its response was written into.
type HandlerOutput<H> = (Result<<H as Handler>::Response, Error>, RequestHeader, Buf);
/// A websocket server session that multiplexes socket I/O, keep-alive pings
/// and concurrently running request handlers for one connection.
pub struct Server<S, H>
where
S: ServerImpl,
H: Handler,
{
// The user handler; cloned for every spawned request.
handler: H,
// Whether the hello message has already been queued by `run`.
started: bool,
// Set once the connection is shutting down; `run` returns when both
// outgoing queues have drained.
closing: bool,
// Pool that buffers are taken from and returned to.
pool: BufPool,
// Buffers holding encoded frames waiting to be sent as binary messages.
outbound: VecDeque<Buf>,
// Scratch string holding the most recently formatted error message.
error: String,
// Payload of the ping that is still waiting for its pong, if any.
last_ping: Option<[u8; 4]>,
// Generator for ping payloads.
rng: SmallRng,
// Configured capacity limit. NOTE(review): set by the builder methods but
// not read anywhere in the code visible here.
max_capacity: usize,
// Ready-made messages (close, ping, pong) sent ahead of `outbound`.
out: VecDeque<S::Message>,
// True once the socket has reported that it can accept a message.
socket_send: bool,
// True while a started send still has to be flushed.
socket_flush: bool,
// The socket and timers, pinned on the heap.
pinned: Pin<Box<Pinned<S::Socket>>>,
// Allocator for channel identifiers.
channels: Channels,
// Handler tasks currently in flight.
set: JoinSet<HandlerOutput<H>>,
}
impl<S, H> Server<S, H>
where
    S: ServerImpl,
    H: Handler,
{
    /// Build a server around `socket` that dispatches requests to `handler`.
    ///
    /// The close and ping timers start counting from the moment of
    /// construction.
    #[inline]
    pub(crate) fn new(socket: S::Socket, handler: H) -> Self {
        let created = Instant::now();

        let pinned = Box::pin(Pinned {
            socket,
            close_sleep: tokio::time::sleep_until(created + CLOSE_TIMEOUT),
            ping_sleep: tokio::time::sleep_until(created + PING_TIMEOUT),
        });

        Self {
            handler,
            started: false,
            closing: false,
            pool: BufPool::default(),
            outbound: VecDeque::new(),
            error: String::new(),
            last_ping: None,
            rng: SmallRng::seed_from_u64(DEFAULT_SEED),
            max_capacity: MAX_CAPACITY,
            out: VecDeque::new(),
            socket_send: false,
            socket_flush: false,
            pinned,
            channels: Channels::default(),
            set: JoinSet::new(),
        }
    }

    /// Access the handler associated with the server.
    #[inline]
    pub fn handler(&self) -> &H {
        &self.handler
    }

    /// Set the maximum capacity and return the server.
    ///
    /// Equivalent to [`Server::with_max_capacity`].
    #[inline]
    pub fn max_capacity(self, max_capacity: usize) -> Self {
        self.with_max_capacity(max_capacity)
    }

    /// Set the maximum capacity and return the server.
    #[inline]
    pub fn with_max_capacity(self, max_capacity: usize) -> Self {
        let mut server = self;
        server.max_capacity = max_capacity;
        server
    }
}
impl<S, H> Server<S, H>
where
    S: ServerImpl,
    H: Handler,
{
    /// Replace the random number generator used for ping payloads with one
    /// seeded from `seed`, and return the server.
    #[inline]
    pub fn seed(self, seed: u64) -> Self {
        let mut server = self;
        server.rng = SmallRng::seed_from_u64(seed);
        server
    }
}
impl<S, H> Server<S, H>
where
S: ServerImpl,
Error: From<S::Error>,
H: Handler<Response: IntoResponse<Error: fmt::Display>>,
{
pub async fn run(&mut self) -> Result<(), Error> {
if !self.started {
self.started = true;
self.hello()?;
}
loop {
if self.closing && self.out.is_empty() && self.outbound.is_empty() {
break;
}
self.handle_send()?;
let result = {
let inner = Select::<S::Socket, H> {
pinned: self.pinned.as_mut(),
wants_socket_send: !self.socket_send,
wants_socket_flush: self.socket_flush,
set: &mut self.set,
};
inner.await
};
match result {
Output::Close => {
self.out
.push_back(S::close(CLOSE_NORMAL, "connection timed out"));
self.closing = true;
}
Output::Ping => {
self.handle_ping()?;
}
Output::Recv(message) => {
let Some(message) = message else {
self.closing = true;
continue;
};
match message? {
Message::Text => {
self.out.push_back(S::close(
CLOSE_PROTOCOL_ERROR,
"Unsupported text message",
));
self.closing = true;
}
Message::Binary(bytes) => {
self.handle_message(bytes).await?;
}
Message::Ping(payload) => {
self.out.push_back(S::pong(payload));
}
Message::Pong(data) => {
self.handle_pong(data)?;
}
Message::Close => {
self.closing = true;
}
}
}
Output::Send(result) => {
if let Err(err) = result {
return Err(Error::from(err));
};
self.socket_send = true;
}
Output::Flushed(result) => {
if let Err(err) = result {
return Err(Error::from(err));
};
self.socket_flush = false;
}
Output::Handle(result, header, buf) => {
let err = 'err: {
let res = match result {
Ok(res) => res,
Err(error) => {
self.format_error(error)?;
break 'err true;
}
};
let res = match res.into_response() {
Ok(res) => res,
Err(error) => {
self.format_error_message(format_args!(
"Error in handler: {error:#}"
))?;
break 'err true;
}
};
if !res.handled {
self.format_error_message(format_args!(
"No support for request {}",
header.id
))?;
break 'err true;
}
self.outbound.push_back(buf);
false
};
if err {
self.send_error(&header)?;
}
}
}
}
Ok(())
}
/// Queue `message` as a broadcast that is not tied to any channel.
///
/// # Errors
///
/// Returns an error if the broadcast cannot be encoded.
pub fn broadcast<T>(&mut self, message: T) -> Result<(), Error>
where
    T: Event,
{
    let channel = ChannelId::NONE;
    self.broadcast_in(message, channel)
}
pub fn broadcast_in<T>(&mut self, message: T, channel: ChannelId) -> Result<(), Error>
where
T: Event,
{
tracing::debug!(id = ?<T::Broadcast as Broadcast>::ID, "Broadcast");
let buf = self.pool.with(|buf| {
let mut writer = buf.writer();
writer
.write(ResponseHeader {
serial: 0,
broadcast: <T::Broadcast as Broadcast>::ID.get(),
error: 0,
channel,
})
.map_err(Error::encode_broadcast_header)?;
writer.write(message).map_err(Error::encode_broadcast)?;
writer.flush();
Ok::<_, Error>(())
})?;
self.outbound.push_back(buf);
Ok(())
}
/// Queue the server hello message, sent as a broadcast with the
/// `SERVER_HELLO` id and no channel.
///
/// # Errors
///
/// Returns an error if the header cannot be encoded. The buffer is returned
/// to the pool in that case and nothing is queued.
fn hello(&mut self) -> Result<(), Error> {
    tracing::debug!("Hello");

    let mut buf = self.pool.get();

    let result = (|| {
        let mut writer = buf.writer();

        writer
            .write(ResponseHeader {
                serial: 0,
                broadcast: MessageId::SERVER_HELLO.get(),
                error: 0,
                channel: ChannelId::NONE,
            })
            .map_err(Error::encode_broadcast_header)?;

        writer.flush();
        Ok::<_, Error>(())
    })();

    if result.is_err() {
        self.pool.put(buf);
    } else {
        self.outbound.push_back(buf);
    }

    // Report the encoding failure to the caller instead of dropping it.
    result
}
fn format_error_message(&mut self, error: impl fmt::Display) -> Result<(), Error> {
self.error.clear();
if write!(self.error, "{error}").is_err() {
self.error.clear();
return Err(Error::new(ErrorKind::FormatError));
}
Ok(())
}
fn format_error(&mut self, error: impl core::error::Error) -> Result<(), Error> {
self.error.clear();
if write!(self.error, "{error:#}").is_err() {
self.error.clear();
return Err(Error::new(ErrorKind::FormatError));
}
Ok(())
}
/// Handle one binary message received from the peer.
///
/// The request header is decoded first. An undecodable header closes the
/// connection with a protocol error. Connect and disconnect requests are
/// served inline; every other known message id is handed to
/// `handle_request`, which spawns the user handler.
///
/// # Errors
///
/// Returns an error if an error message cannot be formatted or a response
/// cannot be encoded.
#[tracing::instrument(skip(self, bytes))]
async fn handle_message(&mut self, bytes: Bytes) -> Result<(), Error> {
let mut reader = SliceReader::new(&bytes);
let header: RequestHeader = match storage::decode(&mut reader) {
Ok(header) => header,
Err(error) => {
tracing::debug!(?error, "Invalid request header");
self.out
.push_back(S::close(CLOSE_PROTOCOL_ERROR, "Invalid request header"));
self.closing = true;
return Ok(());
}
};
// Evaluates to `true` when an error message has been formatted into
// `self.error` and has to be sent back to the peer.
let err = 'err: {
let Some(id) = MessageId::new(header.id) else {
self.format_error_message(format_args!("Unsupported message id {}", header.id))?;
break 'err true;
};
match id {
MessageId::CONNECT => {
let Some(channel) = self.channels.next() else {
self.format_error_message(format_args!(
"Failed to allocate connection ID"
))?;
break 'err true;
};
self.handler.open_channel(channel).await;
let mut buf = self.pool.get();
// Encode the reply carrying the newly allocated channel id.
let result = (|| {
let mut writer = buf.writer();
let result = writer.write(ResponseHeader {
serial: header.serial,
broadcast: 0,
error: 0,
channel,
});
result.map_err(Error::encode_connect_header)?;
writer.flush();
Ok::<_, Error>(())
})();
// On failure the buffer goes back to the pool; on success it is queued.
if result.is_err() {
self.pool.put(buf);
} else {
self.outbound.push_back(buf);
}
result?;
break 'err false;
}
MessageId::DISCONNECT => {
// No reply is queued for a disconnect request.
self.channels.free(header.channel);
self.handler.close_channel(header.channel).await;
break 'err false;
}
_ => {
let id = <H::Id as Id>::from_id(id);
// Number of bytes the header decoding consumed, i.e. where the body starts.
let offset = bytes.len() - reader.remaining();
self.handle_request(bytes, offset, header, id);
return Ok(());
}
}
};
if err {
self.send_error(&header)?;
}
Ok(())
}
fn send_error(&mut self, header: &RequestHeader) -> Result<(), Error> {
let buf = self.pool.with(|buf| {
let mut writer = buf.writer();
let result = writer.write(ResponseHeader {
serial: header.serial,
broadcast: 0,
error: MessageId::ERROR_MESSAGE.get(),
channel: header.channel,
});
result.map_err(Error::encode_error_message_header)?;
let result = writer.write(ErrorMessage {
message: &self.error,
});
result.map_err(Error::encode_error_message)?;
writer.flush();
Ok::<_, Error>(())
})?;
self.outbound.push_back(buf);
Ok(())
}
/// Queue a ping carrying a fresh random four-byte payload, remember that
/// payload for matching the pong, and re-arm the ping timer.
#[tracing::instrument(skip(self))]
fn handle_ping(&mut self) -> Result<(), Error> {
    let payload = self.rng.random::<u32>().to_ne_bytes();
    self.last_ping = Some(payload);

    tracing::debug!(data = ?&payload[..], "Sending ping");

    let message = S::ping(Bytes::from_owner(Vec::from(payload)));
    self.out.push_back(message);

    let (_, ping_sleep, _) = self.pinned.as_mut().project();
    ping_sleep.reset(Instant::now() + PING_TIMEOUT);
    Ok(())
}
/// Process a pong from the peer.
///
/// A pong is only honoured when a ping is outstanding and the payloads match.
/// In that case both the close and the ping timer are re-armed and the
/// outstanding ping is cleared. Any other pong is ignored.
#[tracing::instrument(skip(self, payload))]
fn handle_pong(&mut self, payload: Bytes) -> Result<(), Error> {
    tracing::debug!(payload = ?&payload[..], "Pong");

    let Some(expected) = self.last_ping else {
        tracing::debug!("No ping sent");
        return Ok(());
    };

    if expected[..] != payload[..] {
        tracing::debug!(?expected, ?payload, "Pong doesn't match");
        return Ok(());
    }

    let received = Instant::now();
    let (close_sleep, ping_sleep, _) = self.pinned.as_mut().project();
    close_sleep.reset(received + CLOSE_TIMEOUT);
    ping_sleep.reset(received + PING_TIMEOUT);

    self.last_ping = None;
    Ok(())
}
/// Hand at most one queued message to the socket, provided the socket has
/// reported that it is ready.
///
/// Messages in `out` take priority over frames in `outbound`. Outbound
/// buffers with no frames left are returned to the pool along the way.
///
/// # Errors
///
/// Returns an error if the socket rejects the message or a buffer yields an
/// invalid frame.
#[tracing::instrument(skip(self))]
fn handle_send(&mut self) -> Result<(), Error> {
    // Nothing may be sent until the socket has reported readiness.
    if !self.socket_send {
        return Ok(());
    }

    let (_, _, mut socket) = self.pinned.as_mut().project();

    if let Some(message) = self.out.pop_front() {
        socket.as_mut().start_send(message)?;
        self.socket_flush = true;
        self.socket_send = false;
        return Ok(());
    }

    while let Some(buf) = self.outbound.front_mut() {
        let Some(frame) = buf.read()? else {
            // The front buffer is exhausted; recycle it and try the next one.
            if let Some(spent) = self.outbound.pop_front() {
                self.pool.put(spent);
            }

            continue;
        };

        socket.as_mut().start_send(S::binary(frame))?;
        self.socket_flush = true;
        self.socket_send = false;
        break;
    }

    Ok(())
}
fn handle_request(&mut self, bytes: Bytes, offset: usize, header: RequestHeader, id: H::Id) {
tracing::debug!(header.serial, ?id, "Got request");
let mut buf = self.pool.get();
let handler = self.handler.clone();
self.set.spawn(async move {
let Some(bytes) = bytes.get(offset..) else {
let kind = ErrorKind::OutOfBounds {
offset,
len: bytes.len(),
};
return (Err(Error::new(kind)), header, buf);
};
let reader = SliceReader::new(bytes);
let mut incoming = Incoming {
error: None,
reader,
channel: header.channel,
};
let mut outgoing = Outgoing {
serial: Some(header.serial),
error: None,
buf: &mut buf,
channel: header.channel,
};
let response = handler.handle(id, &mut incoming, &mut outgoing).await;
if let Some(error) = incoming.error.take() {
return (Err(Error::incoming(error)), header, buf);
}
if let Some(error) = outgoing.error.take() {
return (Err(Error::outgoing(error)), header, buf);
}
(Ok(response), header, buf)
});
}
}
/// The event produced by one poll of [`Select`].
enum Output<E, R> {
/// The close timer fired, or a handler task could not be joined.
Close,
/// The ping timer fired.
Ping,
/// The socket produced a message, an error, or `None` at end of stream.
Recv(Option<Result<Message, E>>),
/// The socket finished its readiness check for sending.
Send(Result<(), E>),
/// The socket finished flushing.
Flushed(Result<(), E>),
/// A handler task completed with the given result, request header and buffer.
Handle(Result<R, Error>, RequestHeader, Buf),
}
/// A future that waits for the next event on a connection: a timer, the
/// socket, or a finished handler task.
struct Select<'a, S, H>
where
H: Handler,
{
// The pinned socket and timers.
pinned: Pin<&'a mut Pinned<S>>,
// Whether to poll the socket for send readiness.
wants_socket_send: bool,
// Whether to poll the socket for flush completion.
wants_socket_flush: bool,
// The in-flight handler tasks.
set: &'a mut JoinSet<HandlerOutput<H>>,
}
impl<S, H> Future for Select<'_, S, H>
where
    S: SocketImpl,
    H: Handler,
{
    type Output = Output<S::Error, H::Response>;

    /// Poll the event sources in fixed priority order and return the first
    /// one that is ready: close timer, ping timer, incoming message, send
    /// readiness, flush completion, finished handler task.
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Select` only holds references and flags, so it is `Unpin` and can
        // be accessed mutably without any unsafe projection.
        let this = self.get_mut();
        let (close, ping, mut socket) = this.pinned.as_mut().project();

        if close.poll(cx).is_ready() {
            return Poll::Ready(Output::Close);
        }

        if ping.poll(cx).is_ready() {
            return Poll::Ready(Output::Ping);
        }

        if let Poll::Ready(message) = socket.as_mut().poll_next(cx) {
            return Poll::Ready(Output::Recv(message));
        }

        if this.wants_socket_send {
            if let Poll::Ready(result) = socket.as_mut().poll_ready(cx) {
                return Poll::Ready(Output::Send(result));
            }
        }

        if this.wants_socket_flush {
            if let Poll::Ready(result) = socket.as_mut().poll_flush(cx) {
                return Poll::Ready(Output::Flushed(result));
            }
        }

        match this.set.poll_join_next(cx) {
            Poll::Ready(Some(Ok((result, header, buf)))) => {
                Poll::Ready(Output::Handle(result, header, buf))
            }
            Poll::Ready(Some(Err(error))) => {
                tracing::debug!(?error, "Join error in handler task");
                Poll::Ready(Output::Close)
            }
            // An empty join set is not an event in itself.
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }
}
/// The body of an incoming request, handed to [`Handler::handle`].
pub struct Incoming<'de> {
// The first decoding error hit by `read`, if any; inspected by the server
// after the handler has returned.
error: Option<storage::Error>,
// Reader positioned at the start of the request body.
reader: SliceReader<'de>,
// The channel the request was sent on.
channel: ChannelId,
}
impl<'de> Incoming<'de> {
    /// The channel the request was sent on.
    pub fn channel(&self) -> ChannelId {
        self.channel
    }

    /// Decode the next value from the request body.
    ///
    /// Returns `None` if decoding fails. The error is stored and later
    /// reported by the server once the handler has returned.
    #[inline]
    pub fn read<T>(&mut self) -> Option<T>
    where
        T: Decode<'de, Binary, Global>,
    {
        storage::decode(&mut self.reader)
            .map_err(|error| self.error = Some(error))
            .ok()
    }
}
/// The response side of a request, handed to [`Handler::handle`].
pub struct Outgoing<'a> {
// Serial of the request being answered; taken by the first `write`, so any
// later write is a no-op.
serial: Option<u32>,
// The encoding error hit by `write`, if any; inspected by the server after
// the handler has returned.
error: Option<storage::Error>,
// The buffer the response is encoded into.
buf: &'a mut Buf,
// The channel the request was sent on.
channel: ChannelId,
}
impl Outgoing<'_> {
    /// Write `value` as the response to the request.
    ///
    /// Only the first call has any effect; later calls return immediately.
    /// Encoding errors are stored and reported by the server once the handler
    /// has returned.
    pub fn write<T>(&mut self, value: T)
    where
        T: Encode<Binary>,
    {
        let Some(serial) = self.serial.take() else {
            return;
        };

        let header = ResponseHeader {
            serial,
            broadcast: 0,
            error: 0,
            channel: self.channel,
        };

        let mut writer = self.buf.writer();

        if let Err(error) = writer.write(header) {
            self.error = Some(error);
            return;
        }

        // NOTE(review): unlike a header failure, a body failure still falls
        // through to `flush` below - confirm this is intended.
        match writer.write(value) {
            Ok(_) => {}
            Err(error) => self.error = Some(error),
        }

        writer.flush();
    }
}
/// Scramble a raw channel number into the value handed out to the peer.
///
/// The mapping is a bijection on `u16` that maps 0 to 0: a wrapping
/// multiplication by an odd constant followed by an xor with the upper byte.
/// `unscramble_channel` is its inverse.
#[inline]
fn scramble_channel(x: u16) -> u16 {
    let mixed = x.wrapping_mul(0x9285);
    (mixed >> 8) ^ mixed
}
/// Recover the raw channel number from a scrambled value.
///
/// Undoes the xor with the upper byte first, then multiplies by `0x964d`,
/// the multiplicative inverse of `0x9285` modulo 2^16.
#[inline]
fn unscramble_channel(x: u16) -> u16 {
    let unmixed = (x >> 8) ^ x;
    unmixed.wrapping_mul(0x964d)
}
/// Scrambling followed by unscrambling must be the identity for every
/// channel number, and zero must map to zero in both directions.
#[test]
fn test_scramble() {
    assert_eq!(scramble_channel(0), 0);
    assert_eq!(unscramble_channel(0), 0);

    (1..=u16::MAX).for_each(|i| {
        let unscrambled = unscramble_channel(scramble_channel(i));
        assert_eq!(i, unscrambled, "Failed to unscramble channel id");
    });
}
/// Allocator for channel identifiers.
#[derive(Default)]
struct Channels {
// The highest raw (unscrambled) channel number handed out so far; 0 means
// none has been allocated yet.
last: u16,
// Raw channel numbers that have been released and can be handed out again.
free: VecDeque<NonZeroU16>,
}
impl Channels {
    /// Allocate a channel id.
    ///
    /// Released ids are reused first; otherwise the next raw number is used.
    /// The id handed out is the scrambled form of the raw number. Returns
    /// `None` once all 65535 non-zero raw numbers are in use.
    fn next(&mut self) -> Option<ChannelId> {
        if let Some(id) = self.free.pop_front() {
            return Some(ChannelId::new(scramble_channel(id.get())));
        }

        // Wrapping to zero means that the id space is exhausted.
        let id = NonZeroU16::new(self.last.wrapping_add(1))?;
        self.last = id.get();
        tracing::debug!(?id, "Allocated channel id");
        Some(ChannelId::new(scramble_channel(id.get())))
    }

    /// Release a channel id so that it can be handed out again.
    ///
    /// Returns `true` if the id was released. Returns `false`, leaving the
    /// allocator untouched, if the id maps to raw number zero, was never
    /// allocated, or is already free. Ids arrive from the peer, so without
    /// these checks a repeated or made-up disconnect would put an id on the
    /// free list that `next` later hands out to two channels at once.
    fn free(&mut self, id: ChannelId) -> bool {
        tracing::debug!(?id, "Freeing channel id");

        let Some(id) = NonZeroU16::new(unscramble_channel(id.raw())) else {
            return false;
        };

        // Raw numbers are handed out sequentially starting at 1, so anything
        // above `last` has never been allocated.
        if id.get() > self.last || self.free.contains(&id) {
            return false;
        }

        self.free.push_back(id);
        true
    }
}