Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::{cell::RefCell, collections::BTreeMap, task::Waker};

use crate::{
    event::{EventKind, FiniteEvent, Start},
    types::HttpCid,
};

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum ExchangePhase {
    Request,
    Response,
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum FlowStatus {
    Unsuspended,
    Suspended,
}

impl FlowStatus {
    pub fn is_suspended(&self) -> bool {
        matches!(self, Self::Suspended)
    }
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct WakerId(usize);

impl PartialOrd for WakerId {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.0.cmp(&other.0))
    }
}

impl Ord for WakerId {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.cmp(&other.0)
    }
}

struct WakerIdGenerator {
    last_id: WakerId,
}

impl WakerIdGenerator {
    fn new() -> Self {
        Self {
            last_id: WakerId(0),
        }
    }

    fn generate(&mut self) -> WakerId {
        let last_id = self.last_id;
        self.last_id = WakerId(last_id.0 + 1);
        last_id
    }
}

struct RawHttpReactor {
    id_generator: WakerIdGenerator,
    cancelled_request: bool,
    cancelled_response: bool,
    paused_request: bool,
    paused_response: bool,
    headers_paused_request: bool,
    headers_paused_response: bool,
    eos_paused_request: bool,
    eos_paused_response: bool,
    current_event: FiniteEvent,
    event_count: usize,
    request_status: FlowStatus,
    response_status: FlowStatus,
    wakers: BTreeMap<(EventKind, WakerId), Waker>,
}

impl RawHttpReactor {
    fn update_exchange_status(&mut self, event: &FiniteEvent) {
        // Determine if the current flow has been suspended
        match (&self.current_event, event) {
            (_, FiniteEvent::ExchangeComplete(_b)) => {
                // Abort all execution if the request was dropped.
                self.request_status = FlowStatus::Suspended;
                self.response_status = FlowStatus::Suspended;
            }
            (FiniteEvent::Start(_a), b) if b.kind() >= EventKind::ResponseHeaders => {
                self.request_status = FlowStatus::Suspended;
            }
            (FiniteEvent::RequestHeaders(a), b)
                if !a.end_of_stream && b.kind() >= EventKind::ResponseHeaders =>
            {
                self.request_status = FlowStatus::Suspended;
            }
            (FiniteEvent::RequestBody(a), b)
                if !a.end_of_stream && b.kind() > EventKind::RequestBody =>
            {
                self.request_status = FlowStatus::Suspended;
            }
            (FiniteEvent::ResponseBody(a), b)
                if !a.end_of_stream && b.kind() > EventKind::ResponseBody =>
            {
                self.response_status = FlowStatus::Suspended;
            }
            (_, _) => {}
        }
    }

    pub fn notify(&mut self, event: FiniteEvent) {
        self.update_exchange_status(&event);

        self.event_count += 1;
        let kind = event.kind();
        self.current_event = event;

        self.wakers
            .iter()
            .filter(|((e, _), _)| e <= &kind)
            .for_each(|((_, _id), w)| w.wake_by_ref());
    }

    pub fn insert_waker(&mut self, event: EventKind, waker: Waker) -> WakerId {
        let id = self.id_generator.generate();
        self.wakers.insert((event, id), waker);
        id
    }

    pub fn remove_waker(&mut self, event: EventKind, id: WakerId) {
        self.wakers.remove(&(event, id));
    }
}

pub struct HttpReactor {
    context_id: HttpCid,
    raw: RefCell<RawHttpReactor>,
}

impl HttpReactor {
    pub fn new(context_id: HttpCid) -> Self {
        HttpReactor {
            context_id,
            raw: RefCell::new(RawHttpReactor {
                id_generator: WakerIdGenerator::new(),
                cancelled_request: false,
                cancelled_response: false,
                paused_request: false,
                paused_response: false,
                headers_paused_request: false,
                headers_paused_response: false,
                eos_paused_request: false,
                eos_paused_response: false,
                current_event: FiniteEvent::from(Start {
                    _context_id: context_id,
                }),
                event_count: 0,
                request_status: FlowStatus::Unsuspended,
                response_status: FlowStatus::Unsuspended,
                wakers: BTreeMap::new(),
            }),
        }
    }

