// Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::{
    cell::RefCell,
    future::{ready, Ready},
    rc::Rc,
};

use super::{
    body::BodyExchange,
    body_stream::{BodyStream, BodyStreamExchange},
    dynamic_exchange::DynamicExchange,
    entity::{BodyStreamState, IntoBodyState, IntoBodyStreamState},
    headers::HeadersExchange,
    BodyState, EntityState, HeadersState, Response,
};
#[cfg(feature = "enable_stop_iteration")]
use super::{
    entity::{HeadersBodyState, IntoHeadersBodyState},
    headers_body::HeadersBodyExchange,
};
#[cfg(feature = "experimental")]
use super::{BodyError, Chunk};
use crate::http_constants::HEADER_STATUS;
use crate::{
    event::{EventKind, ResponseBody, ResponseHeaders},
    BoxFuture,
};

/// Initial state of the Response state-machine.
/// This type is intended to be injected as a parameter in Response filters.
pub struct ResponseState {
    /// `true` when the response headers event was not flagged `end_of_stream`.
    contains_body: bool,
    /// Shared handle to the underlying exchange; it is handed on to whichever
    /// state this one is converted into.
    exchange: Rc<RefCell<DynamicExchange>>,
}

/// Error that represents an invalid state in the Response state-machine.
///
/// Produced by `ResponseState::new` when the response headers event cannot be
/// obtained from the exchange.
#[derive(thiserror::Error, Debug)]
pub enum InvalidResponseState {
    /// Unexpected low-level event.
    ///
    /// Carries the kind of the event the exchange currently reports.
    #[error("Invalid response state: {0:?}")]
    Event(EventKind),

    /// Unexpected early response.
    ///
    /// Used when the exchange reports no current event at all.
    #[error("Early response")]
    EarlyResponse,
}

impl ResponseState {
    /// Waits for the response headers event and builds the initial state.
    ///
    /// `contains_body` is derived from the headers event: it is `true` unless
    /// that event is flagged `end_of_stream`.
    ///
    /// # Errors
    ///
    /// When the headers event is not obtained, returns
    /// [`InvalidResponseState::Event`] carrying the exchange's current event
    /// kind, or [`InvalidResponseState::EarlyResponse`] when the exchange
    /// reports no current event.
    ///
    /// # Panics
    ///
    /// Panics if the headers event is obtained but carries no event data.
    // The exclusive borrow must stay alive while waiting for the event, hence
    // the lint exemption.
    #[allow(clippy::await_holding_refcell_ref)]
    pub(crate) async fn new(
        exchange: Rc<RefCell<DynamicExchange>>,
    ) -> Result<Self, InvalidResponseState> {
        let mut guard = exchange.borrow_mut();
        // `None` means the headers event never showed up.
        let headers_outcome = guard
            .wait_for_event::<ResponseHeaders>()
            .await
            .map(|headers| {
                !headers
                    .event_data()
                    .expect("Must contain headers")
                    .event
                    .end_of_stream
            });
        // Release the exclusive borrow before touching the cell again:
        // `RefCell::borrow` panics while a `RefMut` is still alive, which would
        // turn the error path below into a panic instead of an `Err`.
        drop(guard);

        let Some(contains_body) = headers_outcome else {
            let error = exchange
                .borrow()
                .current_event()
                .map(InvalidResponseState::Event)
                .unwrap_or(InvalidResponseState::EarlyResponse);
            return Err(error);
        };

        Ok(Self {
            contains_body,
            exchange,
        })
    }
}

impl IntoBodyState for ResponseState {
    type BodyState = ResponseBodyState;
    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;

    /// Consumes the initial state; the returned future resolves to the body
    /// state once the `BodyExchange` has been built from the shared exchange.
    fn into_body_state(self) -> Self::BodyStateFuture {
        let Self {
            contains_body,
            exchange,
        } = self;

        Box::pin(async move {
            let inner = BodyExchange::new(exchange, contains_body).await;
            ResponseBodyState { inner }
        })
    }
}

impl IntoBodyStreamState for ResponseState {
    type BodyStreamState = ResponseBodyStreamState;

    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;

    /// Consumes the initial state; the returned future resolves to the body
    /// stream state once the `BodyStreamExchange` has been built from the
    /// shared exchange.
    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
        let Self {
            contains_body,
            exchange,
        } = self;

        Box::pin(async move {
            let inner = BodyStreamExchange::new(exchange, contains_body).await;
            ResponseBodyStreamState { inner }
        })
    }
}

#[cfg(feature = "enable_stop_iteration")]
impl IntoHeadersBodyState for ResponseState {
    type HeadersBodyState = ResponseHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the initial state; the returned future resolves to the
    /// headers-and-body state once the `HeadersBodyExchange` has been built
    /// from the shared exchange.
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let Self {
            contains_body,
            exchange,
        } = self;

        Box::pin(async move {
            let inner = HeadersBodyExchange::new(exchange, contains_body).await;
            ResponseHeadersBodyState { inner }
        })
    }
}

impl EntityState for ResponseState {
    type HeadersState = ResponseHeadersState;
    type HeadersStateFuture = Ready<Self::HeadersState>;

    /// Consumes the initial state and yields the headers state through an
    /// already-completed future.
    fn into_headers_state(self) -> Self::HeadersStateFuture {
        let Self {
            contains_body,
            exchange,
        } = self;

        let inner = HeadersExchange::new(exchange, contains_body);
        ready(ResponseHeadersState { inner })
    }
}

