Skip to main content

pcs_external/
client.rs

1//! [`PcsExternalClient`] — phantom-typed gRPC client for PCS External API.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use prost_types::value::Kind as PbValueKind;
7use prost_types::{Struct as PbStruct, Value as PbValue};
8use ppoppo_sdk_core::interceptor::{AuthInterceptor, BearerCredential};
9use ppoppo_sdk_core::token_cache::TokenCache;
10
11use crate::error::{Error, classify_status};
12use crate::proto::{
13    CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
14    SendRequestEvent as PbSendRequestEvent, SendRequestEventType as PbEventType,
15    SendRequestStatus as PbSendStatus, StreamSendRequestEventsReq,
16    external_message_service_client::ExternalMessageServiceClient,
17};
18use crate::scopes::{GetSendStatusCapable, PcsExternalScopeSet, SendAlertCapable};
19use crate::transport::ExternalChannel;
20use crate::types::{
21    DeliveryEvent, DeliveryEventKind, DeliveryStream, PollConfig, RecipientList, SendOutcome,
22    SendRequestId, SendRequestState, SendStatus, SendStatusTotals, TemplateId,
23};
24
25/// Phantom-typed gRPC client for the PCS External Developer Platform.
26///
27/// The type parameter `S: PcsExternalScopeSet` constrains which methods
28/// are callable at compile time. Construct via [`crate::PcsExternalClientBuilder`].
29///
30/// ## Capability gating
31///
32/// | Method | Required capability |
33/// |---|---|
34/// | `send_alert` | `S: SendAlertCapable` |
35/// | `get_send_status` | `S: GetSendStatusCapable` |
36///
37/// Calling a method your scope set doesn't satisfy is a compile-time
38/// E0277 — see [`crate::scopes`] for the compile_fail doctest.
39pub struct PcsExternalClient<S: PcsExternalScopeSet> {
40    pub(crate) channel: ExternalChannel,
41    pub(crate) cache: Arc<TokenCache>,
42    pub(crate) _scope: PhantomData<S>,
43}
44
45// Manual Clone: ExternalChannel and Arc<TokenCache> are both Clone; PhantomData is always Clone.
46impl<S: PcsExternalScopeSet> Clone for PcsExternalClient<S> {
47    fn clone(&self) -> Self {
48        Self {
49            channel: self.channel.clone(),
50            cache: Arc::clone(&self.cache),
51            _scope: PhantomData,
52        }
53    }
54}
55
56impl<S: SendAlertCapable> PcsExternalClient<S> {
57    /// Send a batch of templated messages to `recipients`.
58    ///
59    /// Translates to `ExternalMessageService::CreateSendRequest`.
60    ///
61    /// # Errors
62    ///
63    /// - [`Error::TokenRefresh`] — JWT acquisition failed before the call.
64    /// - [`Error::Rejected`] — PCS rejected the request (bad template, quota, etc.).
65    /// - [`Error::ServerError`] — PCS 5xx, retry-eligible.
66    /// - [`Error::Transport`] — network-level failure.
67    pub async fn send_alert(
68        &self,
69        template: &TemplateId,
70        recipients: &RecipientList,
71        poll: Option<&PollConfig>,
72    ) -> Result<SendOutcome, Error> {
73        let token = self.cache.get().await?;
74        let interceptor = AuthInterceptor::new(BearerCredential::new(token));
75        let mut client =
76            ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
77
78        let req = CreateSendRequestReq {
79            template_id: template.0.clone(),
80            recipients: recipients
81                .iter()
82                .map(|r| RecipientInput {
83                    ppnum: r.ppnum.as_str().to_string(),
84                    vars: Some(vars_to_pb_struct(&r.vars)),
85                })
86                .collect(),
87            poll_config: poll.map(|p| PbPollConfig {
88                expires_in_hours: p.expires_in_hours,
89                allow_multiple: p.allow_multiple,
90            }),
91        };
92
93        let resp = client
94            .create_send_request(tonic::Request::new(req))
95            .await
96            .map_err(|s| classify_status(&s))?
97            .into_inner();
98
99        Ok(SendOutcome {
100            id: SendRequestId(resp.id),
101            state: pb_status_to_state(resp.status),
102            total_recipients: i32_to_u32(resp.total_recipients),
103        })
104    }
105}
106
107impl<S: GetSendStatusCapable> PcsExternalClient<S> {
108    /// Poll aggregate delivery status for a previously-issued send request.
109    ///
110    /// Translates to `ExternalMessageService::GetSendRequestStatus`.
111    ///
112    /// # Errors
113    ///
114    /// Same shape as [`PcsExternalClient::send_alert`].
115    pub async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, Error> {
116        let token = self.cache.get().await?;
117        let interceptor = AuthInterceptor::new(BearerCredential::new(token));
118        let mut client =
119            ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
120
121        let req = GetSendRequestStatusReq { id: id.0.clone() };
122        let resp = client
123            .get_send_request_status(tonic::Request::new(req))
124            .await
125            .map_err(|s| classify_status(&s))?
126            .into_inner();
127
128        let summary = resp
129            .request
130            .ok_or_else(|| Error::ProtoMismatch("GetSendRequestStatusResp.request was None".into()))?;
131
132        Ok(SendStatus {
133            id: SendRequestId(summary.id),
134            state: pb_status_to_state(summary.status),
135            totals: SendStatusTotals {
136                total: i32_to_u32(summary.total_recipients),
137                delivered: i32_to_u32(summary.delivered),
138                pending_consent: i32_to_u32(summary.pending_consent),
139                failed: i32_to_u32(summary.failed),
140            },
141        })
142    }
143
144    /// Open a server-streaming delivery event channel.
145    ///
146    /// Streams per-recipient lifecycle events for all send requests issued by
147    /// this app. Pass `after_event_id` to resume from a cursor after a
148    /// reconnect so no events are replayed.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`crate::Error::TokenRefresh`] if the JWT cannot be acquired,
153    /// [`crate::Error::Rejected`] / [`crate::Error::Transport`] if the
154    /// initial RPC handshake fails. Stream-level errors surface through
155    /// [`DeliveryStream::message`].
156    pub async fn stream_send_request_events(
157        &self,
158        send_request_id: Option<&SendRequestId>,
159        after_event_id: Option<&str>,
160    ) -> Result<DeliveryStream, crate::Error> {
161        let token = self.cache.get().await?;
162        let interceptor = AuthInterceptor::new(BearerCredential::new(token));
163        let mut client =
164            ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
165
166        let req = StreamSendRequestEventsReq {
167            send_request_id: send_request_id.map(|id| id.0.clone()),
168            after_event_id: after_event_id.map(String::from),
169        };
170
171        let stream = client
172            .stream_send_request_events(tonic::Request::new(req))
173            .await
174            .map_err(|s| classify_status(&s))?
175            .into_inner();
176
177        use futures_util::StreamExt as _;
178        let mapped = stream
179            .map(|item| item.map(proto_event_to_delivery).map_err(|s| classify_status(&s)));
180
181        Ok(DeliveryStream::new(mapped))
182    }
183}
184
185fn proto_event_to_delivery(evt: PbSendRequestEvent) -> DeliveryEvent {
186    // Prost strips the enum name prefix: SEND_REQUEST_EVENT_TYPE_ → Unspecified.
187    let kind = match PbEventType::try_from(evt.event_type).unwrap_or(PbEventType::Unspecified) {
188        PbEventType::RecipientDelivered => DeliveryEventKind::RecipientDelivered {
189            ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
190            message_id: evt.recipient.as_ref().and_then(|r| r.message_id.clone()),
191        },
192        PbEventType::RecipientFailed => DeliveryEventKind::RecipientFailed {
193            ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
194            error_code: evt.recipient.as_ref().and_then(|r| r.error_code.clone()),
195        },
196        PbEventType::RecipientPendingConsent => DeliveryEventKind::RecipientPendingConsent {
197            ppnum: evt.recipient.map(|r| r.ppnum).unwrap_or_default(),
198        },
199        PbEventType::ConsentGranted => DeliveryEventKind::ConsentGranted,
200        PbEventType::ConsentDenied => DeliveryEventKind::ConsentDenied,
201        PbEventType::RequestCompleted => DeliveryEventKind::RequestCompleted,
202        PbEventType::PollResponseReceived => DeliveryEventKind::PollResponseReceived,
203        PbEventType::Unspecified => DeliveryEventKind::Unknown,
204    };
205    DeliveryEvent {
206        event_id: evt.event_id,
207        send_request_id: SendRequestId::new(evt.send_request_id),
208        kind,
209        occurred_at: evt.occurred_at,
210    }
211}
212
213fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
214    let fields = vars
215        .iter()
216        .map(|(k, v)| {
217            (
218                k.clone(),
219                PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
220            )
221        })
222        .collect();
223    PbStruct { fields }
224}
225
226fn pb_status_to_state(s: i32) -> SendRequestState {
227    match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
228        PbSendStatus::Queued => SendRequestState::Queued,
229        PbSendStatus::Processing => SendRequestState::Processing,
230        PbSendStatus::Completed => SendRequestState::Completed,
231        PbSendStatus::Failed => SendRequestState::Failed,
232        PbSendStatus::Unspecified => SendRequestState::Unknown,
233    }
234}
235
/// Convert a proto counter to `u32`, clamping negative values to zero.
fn i32_to_u32(n: i32) -> u32 {
    // PCS proto declares these counters as i32 although they are semantically
    // unsigned. The checked conversion only fails for negative input, which
    // signals a server bug rather than a real count, so it collapses to 0.
    u32::try_from(n).unwrap_or(0)
}
241
#[cfg(test)]
// Test code may unwrap/expect freely: a panic is the failure signal here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Every known wire status, plus an out-of-range value, maps as expected.
    #[test]
    fn pb_status_mapping_covers_all_variants() {
        let cases = [
            (0, SendRequestState::Unknown),
            (1, SendRequestState::Queued),
            (2, SendRequestState::Processing),
            (3, SendRequestState::Completed),
            (4, SendRequestState::Failed),
            (99, SendRequestState::Unknown),
        ];
        for (wire, expected) in cases.iter() {
            assert_eq!(&pb_status_to_state(*wire), expected);
        }
    }

    /// Negative counters clamp to zero; non-negative ones pass through.
    #[test]
    fn i32_to_u32_saturates_negatives() {
        let cases: [(i32, u32); 3] = [(-5, 0), (0, 0), (42, 42)];
        for (input, expected) in cases.iter() {
            assert_eq!(i32_to_u32(*input), *expected);
        }
    }

    /// A single variable is emitted as exactly one protobuf string value.
    #[test]
    fn vars_translation_emits_string_values() {
        let vars: std::collections::BTreeMap<String, String> =
            std::iter::once(("name".to_string(), "Daisy".to_string())).collect();

        let translated = vars_to_pb_struct(&vars);

        assert_eq!(translated.fields.len(), 1);
        let kind = translated.fields.get("name").and_then(|value| value.kind.as_ref());
        assert!(matches!(kind, Some(PbValueKind::StringValue(text)) if text == "Daisy"));
    }
}