Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

use crate::grpc::error::{GrpcCallError, GrpcClientError};
use crate::grpc::transport::GrpcCallId;
use crate::host::grpc::GrpcHost;
use crate::reactor::root::RootReactor;
use crate::types::Cid;

use super::codec::Decoder;
use super::{GrpcResponse, GrpcStatus, GrpcStatusCode};

/// Represents an async oneshot gRPC request to the upstream.
pub struct GrpcCall<D> {
    reactor: Rc<RootReactor>,
    host: Rc<dyn GrpcHost>,
    cid_and_waker: Option<(Cid, Waker)>,
    call_id: Option<GrpcCallId>,
    error: Option<GrpcCallError>,
    ready: bool,
    decoder: D,
}

impl<D> GrpcCall<D> {
    pub(super) fn new(
        reactor: Rc<RootReactor>,
        host: Rc<dyn GrpcHost>,
        decoder: D,
        result: Result<GrpcCallId, GrpcCallError>,
    ) -> Self {
        let (call_id, error) = match result {
            Ok(call_id) => (Some(call_id), None),
            Err(error) => (None, Some(error)),
        };
        Self {
            reactor,
            host,
            decoder,
            cid_and_waker: None,
            call_id,
            error,
            ready: false,
        }
    }

    /// Turns this request into a result to detect call errors before awaiting the remote response.
    pub fn into_result(mut self) -> Result<Self, GrpcCallError> {
        if self.error.is_some() {
            // It is infallible to unwrap here
            return Err(self.error.take().unwrap());
        }
        Ok(self)
    }
}

impl<R> Drop for GrpcCall<R> {
    fn drop(&mut self) {
        if let Some(call_id) = self.call_id.as_ref() {
            let call_id = *call_id;

            // Just cancel if the call is not ready
            if !self.ready {
                let _ = self.host.cancel_grpc_call(call_id.token());
            }

            // Ensure that all related objects were removed
            self.reactor.remove_grpc_response(call_id);
            self.reactor.remove_grpc_client(call_id);
        }
    }
}

impl<D> Future for GrpcCall<D>
where
    D: Decoder + Unpin,
{
    type Output = Result<GrpcResponse<D::Output>, GrpcClientError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Some(call_id) = self.call_id else {
            self.ready = true;

            // It is infallible to unwrap here
            let error = self.error.take().unwrap();

            return Poll::Ready(Err(GrpcClientError::Call(error)));
        };

        if let Some(response_parts) = self.reactor.remove_grpc_response(call_id) {
            self.ready = true;
            let status = GrpcStatusCode::from_u32(response_parts.event.status_code);
            if status != GrpcStatusCode::Ok {
                return Poll::Ready(Err(GrpcClientError::Status(GrpcStatus::new(
                    status,
                    response_parts.status,
                ))));
            }
            let content = response_parts.content.unwrap_or_default();

            self.decoder
                .decode(content)
                .map(GrpcResponse::new)
                .map_err(|e| GrpcClientError::Decode(e.into()))
                .into()
        } else {
            let this = &mut *self.as_mut();
            match this.cid_and_waker.as_ref() {
                None => {
                    let cid = this.reactor.active_cid();

                    // Register the waker in the reactor.
                    this.reactor.insert_grpc_client(call_id, cx.waker().clone());
                    this.reactor.set_paused(cid, true);
                    this.cid_and_waker = Some((cid, cx.waker().clone()));
                }
                Some((cid, waker)) if !waker.will_wake(cx.waker()) => {
                    // Deregister the waker from the reactor to remove the old waker.
                    let _ = this
                        .reactor
                        .remove_grpc_client(call_id)
                        // It should be infallible to unwrap here
                        .expect("stored client");

                    // Register the waker in the reactor with the new waker.
                    this.reactor.insert_grpc_client(call_id, cx.waker().clone());
                    this.cid_and_waker = Some((*cid, cx.waker().clone()));
                }
                Some(_) => {}
            }
            Poll::Pending
        }
    }
}