pcs-external 0.2.0

Ppoppo Chat System (PCS) External API client -- gRPC client for the External Developer Platform
Documentation
//! `GrpcPcsAdapter` — production adapter implementing
//! [`PcsExternalPort`](super::PcsExternalPort) over gRPC.
//!
//! Owns the path-prefix-aware [`ExternalChannel`] and the API key. All
//! `tonic` / `prost` types stay inside this file — port methods return
//! domain types only.

use std::env;
use std::sync::Arc;

use prost_types::value::Kind as PbValueKind;
use prost_types::{Struct as PbStruct, Value as PbValue};

use super::failure::{PcsFailure, classify_status, classify_transport};
use super::port::{PcsExternalPort, RawPcsChannel};
use super::types::{
    PollConfig, RecipientList, SendOutcome, SendRequestId, SendRequestState, SendStatus,
    SendStatusTotals, TemplateId,
};
use crate::error::Error;
use crate::external::proto::external_message_service_client::ExternalMessageServiceClient;
use crate::external::proto::{
    CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
    SendRequestStatus as PbSendStatus,
};
#[allow(deprecated)] // Internal use; escape-hatch consumers go through `connect` themselves.
use crate::external::connect;
use crate::external::{ExternalChannel, auth_request};

/// Production [`PcsExternalPort`] adapter wrapping an [`ExternalChannel`].
///
/// Cheap-cloneable (the inner [`ExternalChannel`] holds an `Arc<Channel>`
/// and the API key is `Arc<str>`); intended to live in shared application
/// state.
#[derive(Debug, Clone)]
pub struct GrpcPcsAdapter {
    channel: ExternalChannel,
    api_key: Arc<str>,
}

impl GrpcPcsAdapter {
    /// Build an adapter from explicit endpoint + API key.
    ///
    /// Validates that `api_key` parses as an HTTP header value (no
    /// CR/LF/NUL) at construction; calls onto the resulting adapter
    /// then never re-validate.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidApiKey`] if `api_key` contains characters
    /// that cannot be sent as an HTTP `Authorization` value, plus the
    /// transport-construction errors of [`connect`].
    pub async fn connect(endpoint: &str, api_key: impl Into<String>) -> Result<Self, Error> {
        let api_key = api_key.into();
        validate_api_key(&api_key)?;
        #[allow(deprecated)]
        let channel = connect(endpoint).await?;
        Ok(Self { channel, api_key: Arc::from(api_key) })
    }

    /// Default constructor reading `PCS_EXTERNAL_ENDPOINT` and
    /// `PCS_API_KEY` from the environment.
    ///
    /// Covers the 80% configuration path documented in the workspace
    /// `CLAUDE.md` Quick Start. Use [`Self::connect`] to override.
    ///
    /// # Errors
    ///
    /// Returns [`Error::External`] when either env var is unset, plus
    /// the construction errors of [`Self::connect`].
    pub async fn from_env() -> Result<Self, Error> {
        let endpoint = env::var("PCS_EXTERNAL_ENDPOINT").map_err(|_| {
            Error::External("PCS_EXTERNAL_ENDPOINT environment variable is not set".into())
        })?;
        let api_key = env::var("PCS_API_KEY")
            .map_err(|_| Error::External("PCS_API_KEY environment variable is not set".into()))?;
        Self::connect(&endpoint, api_key).await
    }

