use std::{cell::RefCell, collections::BTreeMap, task::Waker};
use crate::{
event::{EventKind, FiniteEvent, Start},
types::HttpCid,
};
/// The half of an HTTP exchange that the reactor's current event belongs to.
///
/// `HttpReactor::phase` derives this from the current event kind: `Start` and
/// every request event map to [`ExchangePhase::Request`]; every response event
/// and `ExchangeComplete` map to [`ExchangePhase::Response`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExchangePhase {
    /// The current event is `Start` or a request headers/body/trailers event.
    Request,
    /// The current event is a response headers/body/trailers event or
    /// `ExchangeComplete`.
    Response,
}
/// Marks whether one direction (request or response) of an exchange has been
/// flagged as suspended by the reactor.
///
/// A new reactor starts with both directions [`FlowStatus::Unsuspended`]; the
/// reactor switches them to [`FlowStatus::Suspended`] as events are notified.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowStatus {
    /// The flow has not been flagged as suspended.
    Unsuspended,
    /// The flow has been flagged as suspended.
    Suspended,
}

impl FlowStatus {
    /// Returns `true` only for [`FlowStatus::Suspended`].
    pub fn is_suspended(&self) -> bool {
        *self == Self::Suspended
    }
}
/// Opaque handle identifying one registered waker.
///
/// An id is returned when a waker is inserted into the reactor and, together
/// with the event kind it was registered for, is the key used to remove that
/// waker again.
///
/// Ordering is the numeric ordering of the wrapped counter. It is derived
/// rather than hand-written so that `PartialOrd` and `Ord` can never disagree.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct WakerId(usize);
/// Hands out sequential [`WakerId`]s, starting from `WakerId(0)`.
struct WakerIdGenerator {
    /// Despite the name, this holds the id that the *next* call to
    /// `generate` will return.
    last_id: WakerId,
}

impl WakerIdGenerator {
    /// Creates a generator whose first issued id is `WakerId(0)`.
    fn new() -> Self {
        let first = WakerId(0);
        Self { last_id: first }
    }

    /// Returns the stored id and advances the stored counter by one.
    fn generate(&mut self) -> WakerId {
        let following = WakerId(self.last_id.0 + 1);
        // Store the successor and hand back the value that was stored before.
        std::mem::replace(&mut self.last_id, following)
    }
}
/// Mutable state behind [`HttpReactor`]; the public type wraps it in a
/// `RefCell` and exposes it through `&self` methods.
struct RawHttpReactor {
    /// Source of the ids handed out by `insert_waker`.
    id_generator: WakerIdGenerator,

    /// Set to `true` by `HttpReactor::cancel_request`.
    cancelled_request: bool,
    /// Set to `true` by `HttpReactor::cancel_response`.
    cancelled_response: bool,

    /// Flag written by `HttpReactor::set_paused` while in the request phase.
    paused_request: bool,
    /// Flag written by `HttpReactor::set_paused` while in the response phase.
    paused_response: bool,

    /// Flag written by `HttpReactor::set_headers_paused` while the current
    /// event is `RequestHeaders`.
    headers_paused_request: bool,
    /// Flag written by `HttpReactor::set_headers_paused` while the current
    /// event is `ResponseHeaders`.
    headers_paused_response: bool,

    /// Flag written by `HttpReactor::set_eos_paused` while in the request phase.
    eos_paused_request: bool,
    /// Flag written by `HttpReactor::set_eos_paused` while in the response phase.
    eos_paused_response: bool,

    /// The event most recently passed to `notify` (initially a `Start` event).
    current_event: FiniteEvent,
    /// Number of events passed to `notify` so far.
    event_count: usize,

    /// Suspension marker for the request direction.
    request_status: FlowStatus,
    /// Suspension marker for the response direction.
    response_status: FlowStatus,

