Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

//! The entrypoint to configure your function as a filter.

use futures::{Stream, StreamExt};
use std::{
    convert::Infallible,
    future::{ready, Ready},
    task::{Poll, Waker},
};

use crate::{
    extract::{
        context::{ConfigureContext, FilterContext},
        Exclusive, FromContextOnce,
    },
    handler::{Handler, IntoHandler},
    reactor::{http::HttpReactor, root::RootReactor},
};
use std::rc::Rc;

/// Launcher for asynchronous dispatch of filters through the [`Self::launch()`] method.
/// This type is intended to be injected into the configuration function.
pub struct Launcher {
    /// Configuration context consumed by [`Self::launch()`], which shares it with every
    /// filter invocation.
    context: ConfigureContext,
}

/// The error type for [`Launcher::launch()`].
///
/// Carries no fields; its display text is the fixed message `Launch Error`.
#[derive(thiserror::Error, Debug)]
#[error("Launch Error")]
pub struct LaunchError {}

impl Launcher {
    pub(crate) fn new(context: ConfigureContext) -> Self {
        Self { context }
    }

    /// Launches a `filter` to handle incoming concurrent requests.
    /// This method ends when no more requests can be handled.
    ///
    /// * `filter` - The filter to be launched.
    ///
    /// # Errors
    ///
    /// This method will return an error in the following situation, but is not
    /// limited to just this cases:
    ///
    /// * a `filter` can not be injected with extractible arguments.
    pub async fn launch<F, I>(self, filter: F) -> Result<(), LaunchError>
    where
        F: IntoHandler<FilterContext, I, Output = ()>,
    {
        let configure_context = Rc::new(self.context);
        let filter = filter.into_handler();
        let contexts = ContextCreateStream::new(configure_context.root_reactor.clone());

        contexts
            .map(|http_reactor| (&filter, &configure_context, http_reactor))
            .for_each_concurrent(
                None,
                |(filter, configure_context, http_reactor)| async move {
                    let context =
                        FilterContext::new(configure_context.clone(), http_reactor.clone());

                    match filter.call(context).await {
                        Ok(()) => {}
                        Err(extraction_error) => {
                            log::error!("Extraction problem in filter: {extraction_error}");
                        }
                    }
                },
            )
            .await;

        Ok(())
    }
}

impl FromContextOnce<ConfigureContext> for Launcher {
    /// Building a launcher from the configuration context cannot fail.
    type Error = Infallible;

    /// The launcher is available immediately, so the future is already resolved.
    type Future<'c> = Ready<Result<Self, Self::Error>>;

    /// Builds a [`Launcher`] backed by its own copy of the configuration context.
    ///
    /// Every shared handle of the incoming context is cloned, while `unique_extractions`
    /// starts from its default value.
    fn from_context_once(context: Exclusive<ConfigureContext>) -> Self::Future<'_> {
        // Ideally the launcher would borrow the context rather than clone its parts.
        let launcher_context = ConfigureContext {
            host: context.host.clone(),
            clock: context.clock.clone(),
            grpc_host: context.grpc_host.clone(),
            shared_data: context.shared_data.clone(),
            #[cfg(feature = "experimental_metrics")]
            metrics: context.metrics.clone(),
            root_reactor: context.root_reactor.clone(),
            unique_extractions: Default::default(),
        };

        ready(Ok(Self::new(launcher_context)))
    }
}

/// Stream that yields each new [`HttpReactor`] made available by the root reactor.
struct ContextCreateStream {
    /// Root reactor queried for completion and for newly created HTTP reactors.
    reactor: Rc<RootReactor>,
    /// Clone of the task waker remembered by `poll_next` and compared against later wakers
    /// via `will_wake`; `None` until the first poll that finds nothing ready.
    waker: Option<Waker>,
}

impl ContextCreateStream {
    fn new(reactor: Rc<RootReactor>) -> Self {
        Self {
            reactor,
            waker: None,
        }
    }
}

// Explicitly mark the stream as `Unpin`: `poll_next` mutates its fields through
// `Pin<&mut Self>`, which requires the type to be freely movable.
impl Unpin for ContextCreateStream {}

impl Stream for ContextCreateStream {
    type Item = Rc<HttpReactor>;

    /// Polls the root reactor for the next HTTP reactor.
    ///
    /// * Returns `Poll::Ready(None)` once the root reactor reports `done()`, after taking
    ///   the create waker back from it.
    /// * Returns `Poll::Ready(Some(_))` when the root reactor has a new HTTP reactor.
    /// * Otherwise returns `Poll::Pending`, making sure the waker registered in the root
    ///   reactor is the one belonging to the task that performed this poll.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        if self.reactor.done() {
            // The stream is finished: take the create waker out of the reactor.
            self.reactor.take_create_waker();
            return Poll::Ready(None);
        }

        if let Some(http_reactor) = self.reactor.take_new_http_reactor() {
            return Poll::Ready(Some(http_reactor));
        }

        // (Re)register when nothing has been remembered yet, or when the remembered waker
        // would not wake the task that is polling now.
        let needs_registration = match &self.waker {
            Some(registered) => !registered.will_wake(cx.waker()),
            None => true,
        };

        if needs_registration {
            // Always hand the reactor the CURRENT waker. Registering the previously
            // remembered one would wake a stale task handle and could stall this stream.
            let current = cx.waker().clone();
            self.reactor.insert_create_waker(current.clone());
            self.waker = Some(current);
        }

        Poll::Pending
    }
}