/// Headers state of the Response state-machine.
pub struct ResponseHeadersState {
    /// Headers-phase exchange that every operation on this state delegates to.
    inner: HeadersExchange<ResponseHeaders, ResponseBody>,
}

impl ResponseHeadersState {
    /// Reads the response status code from the `HEADER_STATUS` header.
    ///
    /// Yields `0` when the header is absent or its value does not parse as a
    /// `u32`.
    pub fn status_code(&self) -> u32 {
        match self.handler().header(HEADER_STATUS) {
            Some(raw) => raw.parse::<u32>().unwrap_or_default(),
            None => u32::default(),
        }
    }

    /// Consumes this state and sends `response` through the wrapped headers
    /// exchange, forwarding its status code, headers and body.
    pub fn send_response(self, response: Response) {
        let Self { inner } = self;
        inner.send_response(response.status_code(), response.headers(), response.body());
    }
}

impl IntoBodyState for ResponseHeadersState {
    type BodyState = ResponseBodyState;

    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;

    /// Consumes the headers state; the returned future resolves to the body
    /// state produced by converting the wrapped headers exchange.
    fn into_body_state(self) -> Self::BodyStateFuture {
        let Self {
            inner: headers_exchange,
        } = self;

        Box::pin(async move {
            let inner = headers_exchange.into_body_state().await;
            ResponseBodyState { inner }
        })
    }
}

impl IntoBodyStreamState for ResponseHeadersState {
    type BodyStreamState = ResponseBodyStreamState;

    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;

    /// Consumes the headers state; the returned future resolves to the body
    /// stream state produced by converting the wrapped headers exchange.
    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
        let Self {
            inner: headers_exchange,
        } = self;

        Box::pin(async move {
            let inner = headers_exchange.into_body_stream_state().await;
            ResponseBodyStreamState { inner }
        })
    }
}

#[cfg(feature = "enable_stop_iteration")]
impl IntoHeadersBodyState for ResponseHeadersState {
    type HeadersBodyState = ResponseHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the headers state; the returned future resolves to the
    /// headers-and-body state produced by converting the wrapped headers
    /// exchange.
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let Self {
            inner: headers_exchange,
        } = self;

        Box::pin(async move {
            let inner = headers_exchange.into_headers_body_state().await;
            ResponseHeadersBodyState { inner }
        })
    }
}

impl HeadersState for ResponseHeadersState {
    /// Exposes the headers handler of the wrapped exchange.
    fn handler(&self) -> &dyn super::HeadersHandler {
        let Self { inner } = self;
        inner.handler()
    }

    /// Reports the `contains_body` flag stored on the wrapped exchange.
    fn contains_body(&self) -> bool {
        let Self { inner } = self;
        inner.contains_body
    }
}

/// Body state of the Response state-machine.
pub struct ResponseBodyState {
    /// Body-phase exchange; it is handed out directly as this state's body handler.
    inner: BodyExchange<ResponseBody>,
}

impl BodyState for ResponseBodyState {
    /// Exposes the wrapped body exchange as the body handler.
    fn handler(&self) -> &dyn super::BodyHandler {
        let Self { inner } = self;
        inner
    }

    /// Forwards to the wrapped exchange's `contains_body`.
    fn contains_body(&self) -> bool {
        let Self { inner } = self;
        inner.contains_body()
    }
}

/// Body stream state of the Response state-machine.
pub struct ResponseBodyStreamState {
    /// Streaming body exchange that every operation on this state delegates to.
    inner: BodyStreamExchange<ResponseBody>,
}

impl BodyStreamState for ResponseBodyStreamState {
    type Stream<'b>
        = BodyStream<'b>
    where
        Self: 'b;

    /// Forwards to the wrapped exchange's `contains_body`.
    fn contains_body(&self) -> bool {
        let Self { inner } = self;
        inner.contains_body()
    }

    /// Obtains a body stream from the wrapped exchange, borrowed from this state.
    fn stream(&self) -> Self::Stream<'_> {
        let Self { inner } = self;
        inner.stream()
    }

    /// Hands `chunk` to the wrapped exchange and returns its result unchanged.
    #[cfg(feature = "experimental")]
    fn write_chunk(&self, chunk: Chunk) -> Result<(), BodyError> {
        let Self { inner } = self;
        inner.write_chunk(chunk)
    }
}

#[cfg(feature = "enable_stop_iteration")]
/// HeadersBody state of the Response state-machine.
pub struct ResponseHeadersBodyState {
    // NOTE(review): the generic arguments here are (body, headers), the reverse of
    // the (headers, body) order used for `HeadersExchange` in this file — confirm
    // against the declaration of `HeadersBodyExchange`.
    /// Combined headers-and-body exchange; it is handed out directly as this
    /// state's handler.
    inner: HeadersBodyExchange<ResponseBody, ResponseHeaders>,
}

#[cfg(feature = "enable_stop_iteration")]
impl HeadersBodyState for ResponseHeadersBodyState {
    /// Exposes the wrapped exchange as the headers-and-body handler.
    fn handler(&self) -> &dyn super::HeadersBodyHandler {
        let Self { inner } = self;
        inner
    }

    /// Forwards to the wrapped exchange's `contains_body`.
    fn contains_body(&self) -> bool {
        let Self { inner } = self;
        inner.contains_body()
    }
}