    /// Registered wakers, keyed by the event kind they were registered for
    /// and the id issued at registration.
    wakers: BTreeMap<(EventKind, WakerId), Waker>,
}
impl RawHttpReactor {
    /// Updates `request_status` / `response_status` for the transition from
    /// the stored `current_event` to the incoming `event`.
    ///
    /// This reads `current_event` as the *previous* event, so it has to run
    /// before `current_event` is overwritten.
    fn update_exchange_status(&mut self, event: &FiniteEvent) {
        // Completion suspends both directions, whatever the previous event was.
        if matches!(event, FiniteEvent::ExchangeComplete(_)) {
            self.request_status = FlowStatus::Suspended;
            self.response_status = FlowStatus::Suspended;
            return;
        }

        let incoming = event.kind();
        match &self.current_event {
            // Jumped from `Start` to a response-or-later event.
            FiniteEvent::Start(_) if incoming >= EventKind::ResponseHeaders => {
                self.request_status = FlowStatus::Suspended;
            }
            // Request headers did not end the stream, yet a response-or-later
            // event arrived.
            FiniteEvent::RequestHeaders(previous)
                if !previous.end_of_stream && incoming >= EventKind::ResponseHeaders =>
            {
                self.request_status = FlowStatus::Suspended;
            }
            // NOTE(review): unlike the two arms above, this guard is
            // `> RequestBody`, so any event kind ordered after `RequestBody`
            // satisfies it — presumably including `RequestTrailers`; confirm
            // against `EventKind`'s ordering that this is intended.
            FiniteEvent::RequestBody(previous)
                if !previous.end_of_stream && incoming > EventKind::RequestBody =>
            {
                self.request_status = FlowStatus::Suspended;
            }
            // NOTE(review): same shape as the request-body arm; presumably a
            // `ResponseTrailers` event after a non-final body chunk also
            // matches — confirm this is intended.
            FiniteEvent::ResponseBody(previous)
                if !previous.end_of_stream && incoming > EventKind::ResponseBody =>
            {
                self.response_status = FlowStatus::Suspended;
            }
            _ => {}
        }
    }

    /// Records `event` as the current event and wakes interested wakers.
    ///
    /// In order: updates the flow statuses, bumps `event_count`, stores the
    /// event, then calls `wake_by_ref` on every waker whose registered event
    /// kind compares `<=` the new event's kind. Wakers stay registered after
    /// being woken.
    pub fn notify(&mut self, event: FiniteEvent) {
        self.update_exchange_status(&event);
        self.event_count += 1;

        let kind = event.kind();
        self.current_event = event;

        for ((registered, _), waker) in &self.wakers {
            if *registered <= kind {
                waker.wake_by_ref();
            }
        }
    }

    /// Registers `waker` for `event` and returns the freshly generated id
    /// under which it was stored.
    pub fn insert_waker(&mut self, event: EventKind, waker: Waker) -> WakerId {
        let id = self.id_generator.generate();
        let key = (event, id);
        self.wakers.insert(key, waker);
        id
    }

