Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::{cell::RefCell, error::Error, rc::Rc};

use crate::{
    grpc::transport::{GrpcCallId, GrpcCallResponse, GrpcResponseParts},
    grpc::GrpcStatusCode,
    host::grpc::GrpcHost,
    proxy_wasm::{
        traits::{Context, HttpContext, RootContext},
        types::ContextType,
    },
};
use futures::{executor::LocalPool, task::LocalSpawnExt};

use crate::{
    client::HttpCallResponse,
    extract::context::ConfigureContext,
    handler::{Handler, IntoHandlerResult},
    host::{clock::Clock, shared_data::SharedData, Host},
    middleware::EventHandlerStack,
    reactor::root::RootReactor,
    types::{HttpCid, RootCid},
};

use super::{error::ErrorContext, http::AsyncHttpContext};

#[derive(Clone)]
enum ConfigurationState {
    Started,
    Finished,
    Failed(Rc<dyn Error>),
}

pub(crate) struct AsyncRootContext<C> {
    context_id: RootCid,
    state: Rc<RefCell<ConfigurationState>>,
    host: Rc<dyn Host>,
    clock: Rc<dyn Clock>,
    grpc_host: Rc<dyn GrpcHost>,
    shared_data: Rc<dyn SharedData>,
    #[cfg(feature = "experimental_metrics")]
    metrics: Rc<dyn crate::host::metrics::MetricsHost>,
    executor: Rc<RefCell<LocalPool>>,
    reactor: Rc<RootReactor>,
    event_handlers: Rc<RefCell<EventHandlerStack>>,
    configure: Rc<C>,
}

impl<C> AsyncRootContext<C> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        context_id: RootCid,
        host: Rc<dyn Host>,
        clock: Rc<dyn Clock>,
        grpc_host: Rc<dyn GrpcHost>,
        shared_data: Rc<dyn SharedData>,
        #[cfg(feature = "experimental_metrics")] metrics: Rc<dyn crate::host::metrics::MetricsHost>,
        event_handlers: EventHandlerStack,
        configure: C,
    ) -> Self {
        Self {
            context_id,
            state: Rc::new(RefCell::new(ConfigurationState::Started)),
            host,
            clock,
            grpc_host,
            shared_data,
            #[cfg(feature = "experimental_metrics")]
            metrics,
            executor: Rc::new(RefCell::new(LocalPool::new())),
            reactor: Rc::new(RootReactor::new(context_id)),
            event_handlers: Rc::new(RefCell::new(event_handlers)),
            configure: Rc::new(configure),
        }
    }
}

impl<C> Context for AsyncRootContext<C> {
    fn on_http_call_response(
        &mut self,
        token_id: u32,
        num_headers: usize,
        body_size: usize,
        num_trailers: usize,
    ) {
        self.reactor.notify_response(HttpCallResponse {
            request_id: token_id.into(),
            num_headers,
            body_size,
            num_trailers,
        });

        self.reactor.set_active_cid(self.context_id.into());

        self.executor.borrow_mut().run_until_stalled();
    }

    fn on_grpc_call_response(&mut self, token_id: u32, status_code: u32, response_size: usize) {
        let event = GrpcCallResponse {
            call_id: GrpcCallId::new(token_id),
            status_code,
            response_size,
        };

        let (content, status) = if status_code == GrpcStatusCode::Ok.code() {
            // Ask for response body only if status is OK
            (
                self.grpc_host.get_grpc_call_response_body(0, response_size),
                None,
            )
        } else {
            // Ask for status message only if status is not OK
            (None, self.grpc_host.get_grpc_status())
        };

        self.reactor.notify_grpc_call_response(GrpcResponseParts {
            event,
            content,
            status,
        });

        self.reactor.set_active_cid(self.context_id.into());
        self.executor.borrow_mut().run_until_stalled();
    }

    fn on_done(&mut self) -> bool {
        self.reactor.set_done();
        true
    }
}

impl<C> RootContext for AsyncRootContext<C>
where
    C: Handler<ConfigureContext> + 'static,
    C::Output: IntoHandlerResult,
{
    fn get_type(&self) -> Option<ContextType> {
        Some(ContextType::HttpContext)
    }

    fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
        match *self.state.borrow() {
            ConfigurationState::Failed(ref error) => {
                let error_context = ErrorContext::new(self.host.clone(), error.clone());
                Some(Box::new(error_context))
            }
            ConfigurationState::Finished => {
                log::error!("Inconsistent filter state.");
                let error: Box<dyn Error> =
                    Box::from("Incoming request after the configure function finished");
                let error_context = ErrorContext::new(self.host.clone(), error.into());
                Some(Box::new(error_context))
            }
            ConfigurationState::Started => {
                let context_id = HttpCid::from(context_id);
                let executor = self.executor.clone();
                let host = self.host.clone();
                let grpc_host = self.grpc_host.clone();
                let config_reactor = self.reactor.clone();
                let reactor = if let Some(reactor) = config_reactor.create_http_context(context_id)
                {
                    reactor
                } else {
                    log::warn!(
                        "There is no current HttpContext available. \
                            A filter function was not launched in configure function or \
                            an async operation is still in progress."
                    );
                    let error: Box<dyn Error> = Box::from("Configuration in pending state");
                    let error_context = ErrorContext::new(self.host.clone(), error.into());
                    return Some(Box::new(error_context));
                };

                config_reactor.set_active_cid(context_id.into());
                self.executor.borrow_mut().run_until_stalled();
                let filter = AsyncHttpContext::new(
                    context_id,
                    executor,
                    host,
                    grpc_host,
                    config_reactor.clone(),
                    reactor,
                    self.event_handlers.clone(),
                );

                config_reactor.set_active_cid(self.context_id.into());

                Some(Box::new(filter))
            }
        }
    }

    fn on_configure(&mut self, _plugin_configuration_size: usize) -> bool {
        let host = self.host.clone();
        let clock = self.clock.clone();
        let grpc_host = self.grpc_host.clone();
        let shared_data = self.shared_data.clone();
        #[cfg(feature = "experimental_metrics")]
        let metrics = self.metrics.clone();
        let reactor = self.reactor.clone();
        reactor.set_active_cid(self.context_id.into());
        let context = ConfigureContext {
            host: host.clone(),
            clock: clock.clone(),
            grpc_host: grpc_host.clone(),
            shared_data: shared_data.clone(),
            #[cfg(feature = "experimental_metrics")]
            metrics: metrics.clone(),
            root_reactor: reactor,
            unique_extractions: Default::default(),
        };
        let state = self.state.clone();
        let configure = self.configure.clone();
        let task = async move {
            match configure.call(context).await {
                Ok(result) => {
                    let handler_output = result.into_handler_result();
                    match handler_output {
                        Ok(()) => *state.borrow_mut() = ConfigurationState::Finished,
                        Err(error) => {
                            log::error!("Launcher problem: {error}");
                            *state.borrow_mut() = ConfigurationState::Failed(error.into());
                        }
                    }
                }
                Err(error) => {
                    log::error!("Configuration problem in extraction: {error}");
                    *state.borrow_mut() = ConfigurationState::Failed(Rc::new(error));
                }
            }
        };
        let spawn_result = self.executor.borrow().spawner().spawn_local(task);
        if let Err(error) = spawn_result {
            log::error!("Configuration problem: {error}");
            *self.state.borrow_mut() = ConfigurationState::Failed(Rc::new(error));
        }
        self.executor.borrow_mut().run_until_stalled();
        true
    }

    fn on_tick(&mut self) {
        if self.reactor.notify_tick() {
            self.executor.borrow_mut().run_until_stalled();
        }
    }
}