use std::{cell::RefCell, task::Waker};
use crate::types::HttpCid;
/// Holds the waker, data-ready flag and paused flag for each of the two
/// directions ("upstream" and "downstream") of one WebSocket context.
///
/// All mutable state sits behind a [`RefCell`], so every method takes `&self`;
/// as a consequence the type is not `Sync` and borrow rules are checked at
/// run time rather than at compile time.
pub struct WebSocketReactor {
/// Identifier passed to [`WebSocketReactor::new`]. It is stored but not read
/// by any method in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
context_id: HttpCid,
/// Interior-mutable state shared by all `&self` methods.
state: RefCell<RawWebSocketReactor>,
}
/// Plain-data state of a [`WebSocketReactor`], kept inside its `RefCell`.
struct RawWebSocketReactor {
/// Waker stored by `register_upstream_waker` and taken (consumed) by `wake_upstream`.
upstream_waker: Option<Waker>,
/// Waker stored by `register_downstream_waker` and taken (consumed) by `wake_downstream`.
downstream_waker: Option<Waker>,
/// Set by `wake_upstream` / `set_upstream_data_ready`; reset to `false` when
/// `poll_upstream_data_ready` observes it as `true`.
upstream_data_ready: bool,
/// Set by `wake_downstream` / `set_downstream_data_ready`; reset to `false` when
/// `poll_downstream_data_ready` observes it as `true`.
downstream_data_ready: bool,
/// Upstream pause flag; initialised to `true` by `WebSocketReactor::new`.
upstream_paused: bool,
/// Downstream pause flag; initialised to `true` by `WebSocketReactor::new`.
downstream_paused: bool,
}
impl WebSocketReactor {
    /// Creates a reactor for `context_id`.
    ///
    /// The initial state has no wakers registered, no data flagged as ready,
    /// and both the upstream and downstream directions paused.
    pub fn new(context_id: HttpCid) -> Self {
        Self {
            context_id,
            state: RefCell::new(RawWebSocketReactor {
                upstream_waker: None,
                downstream_waker: None,
                upstream_data_ready: false,
                downstream_data_ready: false,
                upstream_paused: true,
                downstream_paused: true,
            }),
        }
    }

    /// Stores `waker` as the upstream waker, replacing any previous one.
    ///
    /// The stored waker is consumed by the next [`Self::wake_upstream`] call.
    pub fn register_upstream_waker(&self, waker: Waker) {
        #[cfg(feature = "debug-logs")]
        log::debug!("WebSocketReactor::register_upstream_waker");
        // Move the replaced waker out of the cell so that its destructor runs
        // only after the `RefCell` borrow has been released.
        let previous = self.state.borrow_mut().upstream_waker.replace(waker);
        drop(previous);
    }

    /// Stores `waker` as the downstream waker, replacing any previous one.
    ///
    /// The stored waker is consumed by the next [`Self::wake_downstream`] call.
    pub fn register_downstream_waker(&self, waker: Waker) {
        #[cfg(feature = "debug-logs")]
        log::debug!("WebSocketReactor::register_downstream_waker");
        // Move the replaced waker out of the cell so that its destructor runs
        // only after the `RefCell` borrow has been released.
        let previous = self.state.borrow_mut().downstream_waker.replace(waker);
        drop(previous);
    }

    /// Returns `true` if the upstream data-ready flag was set, clearing it in
    /// the same call; returns `false` (and changes nothing) otherwise.
    pub fn poll_upstream_data_ready(&self) -> bool {
        let mut state = self.state.borrow_mut();
        if state.upstream_data_ready {
            #[cfg(feature = "debug-logs")]
            log::debug!("WebSocketReactor::poll_upstream_data_ready: true, clearing flag");
            state.upstream_data_ready = false;
            true
        } else {
            #[cfg(feature = "debug-logs")]
            log::debug!("WebSocketReactor::poll_upstream_data_ready: false");
            false
        }
    }

    /// Overwrites the upstream data-ready flag without waking anything.
    pub fn set_upstream_data_ready(&self, ready: bool) {
        self.state.borrow_mut().upstream_data_ready = ready;
    }

    /// Returns `true` if the downstream data-ready flag was set, clearing it
    /// in the same call; returns `false` (and changes nothing) otherwise.
    pub fn poll_downstream_data_ready(&self) -> bool {
        let mut state = self.state.borrow_mut();
        if state.downstream_data_ready {
            #[cfg(feature = "debug-logs")]
            log::debug!("WebSocketReactor::poll_downstream_data_ready: true, clearing flag");
            state.downstream_data_ready = false;
            true
        } else {
            #[cfg(feature = "debug-logs")]
            log::debug!("WebSocketReactor::poll_downstream_data_ready: false");
            false
        }
    }

    /// Overwrites the downstream data-ready flag without waking anything.
    pub fn set_downstream_data_ready(&self, ready: bool) {
        self.state.borrow_mut().downstream_data_ready = ready;
    }

