Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

//! WebSocket frame handling for the high-level framework.
//!
//! Provides state machines for processing WebSocket frames:
//! - [`UpstreamState`] - Handles client→server WebSocket frames
//! - [`DownstreamState`] - Handles server→client WebSocket frames
//!
//! # Example
//!
//! ```ignore
//! use pdk_websockets_lib::{Decoder, Encoder};
//!
//! async fn handle_upstream(mut state: UpstreamState) -> Result<(), BoxError> {
//!     let mut remainder = Vec::new();
//!     loop {
//!         state = state.next().await;
//!         let bytes = state.bytes();
//!
//!         let full_buffer = [&remainder[..], &bytes[..]].concat();
//!         let (frames, new_remainder) = Decoder::parse(full_buffer)?;
//!         remainder = new_remainder;
//!
//!         // Process frames...
//!         let encoded = Encoder::default().encode_server(frames);
//!         state.set_body(&encoded);
//!     }
//! }
//! ```

use std::rc::Rc;

use crate::extract::context::{UpgradeDownstreamContext, UpgradeUpstreamContext};
use crate::extract::{Extract, FromContext};
use crate::host::Host;
use crate::reactor::websocket::WebSocketReactor;
use crate::BoxError;

/// State machine for upstream (client→server) WebSocket frames.
///
/// Provides access to frame buffers and async event handling.
pub struct UpstreamState {
    host: Rc<dyn Host>,
    reactor: Rc<WebSocketReactor>,
}

/// State machine for downstream (server→client) WebSocket frames.
pub struct DownstreamState {
    host: Rc<dyn Host>,
    reactor: Rc<WebSocketReactor>,
}

impl UpstreamState {
    pub(crate) fn new(host: Rc<dyn Host>, reactor: Rc<WebSocketReactor>) -> Self {
        Self { host, reactor }
    }

    /// Pauses and waits for more data to accumulate.
    ///
    /// Use when a frame is incomplete and you need more bytes before processing.
    pub async fn accumulate(self) -> Self {
        #[cfg(feature = "debug-logs")]
        log::debug!("UpstreamState::accumulate()");
        let reactor = Rc::clone(&self.reactor);

        // Accumulate should buffer
        reactor.set_upstream_paused(true);

        // Clear the data_ready flag to force waiting for new client data
        reactor.set_upstream_data_ready(false);

        // Await for the next call to http_request_body
        std::future::poll_fn(move |cx| {
            if reactor.poll_upstream_data_ready() {
                #[cfg(feature = "debug-logs")]
                log::debug!("UpstreamState::accumulate: data ready");
                std::task::Poll::Ready(())
            } else {
                #[cfg(feature = "debug-logs")]
                log::debug!("UpstreamState::accumulate: pending");
                reactor.register_upstream_waker(cx.waker().clone());
                std::task::Poll::Pending
            }
        })
        .await;

        self
    }

    /// Releases the buffer and waits for the next event.
    ///
    /// Use after processing a complete frame and writing output via `set_body()`.
    pub async fn next(self) -> Self {
        #[cfg(feature = "debug-logs")]
        log::debug!("UpstreamState::next()");
        let reactor = Rc::clone(&self.reactor);

        // Next releases the buffer and awaits for new body events
        reactor.set_upstream_paused(false);

        // Clear the data_ready flag to force waiting for new client data
        reactor.set_upstream_data_ready(false);

        // Await for the next call to http_request_body
        std::future::poll_fn(move |cx| {
            if reactor.poll_upstream_data_ready() {
                #[cfg(feature = "debug-logs")]
                log::debug!("UpstreamState::next: data ready");
                std::task::Poll::Ready(())
            } else {
                #[cfg(feature = "debug-logs")]
                log::debug!("UpstreamState::next: pending");
                reactor.register_upstream_waker(cx.waker().clone());
                std::task::Poll::Pending
            }
        })
        .await;

        self
    }

    /// Returns the current request body buffer as raw bytes.
    ///
    /// The returned buffer contains accumulated WebSocket frame data from the client.
    /// User code is responsible for parsing frames and managing remainder bytes.
    pub fn bytes(&self) -> Vec<u8> {
        self.host
            .get_http_request_body(0, usize::MAX)
            .unwrap_or_default()
    }

    /// Writes encoded frame data to the request body.
    pub fn set_body(&self, data: &[u8]) {
        #[cfg(feature = "debug-logs")]
        log::debug!("UpstreamState::set_body: {} bytes", data.len());
        self.host.set_http_request_body(0, usize::MAX, data);
    }
}