    /// Construct from a pre-built [`ExternalChannel`] (advanced — useful
    /// when the caller has already configured the channel via
    /// [`connect`] and wants to share it across adapters).
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidApiKey`] for invalid header characters.
    pub fn from_parts(channel: ExternalChannel, api_key: impl Into<String>) -> Result<Self, Error> {
        let api_key = api_key.into();
        validate_api_key(&api_key)?;
        Ok(Self { channel, api_key: Arc::from(api_key) })
    }
}

fn validate_api_key(api_key: &str) -> Result<(), Error> {
    // Same shape as `auth_request` — surface the failure at construction
    // rather than per-call.
    let value = format!("Bearer {api_key}");
    value
        .parse::<tonic::metadata::MetadataValue<_>>()
        .map_err(|_| Error::InvalidApiKey)?;
    Ok(())
}

impl PcsExternalPort for GrpcPcsAdapter {
    async fn send_alert(
        &self,
        template: &TemplateId,
        recipients: &RecipientList,
        poll: Option<&PollConfig>,
    ) -> Result<SendOutcome, PcsFailure> {
        let req = CreateSendRequestReq {
            template_id: template.0.clone(),
            recipients: recipients
                .iter()
                .map(|r| RecipientInput {
                    ppnum: r.ppnum.as_str().to_string(),
                    vars: Some(vars_to_pb_struct(&r.vars)),
                })
                .collect(),
            poll_config: poll.map(|p| PbPollConfig {
                expires_in_hours: p.expires_in_hours,
                allow_multiple: p.allow_multiple,
            }),
        };
        let request = auth_request(&self.api_key, req).map_err(|e| {
            // Constructor pre-validated the key, so reaching here implies
            // tonic header semantics drifted — surface as transport.
            classify_transport(format!("auth_request rejected pre-validated key: {e}"))
        })?;
        let mut client = ExternalMessageServiceClient::new(self.channel.clone());
        let resp = client
            .create_send_request(request)
            .await
            .map_err(|s| classify_status(&s))?
            .into_inner();
        Ok(SendOutcome {
            id: SendRequestId(resp.id),
            state: pb_status_to_state(resp.status),
            total_recipients: u32_from_i32(resp.total_recipients),
        })
    }

    async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, PcsFailure> {
        let req = GetSendRequestStatusReq { id: id.0.clone() };
        let request = auth_request(&self.api_key, req).map_err(|e| {
            classify_transport(format!("auth_request rejected pre-validated key: {e}"))
        })?;
        let mut client = ExternalMessageServiceClient::new(self.channel.clone());
        let resp = client
            .get_send_request_status(request)
            .await
            .map_err(|s| classify_status(&s))?
            .into_inner();
        let summary = resp.request.ok_or_else(|| {
            classify_transport("GetSendRequestStatusResp.request was None")
        })?;
        Ok(SendStatus {
            id: SendRequestId(summary.id),
            state: pb_status_to_state(summary.status),
            totals: SendStatusTotals {
                total: u32_from_i32(summary.total_recipients),
                delivered: u32_from_i32(summary.delivered),
                pending_consent: u32_from_i32(summary.pending_consent),
                failed: u32_from_i32(summary.failed),
            },
        })
    }

    fn raw_channel(&self) -> RawPcsChannel<'_> {
        RawPcsChannel { channel: self.channel.clone(), api_key: &self.api_key }
    }
}

fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
    let mut fields = std::collections::BTreeMap::new();
    for (k, v) in vars {
        fields.insert(
            k.clone(),
            PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
        );
    }
    // prost_types::Struct wants HashMap, not BTreeMap; convert.
    PbStruct { fields: fields.into_iter().collect() }
}

fn pb_status_to_state(s: i32) -> SendRequestState {
    // i32 is the prost-emitted enum carrier; map via the generated enum.
    match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
        PbSendStatus::Queued => SendRequestState::Queued,
        PbSendStatus::Processing => SendRequestState::Processing,
        PbSendStatus::Completed => SendRequestState::Completed,
        PbSendStatus::Failed => SendRequestState::Failed,
        PbSendStatus::Unspecified => SendRequestState::Unknown,
    }
}

fn u32_from_i32(n: i32) -> u32 {
    // PCS proto uses i32 for counters that are semantically u32. Saturate
    // negatives to 0 — they would indicate a server bug, not a real count.
    n.max(0) as u32
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn pb_status_mapping_covers_all_variants() {
        assert_eq!(pb_status_to_state(0), SendRequestState::Unknown);
        assert_eq!(pb_status_to_state(1), SendRequestState::Queued);
        assert_eq!(pb_status_to_state(2), SendRequestState::Processing);
        assert_eq!(pb_status_to_state(3), SendRequestState::Completed);
        assert_eq!(pb_status_to_state(4), SendRequestState::Failed);
        // Future-proof: an enum value the SDK doesn't recognize collapses
        // to Unknown rather than panicking.
        assert_eq!(pb_status_to_state(99), SendRequestState::Unknown);
    }

    #[test]
    fn u32_from_i32_saturates_negatives() {
        assert_eq!(u32_from_i32(-5), 0);
        assert_eq!(u32_from_i32(0), 0);
        assert_eq!(u32_from_i32(42), 42);
    }

    #[test]
    fn vars_translation_emits_string_values() {
        let mut vars = std::collections::BTreeMap::new();
        vars.insert("name".into(), "Daisy".into());
        vars.insert("date".into(), "2026-05-01".into());
        let s = vars_to_pb_struct(&vars);
        assert_eq!(s.fields.len(), 2);
        let name_v = s.fields.get("name").unwrap();
        assert!(matches!(
            &name_v.kind,
            Some(PbValueKind::StringValue(v)) if v == "Daisy"
        ));
    }

    #[test]
    fn validate_api_key_rejects_newline() {
        let err = validate_api_key("pk_live_abc\r\nX-Inj: bad").unwrap_err();
        assert!(matches!(err, Error::InvalidApiKey));
    }

    #[test]
    fn validate_api_key_accepts_normal() {
        validate_api_key("pk_live_abc123").unwrap();
    }
}