    /// Sets the upstream data-ready flag and wakes the registered upstream
    /// waker, if there is one. The waker is consumed: a later call only wakes
    /// again after a new waker has been registered.
    pub fn wake_upstream(&self) {
        #[cfg(feature = "debug-logs")]
        log::debug!("WebSocketReactor::wake_upstream: setting upstream_data_ready=true");
        // Update the state inside a scoped borrow and call `wake()` only after
        // it is released: a waker that re-enters this reactor synchronously
        // would otherwise hit an "already borrowed" panic.
        let waker = {
            let mut state = self.state.borrow_mut();
            state.upstream_data_ready = true;
            state.upstream_waker.take()
        };
        if let Some(waker) = waker {
            #[cfg(feature = "debug-logs")]
            log::debug!("WebSocketReactor::wake_upstream: waker found, calling wake()");
            waker.wake();
        } else {
            log::debug!(
                "WebSocketReactor::wake_upstream: NO waker registered (handler terminated or not started)"
            );
        }
    }

    /// Sets the downstream data-ready flag and wakes the registered downstream
    /// waker, if there is one. The waker is consumed: a later call only wakes
    /// again after a new waker has been registered.
    pub fn wake_downstream(&self) {
        #[cfg(feature = "debug-logs")]
        log::debug!("WebSocketReactor::wake_downstream: setting downstream_data_ready=true");
        // Update the state inside a scoped borrow and call `wake()` only after
        // it is released: a waker that re-enters this reactor synchronously
        // would otherwise hit an "already borrowed" panic.
        let waker = {
            let mut state = self.state.borrow_mut();
            state.downstream_data_ready = true;
            state.downstream_waker.take()
        };
        if let Some(waker) = waker {
            #[cfg(feature = "debug-logs")]
            log::debug!("WebSocketReactor::wake_downstream: waker found, calling wake()");
            waker.wake();
        } else {
            log::debug!(
                "WebSocketReactor::wake_downstream: NO waker registered (handler terminated or not started)"
            );
        }
    }

    /// Returns the current upstream pause flag (`true` right after `new`).
    pub fn upstream_paused(&self) -> bool {
        self.state.borrow().upstream_paused
    }

    /// Sets the upstream pause flag.
    pub fn set_upstream_paused(&self, paused: bool) {
        #[cfg(feature = "debug-logs")]
        log::debug!("WebSocketReactor::set_upstream_paused({paused})");
        self.state.borrow_mut().upstream_paused = paused;
    }

    /// Returns the current downstream pause flag (`true` right after `new`).
    pub fn downstream_paused(&self) -> bool {
        self.state.borrow().downstream_paused
    }

