use std::rc::Rc;
use crate::extract::context::{UpgradeDownstreamContext, UpgradeUpstreamContext};
use crate::extract::{Extract, FromContext};
use crate::host::Host;
use crate::reactor::websocket::WebSocketReactor;
use crate::BoxError;
pub struct UpstreamState {
host: Rc<dyn Host>,
reactor: Rc<WebSocketReactor>,
}
pub struct DownstreamState {
host: Rc<dyn Host>,
reactor: Rc<WebSocketReactor>,
}
impl UpstreamState {
pub(crate) fn new(host: Rc<dyn Host>, reactor: Rc<WebSocketReactor>) -> Self {
Self { host, reactor }
}
pub async fn accumulate(self) -> Self {
#[cfg(feature = "debug-logs")]
log::debug!("UpstreamState::accumulate()");
let reactor = Rc::clone(&self.reactor);
reactor.set_upstream_paused(true);
reactor.set_upstream_data_ready(false);
std::future::poll_fn(move |cx| {
if reactor.poll_upstream_data_ready() {
#[cfg(feature = "debug-logs")]
log::debug!("UpstreamState::accumulate: data ready");
std::task::Poll::Ready(())
} else {
#[cfg(feature = "debug-logs")]
log::debug!("UpstreamState::accumulate: pending");
reactor.register_upstream_waker(cx.waker().clone());
std::task::Poll::Pending
}
})
.await;
self
}
pub async fn next(self) -> Self {
#[cfg(feature = "debug-logs")]
log::debug!("UpstreamState::next()");
let reactor = Rc::clone(&self.reactor);
reactor.set_upstream_paused(false);
reactor.set_upstream_data_ready(false);
std::future::poll_fn(move |cx| {
if reactor.poll_upstream_data_ready() {
#[cfg(feature = "debug-logs")]
log::debug!("UpstreamState::next: data ready");
std::task::Poll::Ready(())
} else {
#[cfg(feature = "debug-logs")]
log::debug!("UpstreamState::next: pending");
reactor.register_upstream_waker(cx.waker().clone());
std::task::Poll::Pending
}
})
.await;
self
}
pub fn bytes(&self) -> Vec<u8> {
self.host
.get_http_request_body(0, usize::MAX)
.unwrap_or_default()
}
pub fn set_body(&self, data: &[u8]) {
#[cfg(feature = "debug-logs")]
log::debug!("UpstreamState::set_body: {} bytes", data.len());
self.host.set_http_request_body(0, usize::MAX, data);
}
}
impl DownstreamState {
pub(crate) fn new(host: Rc<dyn Host>, reactor: Rc<WebSocketReactor>) -> Self {
Self { host, reactor }
}
pub async fn accumulate(self) -> Self {
#[cfg(feature = "debug-logs")]
log::debug!("DownstreamState::accumulate()");
let reactor = Rc::clone(&self.reactor);
reactor.set_downstream_paused(true);
reactor.set_downstream_data_ready(false);
std::future::poll_fn(move |cx| {
if reactor.poll_downstream_data_ready() {
#[cfg(feature = "debug-logs")]
log::debug!("DownstreamState::accumulate: data ready");
std::task::Poll::Ready(())
} else {
#[cfg(feature = "debug-logs")]
log::debug!("DownstreamState::accumulate: pending");
reactor.register_downstream_waker(cx.waker().clone());
std::task::Poll::Pending
}
})
.await;
self
}
pub async fn next(self) -> Self {
#[cfg(feature = "debug-logs")]
log::debug!("DownstreamState::next()");
let reactor = Rc::clone(&self.reactor);
reactor.set_downstream_paused(false);
reactor.set_downstream_data_ready(false);
std::future::poll_fn(move |cx| {
if reactor.poll_downstream_data_ready() {
#[cfg(feature = "debug-logs")]
log::debug!("DownstreamState::next: data ready");
std::task::Poll::Ready(())
} else {
#[cfg(feature = "debug-logs")]
log::debug!("DownstreamState::next: pending");
reactor.register_downstream_waker(cx.waker().clone());
std::task::Poll::Pending
}
})
.await;
self
}
pub fn bytes(&self) -> Vec<u8> {
self.host
.get_http_response_body(0, usize::MAX)
.unwrap_or_default()
}
pub fn set_body(&self, data: &[u8]) {
#[cfg(feature = "debug-logs")]
log::debug!("DownstreamState::set_body: {} bytes", data.len());
self.host.set_http_response_body(0, usize::MAX, data);
}
}
impl<S> FromContext<UpgradeUpstreamContext<S>> for UpstreamState {
type Error = BoxError;
fn from_context(context: &UpgradeUpstreamContext<S>) -> Result<Self, Self::Error> {
let host: Rc<dyn Host> = context.extract()?;
let reactor = Rc::clone(context.reactor());
Ok(UpstreamState::new(host, reactor))
}
}
impl<S> FromContext<UpgradeDownstreamContext<S>> for DownstreamState {
type Error = BoxError;
fn from_context(context: &UpgradeDownstreamContext<S>) -> Result<Self, Self::Error> {
let host: Rc<dyn Host> = context.extract()?;
let reactor = Rc::clone(context.reactor());
Ok(DownstreamState::new(host, reactor))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn upstream_state_can_be_constructed() {
use crate::reactor::websocket::WebSocketReactor;
use crate::types::HttpCid;
let reactor = Rc::new(WebSocketReactor::new(HttpCid::from(1)));
let host: Rc<dyn crate::host::Host> = Rc::new(crate::host::DefaultHost);
let _state = UpstreamState::new(host, reactor);
}
#[test]
fn downstream_state_can_be_constructed() {
use crate::reactor::websocket::WebSocketReactor;
use crate::types::HttpCid;
let reactor = Rc::new(WebSocketReactor::new(HttpCid::from(1)));
let host: Rc<dyn crate::host::Host> = Rc::new(crate::host::DefaultHost);
let _state = DownstreamState::new(host, reactor);
}
}