use crate::Result;
use flume::{Receiver, Sender};
use std::{
sync::Arc,
task::{Poll, Wake, Waker},
};
use tracing::{error, trace};
pub(crate) struct SocketState {
readable: bool,
writable: bool,
events: Receiver<SocketEvent>,
handle: SocketStateHandle,
}
impl Default for SocketState {
fn default() -> Self {
let (sender, receiver) = flume::unbounded();
Self {
readable: true,
writable: true,
events: receiver,
handle: SocketStateHandle { sender },
}
}
}
#[derive(Clone)]
pub(crate) struct SocketStateHandle {
sender: Sender<SocketEvent>,
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum SocketEvent {
Readable,
Writable,
Wake,
}
pub(crate) struct SocketStateWaker {
handle: SocketStateHandle,
event: SocketEvent,
}
impl SocketState {
pub(crate) fn readable(&mut self) -> bool {
self.readable
}
pub(crate) fn writable(&mut self) -> bool {
self.writable
}
pub(crate) fn wait(&mut self) {
match self.events.recv() {
Ok(event) => self.handle_event(event),
Err(err) => error!(?err, "waiting for socket event failed"),
}
}
pub(crate) fn reset(&mut self) {
self.readable = true;
self.writable = true;
}
pub(crate) fn handle(&self) -> SocketStateHandle {
self.handle.clone()
}
pub(crate) fn handle_read_poll(&mut self, poll: Poll<usize>) -> Option<usize> {
match poll {
Poll::Ready(sz) => Some(sz),
Poll::Pending => {
self.readable = false;
None
}
}
}
pub(crate) fn handle_write_poll<T>(&mut self, poll: Poll<T>) -> Option<T> {
match poll {
Poll::Ready(sz) => Some(sz),
Poll::Pending => {
self.writable = false;
None
}
}
}
pub(crate) fn handle_io_result(&mut self, result: Result<()>) -> Result<()> {
if let Err(err) = result {
if err.interrupted() {
self.handle.wake();
} else if err.wouldblock() {
} else {
if err.is_io_error() {
self.readable = false;
self.writable = false;
}
return Err(err);
}
}
Ok(())
}
pub(crate) fn poll_events(&mut self) {
while let Ok(event) = self.events.try_recv() {
self.handle_event(event);
}
}
fn handle_event(&mut self, event: SocketEvent) {
trace!(?event, "Got event for socket");
match event {
SocketEvent::Readable => self.readable = true,
SocketEvent::Writable => self.writable = true,
SocketEvent::Wake => {}
}
}
pub(crate) fn readable_waker(&self) -> Waker {
self.waker(SocketEvent::Readable)
}
pub(crate) fn writable_waker(&self) -> Waker {
self.waker(SocketEvent::Writable)
}
fn waker(&self, event: SocketEvent) -> Waker {
let handle = self.handle();
let waker = SocketStateWaker { handle, event };
Waker::from(Arc::new(waker))
}
}
impl SocketStateHandle {
pub(crate) fn send(&self, event: SocketEvent) {
let _ = self.sender.send(event);
}
pub(crate) fn wake(&self) {
self.send(SocketEvent::Wake);
}
}
impl Wake for SocketStateWaker {
fn wake(self: Arc<Self>) {
self.handle.send(self.event)
}
fn wake_by_ref(self: &Arc<Self>) {
self.handle.send(self.event)
}
}