    /// Removes the waker stored under `(event, id)`, if there is one.
    pub fn remove_waker(&mut self, event: EventKind, id: WakerId) {
        let key = (event, id);
        self.wakers.remove(&key);
    }
}
/// Tracks the current event of one HTTP exchange, together with its
/// pause/cancel flags and the wakers waiting on its events.
///
/// All state lives behind a `RefCell`, so every method takes `&self`.
pub struct HttpReactor {
    /// Identifier of the HTTP context this reactor was created for.
    context_id: HttpCid,
    /// Interior-mutable reactor state.
    raw: RefCell<RawHttpReactor>,
}
impl HttpReactor {
pub fn new(context_id: HttpCid) -> Self {
HttpReactor {
context_id,
raw: RefCell::new(RawHttpReactor {
id_generator: WakerIdGenerator::new(),
cancelled_request: false,
cancelled_response: false,
paused_request: false,
paused_response: false,
headers_paused_request: false,
headers_paused_response: false,
eos_paused_request: false,
eos_paused_response: false,
current_event: FiniteEvent::from(Start {
_context_id: context_id,
}),
event_count: 0,
request_status: FlowStatus::Unsuspended,
response_status: FlowStatus::Unsuspended,
wakers: BTreeMap::new(),
}),
}
}
pub fn context_id(&self) -> HttpCid {
self.context_id
}
pub fn request_status(&self) -> FlowStatus {
self.raw.borrow().request_status
}
pub fn response_status(&self) -> FlowStatus {
self.raw.borrow().response_status
}
pub fn notify(&self, event: FiniteEvent) {
self.raw.borrow_mut().notify(event);
}
pub fn cancel_request(&self) {
self.raw.borrow_mut().cancelled_request = true;
}
pub fn cancel_response(&self) {
self.raw.borrow_mut().cancelled_response = true;
}
pub fn cancelled_request(&self) -> bool {
self.raw.borrow().cancelled_request
}
pub fn cancelled_response(&self) -> bool {
self.raw.borrow().cancelled_response
}
pub fn paused(&self) -> bool {
match self.phase() {
ExchangePhase::Request => self.raw.borrow().paused_request,
ExchangePhase::Response => self.raw.borrow().paused_response,
}
}
pub fn set_paused(&self, paused: bool) {
#[cfg(feature = "debug-logs")]
log::debug!("set_paused: {paused}");
match self.phase() {
ExchangePhase::Request => self.raw.borrow_mut().paused_request = paused,
ExchangePhase::Response => self.raw.borrow_mut().paused_response = paused,
}
}
pub fn headers_event_paused(&self) -> bool {
match self.phase() {
ExchangePhase::Request => self.raw.borrow().headers_paused_request,
ExchangePhase::Response => self.raw.borrow().headers_paused_response,
}
}
pub fn headers_paused(&self) -> bool {
match self.current_event() {
EventKind::RequestHeaders => self.raw.borrow().headers_paused_request,
EventKind::ResponseHeaders => self.raw.borrow().headers_paused_response,
_ => false,
}
}
pub fn set_headers_paused(&self, headers_paused: bool) {
match self.current_event() {
EventKind::RequestHeaders => {
self.raw.borrow_mut().headers_paused_request = headers_paused
}
EventKind::ResponseHeaders => {
self.raw.borrow_mut().headers_paused_response = headers_paused
}
_ => {}
}
}
pub fn set_eos_paused(&self, paused: bool) {
match self.phase() {
ExchangePhase::Request => self.raw.borrow_mut().eos_paused_request = paused,
ExchangePhase::Response => self.raw.borrow_mut().eos_paused_response = paused,
}
}
pub fn eos_paused(&self) -> bool {
match self.phase() {
ExchangePhase::Request => self.raw.borrow_mut().eos_paused_request,
ExchangePhase::Response => self.raw.borrow_mut().eos_paused_response,
}
}
pub fn current_event(&self) -> EventKind {
self.raw.borrow().current_event.kind()
}
pub fn event_count(&self) -> usize {
self.raw.borrow().event_count
}
pub fn cloned_finite_event(&self) -> FiniteEvent {
self.raw.borrow().current_event.clone()
}
pub fn insert_waker(&self, event: EventKind, waker: Waker) -> WakerId {
self.raw.borrow_mut().insert_waker(event, waker)
}
pub fn remove_waker(&self, event: EventKind, id: WakerId) {
self.raw.borrow_mut().remove_waker(event, id)
}
pub fn is_done(&self) -> bool {
matches!(self.current_event(), EventKind::ExchangeComplete)
}
pub fn phase(&self) -> ExchangePhase {
match self.current_event() {
EventKind::Start
| EventKind::RequestHeaders
| EventKind::RequestBody
| EventKind::RequestTrailers => ExchangePhase::Request,
EventKind::ResponseHeaders
| EventKind::ResponseBody
| EventKind::ResponseTrailers
| EventKind::ExchangeComplete => ExchangePhase::Response,
}
}
}