    pub fn context_id(&self) -> HttpCid {
        self.context_id
    }

    pub fn request_status(&self) -> FlowStatus {
        self.raw.borrow().request_status
    }

    pub fn response_status(&self) -> FlowStatus {
        self.raw.borrow().response_status
    }

    pub fn notify(&self, event: FiniteEvent) {
        self.raw.borrow_mut().notify(event);
    }

    pub fn cancel_request(&self) {
        self.raw.borrow_mut().cancelled_request = true;
    }

    pub fn cancel_response(&self) {
        self.raw.borrow_mut().cancelled_response = true;
    }

    pub fn cancelled_request(&self) -> bool {
        self.raw.borrow().cancelled_request
    }

    pub fn cancelled_response(&self) -> bool {
        self.raw.borrow().cancelled_response
    }

    pub fn paused(&self) -> bool {
        match self.phase() {
            ExchangePhase::Request => self.raw.borrow().paused_request,
            ExchangePhase::Response => self.raw.borrow().paused_response,
        }
    }

    pub fn set_paused(&self, paused: bool) {
        #[cfg(feature = "debug-logs")]
        log::debug!("set_paused: {paused}");
        match self.phase() {
            ExchangePhase::Request => self.raw.borrow_mut().paused_request = paused,
            ExchangePhase::Response => self.raw.borrow_mut().paused_response = paused,
        }
    }

    pub fn headers_event_paused(&self) -> bool {
        match self.phase() {
            ExchangePhase::Request => self.raw.borrow().headers_paused_request,
            ExchangePhase::Response => self.raw.borrow().headers_paused_response,
        }
    }

    pub fn headers_paused(&self) -> bool {
        match self.current_event() {
            EventKind::RequestHeaders => self.raw.borrow().headers_paused_request,
            EventKind::ResponseHeaders => self.raw.borrow().headers_paused_response,
            _ => false,
        }
    }

    pub fn set_headers_paused(&self, headers_paused: bool) {
        match self.current_event() {
            EventKind::RequestHeaders => {
                self.raw.borrow_mut().headers_paused_request = headers_paused
            }
            EventKind::ResponseHeaders => {
                self.raw.borrow_mut().headers_paused_response = headers_paused
            }
            _ => {}
        }
    }

    pub fn set_eos_paused(&self, paused: bool) {
        match self.phase() {
            ExchangePhase::Request => self.raw.borrow_mut().eos_paused_request = paused,
            ExchangePhase::Response => self.raw.borrow_mut().eos_paused_response = paused,
        }
    }

    pub fn eos_paused(&self) -> bool {
        match self.phase() {
            ExchangePhase::Request => self.raw.borrow_mut().eos_paused_request,
            ExchangePhase::Response => self.raw.borrow_mut().eos_paused_response,
        }
    }

    pub fn current_event(&self) -> EventKind {
        self.raw.borrow().current_event.kind()
    }

    pub fn event_count(&self) -> usize {
        self.raw.borrow().event_count
    }

    pub fn cloned_finite_event(&self) -> FiniteEvent {
        self.raw.borrow().current_event.clone()
    }

    pub fn insert_waker(&self, event: EventKind, waker: Waker) -> WakerId {
        self.raw.borrow_mut().insert_waker(event, waker)
    }

    pub fn remove_waker(&self, event: EventKind, id: WakerId) {
        self.raw.borrow_mut().remove_waker(event, id)
    }

    pub fn is_done(&self) -> bool {
        matches!(self.current_event(), EventKind::ExchangeComplete)
    }

    pub fn phase(&self) -> ExchangePhase {
        match self.current_event() {
            EventKind::Start
            | EventKind::RequestHeaders
            | EventKind::RequestBody
            | EventKind::RequestTrailers => ExchangePhase::Request,
            EventKind::ResponseHeaders
            | EventKind::ResponseBody
            | EventKind::ResponseTrailers
            | EventKind::ExchangeComplete => ExchangePhase::Response,
        }
    }
}