pcs-external 0.3.0

Ppoppo Chat System (PCS) External API client -- gRPC client for the External Developer Platform
Documentation
//! [`PcsExternalClient`] — phantom-typed gRPC client for PCS External API.

use std::marker::PhantomData;
use std::sync::Arc;

use prost_types::value::Kind as PbValueKind;
use prost_types::{Struct as PbStruct, Value as PbValue};
use ppoppo_sdk_core::interceptor::{AuthInterceptor, BearerCredential};
use ppoppo_sdk_core::token_cache::TokenCache;

use crate::error::{Error, classify_status};
use crate::proto::{
    CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
    SendRequestEvent as PbSendRequestEvent, SendRequestEventType as PbEventType,
    SendRequestStatus as PbSendStatus, StreamSendRequestEventsReq,
    external_message_service_client::ExternalMessageServiceClient,
};
use crate::scopes::{GetSendStatusCapable, PcsExternalScopeSet, SendAlertCapable};
use crate::transport::ExternalChannel;
use crate::types::{
    DeliveryEvent, DeliveryEventKind, DeliveryStream, PollConfig, RecipientList, SendOutcome,
    SendRequestId, SendRequestState, SendStatus, SendStatusTotals, TemplateId,
};

/// Phantom-typed gRPC client for the PCS External Developer Platform.
///
/// The type parameter `S: PcsExternalScopeSet` constrains which methods
/// are callable at compile time. Construct via [`crate::PcsExternalClientBuilder`].
///
/// ## Capability gating
///
/// | Method | Required capability |
/// |---|---|
/// | `send_alert` | `S: SendAlertCapable` |
/// | `get_send_status` | `S: GetSendStatusCapable` |
///
/// Calling a method your scope set doesn't satisfy is a compile-time
/// E0277 — see [`crate::scopes`] for the compile_fail doctest.
pub struct PcsExternalClient<S: PcsExternalScopeSet> {
    pub(crate) channel: ExternalChannel,
    pub(crate) cache: Arc<TokenCache>,
    pub(crate) _scope: PhantomData<S>,
}

// Manual Clone: ExternalChannel and Arc<TokenCache> are both Clone; PhantomData is always Clone.
impl<S: PcsExternalScopeSet> Clone for PcsExternalClient<S> {
    fn clone(&self) -> Self {
        Self {
            channel: self.channel.clone(),
            cache: Arc::clone(&self.cache),
            _scope: PhantomData,
        }
    }
}

impl<S: SendAlertCapable> PcsExternalClient<S> {
    /// Send a batch of templated messages to `recipients`.
    ///
    /// Translates to `ExternalMessageService::CreateSendRequest`.
    ///
    /// # Errors
    ///
    /// - [`Error::TokenRefresh`] — JWT acquisition failed before the call.
    /// - [`Error::Rejected`] — PCS rejected the request (bad template, quota, etc.).
    /// - [`Error::ServerError`] — PCS 5xx, retry-eligible.
    /// - [`Error::Transport`] — network-level failure.
    pub async fn send_alert(
        &self,
        template: &TemplateId,
        recipients: &RecipientList,
        poll: Option<&PollConfig>,
    ) -> Result<SendOutcome, Error> {
        let token = self.cache.get().await?;
        let interceptor = AuthInterceptor::new(BearerCredential::new(token));
        let mut client =
            ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);

        let req = CreateSendRequestReq {
            template_id: template.0.clone(),
            recipients: recipients
                .iter()
                .map(|r| RecipientInput {
                    ppnum: r.ppnum.as_str().to_string(),
                    vars: Some(vars_to_pb_struct(&r.vars)),
                })
                .collect(),
            poll_config: poll.map(|p| PbPollConfig {
                expires_in_hours: p.expires_in_hours,
                allow_multiple: p.allow_multiple,
            }),
        };

        let resp = client
            .create_send_request(tonic::Request::new(req))
            .await
            .map_err(|s| classify_status(&s))?
            .into_inner();

        Ok(SendOutcome {
            id: SendRequestId(resp.id),
            state: pb_status_to_state(resp.status),
            total_recipients: i32_to_u32(resp.total_recipients),
        })
    }
}

impl<S: GetSendStatusCapable> PcsExternalClient<S> {
    /// Poll aggregate delivery status for a previously-issued send request.
    ///
    /// Translates to `ExternalMessageService::GetSendRequestStatus`.
    ///
    /// # Errors
    ///
    /// Same shape as [`PcsExternalClient::send_alert`].
    pub async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, Error> {
        let token = self.cache.get().await?;
        let interceptor = AuthInterceptor::new(BearerCredential::new(token));
        let mut client =
            ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);

        let req = GetSendRequestStatusReq { id: id.0.clone() };
        let resp = client
            .get_send_request_status(tonic::Request::new(req))
            .await
            .map_err(|s| classify_status(&s))?
            .into_inner();

        let summary = resp
            .request
            .ok_or_else(|| Error::ProtoMismatch("GetSendRequestStatusResp.request was None".into()))?;