    /// Sets the downstream pause flag.
    pub fn set_downstream_paused(&self, paused: bool) {
        #[cfg(feature = "debug-logs")]
        log::debug!("WebSocketReactor::set_downstream_paused({paused})");
        self.state.borrow_mut().downstream_paused = paused;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::task::{Wake, Waker};

    /// Waker fixture that counts how many times it has been woken.
    struct CountingWaker {
        wakes: AtomicUsize,
    }

    impl CountingWaker {
        /// Number of `wake()` calls observed so far.
        fn wake_count(&self) -> usize {
            self.wakes.load(Ordering::SeqCst)
        }
    }

    impl Wake for CountingWaker {
        fn wake(self: Arc<Self>) {
            self.wakes.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a counting fixture together with a `Waker` that drives it.
    fn counting_waker() -> (Arc<CountingWaker>, Waker) {
        let counter = Arc::new(CountingWaker {
            wakes: AtomicUsize::new(0),
        });
        let waker = Waker::from(Arc::clone(&counter));
        (counter, waker)
    }

    fn create_reactor() -> WebSocketReactor {
        WebSocketReactor::new(HttpCid::from(1))
    }

    #[test]
    fn new_reactor_starts_with_paused_state() {
        let reactor = create_reactor();
        assert!(reactor.upstream_paused(), "Upstream should start paused");
    }

    #[test]
    fn set_upstream_paused_changes_state() {
        let reactor = create_reactor();
        reactor.set_upstream_paused(false);
        assert!(
            !reactor.upstream_paused(),
            "Should be unpaused after set_upstream_paused(false)"
        );
        reactor.set_upstream_paused(true);
        assert!(
            reactor.upstream_paused(),
            "Should be paused after set_upstream_paused(true)"
        );
    }

    #[test]
    fn poll_upstream_data_ready_starts_false() {
        let reactor = create_reactor();
        assert!(
            !reactor.poll_upstream_data_ready(),
            "Should start with no data ready"
        );
    }

    #[test]
    fn poll_upstream_data_ready_clears_flag() {
        let reactor = create_reactor();
        reactor.state.borrow_mut().upstream_data_ready = true;
        assert!(
            reactor.poll_upstream_data_ready(),
            "First poll should return true"
        );
        assert!(
            !reactor.poll_upstream_data_ready(),
            "Second poll should return false"
        );
    }

    #[test]
    fn downstream_paused_starts_true() {
        let reactor = create_reactor();
        assert!(
            reactor.downstream_paused(),
            "Downstream should start paused"
        );
    }

    #[test]
    fn set_downstream_paused_changes_state() {
        let reactor = create_reactor();
        reactor.set_downstream_paused(false);
        assert!(
            !reactor.downstream_paused(),
            "Should be unpaused after set_downstream_paused(false)"
        );
        reactor.set_downstream_paused(true);
        assert!(
            reactor.downstream_paused(),
            "Should be paused after set_downstream_paused(true)"
        );
    }

    #[test]
    fn poll_downstream_data_ready_starts_false() {
        let reactor = create_reactor();
        assert!(
            !reactor.poll_downstream_data_ready(),
            "Should start with no data ready"
        );
    }

    #[test]
    fn poll_downstream_data_ready_clears_flag() {
        let reactor = create_reactor();
        reactor.state.borrow_mut().downstream_data_ready = true;
        assert!(
            reactor.poll_downstream_data_ready(),
            "First poll should return true"
        );
        assert!(
            !reactor.poll_downstream_data_ready(),
            "Second poll should return false"
        );
    }

    #[test]
    fn set_upstream_data_ready_updates_flag() {
        let reactor = create_reactor();
        reactor.set_upstream_data_ready(true);
        assert!(
            reactor.poll_upstream_data_ready(),
            "Flag should be set to true"
        );
        reactor.set_upstream_data_ready(false);
        assert!(
            !reactor.poll_upstream_data_ready(),
            "Flag should be set to false"
        );
    }

    #[test]
    fn set_downstream_data_ready_updates_flag() {
        let reactor = create_reactor();
        reactor.set_downstream_data_ready(true);
        assert!(
            reactor.poll_downstream_data_ready(),
            "Flag should be set to true"
        );
        reactor.set_downstream_data_ready(false);
        assert!(
            !reactor.poll_downstream_data_ready(),
            "Flag should be set to false"
        );
    }

    #[test]
    fn wake_upstream_sets_data_ready() {
        let reactor = create_reactor();
        assert!(!reactor.poll_upstream_data_ready());
        reactor.wake_upstream();
        assert!(
            reactor.poll_upstream_data_ready(),
            "wake_upstream should set data_ready flag"
        );
    }

    #[test]
    fn wake_downstream_sets_data_ready() {
        let reactor = create_reactor();
        assert!(!reactor.poll_downstream_data_ready());
        reactor.wake_downstream();
        assert!(
            reactor.poll_downstream_data_ready(),
            "wake_downstream should set data_ready flag"
        );
    }

    #[test]
    fn register_upstream_waker_stores_waker() {
        let reactor = create_reactor();
        let (counter, waker) = counting_waker();
        reactor.register_upstream_waker(waker);
        reactor.wake_upstream();
        assert!(counter.wake_count() > 0, "Waker should have been called");
    }

    #[test]
    fn register_downstream_waker_stores_waker() {
        let reactor = create_reactor();
        let (counter, waker) = counting_waker();
        reactor.register_downstream_waker(waker);
        reactor.wake_downstream();
        assert!(counter.wake_count() > 0, "Waker should have been called");
    }

    #[test]
    fn wake_without_waker_does_not_panic() {
        let reactor = create_reactor();
        reactor.wake_upstream();
        reactor.wake_downstream();
        assert!(reactor.poll_upstream_data_ready());
        assert!(reactor.poll_downstream_data_ready());
    }

    #[test]
    fn wake_upstream_consumes_registered_waker() {
        let reactor = create_reactor();
        let (counter, waker) = counting_waker();
        reactor.register_upstream_waker(waker);
        reactor.wake_upstream();
        reactor.wake_upstream();
        assert_eq!(
            counter.wake_count(),
            1,
            "A registered waker should be woken once and then discarded"
        );
    }

    #[test]
    fn wake_downstream_consumes_registered_waker() {
        let reactor = create_reactor();
        let (counter, waker) = counting_waker();
        reactor.register_downstream_waker(waker);
        reactor.wake_downstream();
        reactor.wake_downstream();
        assert_eq!(
            counter.wake_count(),
            1,
            "A registered waker should be woken once and then discarded"
        );
    }

    #[test]
    fn wake_upstream_leaves_downstream_untouched() {
        let reactor = create_reactor();
        let (counter, waker) = counting_waker();
        reactor.register_downstream_waker(waker);
        reactor.wake_upstream();
        assert_eq!(
            counter.wake_count(),
            0,
            "Downstream waker must not be woken by wake_upstream"
        );
        assert!(
            !reactor.poll_downstream_data_ready(),
            "Downstream flag must not be set by wake_upstream"
        );
        assert!(
            reactor.poll_upstream_data_ready(),
            "Upstream flag should be set by wake_upstream"
        );
    }
}