impl DownstreamState {
    pub(crate) fn new(host: Rc<dyn Host>, reactor: Rc<WebSocketReactor>) -> Self {
        Self { host, reactor }
    }

    /// Pauses and waits for more data to accumulate.
    ///
    /// Use when a frame is incomplete and you need more bytes before processing.
    pub async fn accumulate(self) -> Self {
        #[cfg(feature = "debug-logs")]
        log::debug!("DownstreamState::accumulate()");
        let reactor = Rc::clone(&self.reactor);

        // Accumulate should buffer
        reactor.set_downstream_paused(true);

        // Clear the data_ready flag to force waiting for new server data
        reactor.set_downstream_data_ready(false);

        // Await for the next call to http_response_body
        std::future::poll_fn(move |cx| {
            if reactor.poll_downstream_data_ready() {
                #[cfg(feature = "debug-logs")]
                log::debug!("DownstreamState::accumulate: data ready");
                std::task::Poll::Ready(())
            } else {
                #[cfg(feature = "debug-logs")]
                log::debug!("DownstreamState::accumulate: pending");
                reactor.register_downstream_waker(cx.waker().clone());
                std::task::Poll::Pending
            }
        })
        .await;

        self
    }

    /// Releases the buffer and waits for the next event.
    ///
    /// Use after processing a complete frame and writing output via `set_body()`.
    pub async fn next(self) -> Self {
        #[cfg(feature = "debug-logs")]
        log::debug!("DownstreamState::next()");
        let reactor = Rc::clone(&self.reactor);

        // Next releases the buffer and awaits for new body events
        reactor.set_downstream_paused(false);

        // Clear the data_ready flag to force waiting for new server data
        reactor.set_downstream_data_ready(false);

        // Await for the next call to http_response_body
        std::future::poll_fn(move |cx| {
            if reactor.poll_downstream_data_ready() {
                #[cfg(feature = "debug-logs")]
                log::debug!("DownstreamState::next: data ready");
                std::task::Poll::Ready(())
            } else {
                #[cfg(feature = "debug-logs")]
                log::debug!("DownstreamState::next: pending");
                reactor.register_downstream_waker(cx.waker().clone());
                std::task::Poll::Pending
            }
        })
        .await;

        self
    }

    /// Returns the complete response buffer.
    /// User code manages remainder bytes to track what has been parsed.
    pub fn bytes(&self) -> Vec<u8> {
        self.host
            .get_http_response_body(0, usize::MAX)
            .unwrap_or_default()
    }

    /// Writes encoded frame data to send to client.
    pub fn set_body(&self, data: &[u8]) {
        #[cfg(feature = "debug-logs")]
        log::debug!("DownstreamState::set_body: {} bytes", data.len());
        self.host.set_http_response_body(0, usize::MAX, data);
    }
}

impl<S> FromContext<UpgradeUpstreamContext<S>> for UpstreamState {
    type Error = BoxError;

    fn from_context(context: &UpgradeUpstreamContext<S>) -> Result<Self, Self::Error> {
        let host: Rc<dyn Host> = context.extract()?;
        let reactor = Rc::clone(context.reactor());
        Ok(UpstreamState::new(host, reactor))
    }
}

impl<S> FromContext<UpgradeDownstreamContext<S>> for DownstreamState {
    type Error = BoxError;

    fn from_context(context: &UpgradeDownstreamContext<S>) -> Result<Self, Self::Error> {
        let host: Rc<dyn Host> = context.extract()?;
        let reactor = Rc::clone(context.reactor());
        Ok(DownstreamState::new(host, reactor))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn upstream_state_can_be_constructed() {
        use crate::reactor::websocket::WebSocketReactor;
        use crate::types::HttpCid;

        let reactor = Rc::new(WebSocketReactor::new(HttpCid::from(1)));
        let host: Rc<dyn crate::host::Host> = Rc::new(crate::host::DefaultHost);

        let _state = UpstreamState::new(host, reactor);
    }

    #[test]
    fn downstream_state_can_be_constructed() {
        use crate::reactor::websocket::WebSocketReactor;
        use crate::types::HttpCid;

        let reactor = Rc::new(WebSocketReactor::new(HttpCid::from(1)));
        let host: Rc<dyn crate::host::Host> = Rc::new(crate::host::DefaultHost);

        let _state = DownstreamState::new(host, reactor);
    }
}