        Ok(SendStatus {
            id: SendRequestId(summary.id),
            state: pb_status_to_state(summary.status),
            totals: SendStatusTotals {
                total: i32_to_u32(summary.total_recipients),
                delivered: i32_to_u32(summary.delivered),
                pending_consent: i32_to_u32(summary.pending_consent),
                failed: i32_to_u32(summary.failed),
            },
        })
    }

    /// Open a server-streaming delivery event channel.
    ///
    /// Streams per-recipient lifecycle events for all send requests issued by
    /// this app. Pass `after_event_id` to resume from a cursor after a
    /// reconnect so no events are replayed.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::TokenRefresh`] if the JWT cannot be acquired,
    /// [`crate::Error::Rejected`] / [`crate::Error::Transport`] if the
    /// initial RPC handshake fails. Stream-level errors surface through
    /// [`DeliveryStream::message`].
    pub async fn stream_send_request_events(
        &self,
        send_request_id: Option<&SendRequestId>,
        after_event_id: Option<&str>,
    ) -> Result<DeliveryStream, crate::Error> {
        let token = self.cache.get().await?;
        let interceptor = AuthInterceptor::new(BearerCredential::new(token));
        let mut client =
            ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);

        let req = StreamSendRequestEventsReq {
            send_request_id: send_request_id.map(|id| id.0.clone()),
            after_event_id: after_event_id.map(String::from),
        };

        let stream = client
            .stream_send_request_events(tonic::Request::new(req))
            .await
            .map_err(|s| classify_status(&s))?
            .into_inner();

        use futures_util::StreamExt as _;
        let mapped = stream
            .map(|item| item.map(proto_event_to_delivery).map_err(|s| classify_status(&s)));

        Ok(DeliveryStream::new(mapped))
    }
}

fn proto_event_to_delivery(evt: PbSendRequestEvent) -> DeliveryEvent {
    // Prost strips the enum name prefix: SEND_REQUEST_EVENT_TYPE_ → Unspecified.
    let kind = match PbEventType::try_from(evt.event_type).unwrap_or(PbEventType::Unspecified) {
        PbEventType::RecipientDelivered => DeliveryEventKind::RecipientDelivered {
            ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
            message_id: evt.recipient.as_ref().and_then(|r| r.message_id.clone()),
        },
        PbEventType::RecipientFailed => DeliveryEventKind::RecipientFailed {
            ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
            error_code: evt.recipient.as_ref().and_then(|r| r.error_code.clone()),
        },
        PbEventType::RecipientPendingConsent => DeliveryEventKind::RecipientPendingConsent {
            ppnum: evt.recipient.map(|r| r.ppnum).unwrap_or_default(),
        },
        PbEventType::ConsentGranted => DeliveryEventKind::ConsentGranted,
        PbEventType::ConsentDenied => DeliveryEventKind::ConsentDenied,
        PbEventType::RequestCompleted => DeliveryEventKind::RequestCompleted,
        PbEventType::PollResponseReceived => DeliveryEventKind::PollResponseReceived,
        PbEventType::Unspecified => DeliveryEventKind::Unknown,
    };
    DeliveryEvent {
        event_id: evt.event_id,
        send_request_id: SendRequestId::new(evt.send_request_id),
        kind,
        occurred_at: evt.occurred_at,
    }
}

fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
    let fields = vars
        .iter()
        .map(|(k, v)| {
            (
                k.clone(),
                PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
            )
        })
        .collect();
    PbStruct { fields }
}

fn pb_status_to_state(s: i32) -> SendRequestState {
    match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
        PbSendStatus::Queued => SendRequestState::Queued,
        PbSendStatus::Processing => SendRequestState::Processing,
        PbSendStatus::Completed => SendRequestState::Completed,
        PbSendStatus::Failed => SendRequestState::Failed,
        PbSendStatus::Unspecified => SendRequestState::Unknown,
    }
}

fn i32_to_u32(n: i32) -> u32 {
    // PCS proto uses i32 for counters that are semantically u32. Saturate
    // negatives to 0 — they indicate a server bug, not a real count.
    n.max(0) as u32
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn pb_status_mapping_covers_all_variants() {
        assert_eq!(pb_status_to_state(0), SendRequestState::Unknown);
        assert_eq!(pb_status_to_state(1), SendRequestState::Queued);
        assert_eq!(pb_status_to_state(2), SendRequestState::Processing);
        assert_eq!(pb_status_to_state(3), SendRequestState::Completed);
        assert_eq!(pb_status_to_state(4), SendRequestState::Failed);
        assert_eq!(pb_status_to_state(99), SendRequestState::Unknown);
    }

    #[test]
    fn i32_to_u32_saturates_negatives() {
        assert_eq!(i32_to_u32(-5), 0);
        assert_eq!(i32_to_u32(0), 0);
        assert_eq!(i32_to_u32(42), 42);
    }

    #[test]
    fn vars_translation_emits_string_values() {
        let mut vars = std::collections::BTreeMap::new();
        vars.insert("name".into(), "Daisy".into());
        let s = vars_to_pb_struct(&vars);
        assert_eq!(s.fields.len(), 1);
        let name_v = s.fields.get("name").unwrap();
        assert!(matches!(&name_v.kind, Some(PbValueKind::StringValue(v)) if v == "Daisy"));
    }
}