1use std::marker::PhantomData;
4use std::sync::Arc;
5
6use prost_types::value::Kind as PbValueKind;
7use prost_types::{Struct as PbStruct, Value as PbValue};
8use ppoppo_sdk_core::interceptor::{AuthInterceptor, BearerCredential};
9use ppoppo_sdk_core::token_cache::TokenCache;
10
11use crate::error::{Error, classify_status};
12use crate::proto::{
13 CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
14 SendRequestEvent as PbSendRequestEvent, SendRequestEventType as PbEventType,
15 SendRequestStatus as PbSendStatus, StreamSendRequestEventsReq,
16 external_message_service_client::ExternalMessageServiceClient,
17};
18use crate::scopes::{GetSendStatusCapable, PcsExternalScopeSet, SendAlertCapable};
19use crate::transport::ExternalChannel;
20use crate::types::{
21 DeliveryEvent, DeliveryEventKind, DeliveryStream, PollConfig, RecipientList, SendOutcome,
22 SendRequestId, SendRequestState, SendStatus, SendStatusTotals, TemplateId,
23};
24
25pub struct PcsExternalClient<S: PcsExternalScopeSet> {
40 pub(crate) channel: ExternalChannel,
41 pub(crate) cache: Arc<TokenCache>,
42 pub(crate) _scope: PhantomData<S>,
43}
44
45impl<S: PcsExternalScopeSet> Clone for PcsExternalClient<S> {
47 fn clone(&self) -> Self {
48 Self {
49 channel: self.channel.clone(),
50 cache: Arc::clone(&self.cache),
51 _scope: PhantomData,
52 }
53 }
54}
55
56impl<S: SendAlertCapable> PcsExternalClient<S> {
57 pub async fn send_alert(
68 &self,
69 template: &TemplateId,
70 recipients: &RecipientList,
71 poll: Option<&PollConfig>,
72 ) -> Result<SendOutcome, Error> {
73 let token = self.cache.get().await?;
74 let interceptor = AuthInterceptor::new(BearerCredential::new(token));
75 let mut client =
76 ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
77
78 let req = CreateSendRequestReq {
79 template_id: template.0.clone(),
80 recipients: recipients
81 .iter()
82 .map(|r| RecipientInput {
83 ppnum: r.ppnum.as_str().to_string(),
84 vars: Some(vars_to_pb_struct(&r.vars)),
85 })
86 .collect(),
87 poll_config: poll.map(|p| PbPollConfig {
88 expires_in_hours: p.expires_in_hours,
89 allow_multiple: p.allow_multiple,
90 }),
91 };
92
93 let resp = client
94 .create_send_request(tonic::Request::new(req))
95 .await
96 .map_err(|s| classify_status(&s))?
97 .into_inner();
98
99 Ok(SendOutcome {
100 id: SendRequestId(resp.id),
101 state: pb_status_to_state(resp.status),
102 total_recipients: i32_to_u32(resp.total_recipients),
103 })
104 }
105}
106
107impl<S: GetSendStatusCapable> PcsExternalClient<S> {
108 pub async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, Error> {
116 let token = self.cache.get().await?;
117 let interceptor = AuthInterceptor::new(BearerCredential::new(token));
118 let mut client =
119 ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
120
121 let req = GetSendRequestStatusReq { id: id.0.clone() };
122 let resp = client
123 .get_send_request_status(tonic::Request::new(req))
124 .await
125 .map_err(|s| classify_status(&s))?
126 .into_inner();
127
128 let summary = resp
129 .request
130 .ok_or_else(|| Error::ProtoMismatch("GetSendRequestStatusResp.request was None".into()))?;
131
132 Ok(SendStatus {
133 id: SendRequestId(summary.id),
134 state: pb_status_to_state(summary.status),
135 totals: SendStatusTotals {
136 total: i32_to_u32(summary.total_recipients),
137 delivered: i32_to_u32(summary.delivered),
138 pending_consent: i32_to_u32(summary.pending_consent),
139 failed: i32_to_u32(summary.failed),
140 },
141 })
142 }
143
144 pub async fn stream_send_request_events(
157 &self,
158 send_request_id: Option<&SendRequestId>,
159 after_event_id: Option<&str>,
160 ) -> Result<DeliveryStream, crate::Error> {
161 let token = self.cache.get().await?;
162 let interceptor = AuthInterceptor::new(BearerCredential::new(token));
163 let mut client =
164 ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
165
166 let req = StreamSendRequestEventsReq {
167 send_request_id: send_request_id.map(|id| id.0.clone()),
168 after_event_id: after_event_id.map(String::from),
169 };
170
171 let stream = client
172 .stream_send_request_events(tonic::Request::new(req))
173 .await
174 .map_err(|s| classify_status(&s))?
175 .into_inner();
176
177 use futures_util::StreamExt as _;
178 let mapped = stream
179 .map(|item| item.map(proto_event_to_delivery).map_err(|s| classify_status(&s)));
180
181 Ok(DeliveryStream::new(mapped))
182 }
183}
184
185fn proto_event_to_delivery(evt: PbSendRequestEvent) -> DeliveryEvent {
186 let kind = match PbEventType::try_from(evt.event_type).unwrap_or(PbEventType::Unspecified) {
188 PbEventType::RecipientDelivered => DeliveryEventKind::RecipientDelivered {
189 ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
190 message_id: evt.recipient.as_ref().and_then(|r| r.message_id.clone()),
191 },
192 PbEventType::RecipientFailed => DeliveryEventKind::RecipientFailed {
193 ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
194 error_code: evt.recipient.as_ref().and_then(|r| r.error_code.clone()),
195 },
196 PbEventType::RecipientPendingConsent => DeliveryEventKind::RecipientPendingConsent {
197 ppnum: evt.recipient.map(|r| r.ppnum).unwrap_or_default(),
198 },
199 PbEventType::ConsentGranted => DeliveryEventKind::ConsentGranted,
200 PbEventType::ConsentDenied => DeliveryEventKind::ConsentDenied,
201 PbEventType::RequestCompleted => DeliveryEventKind::RequestCompleted,
202 PbEventType::PollResponseReceived => DeliveryEventKind::PollResponseReceived,
203 PbEventType::Unspecified => DeliveryEventKind::Unknown,
204 };
205 DeliveryEvent {
206 event_id: evt.event_id,
207 send_request_id: SendRequestId::new(evt.send_request_id),
208 kind,
209 occurred_at: evt.occurred_at,
210 }
211}
212
213fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
214 let fields = vars
215 .iter()
216 .map(|(k, v)| {
217 (
218 k.clone(),
219 PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
220 )
221 })
222 .collect();
223 PbStruct { fields }
224}
225
226fn pb_status_to_state(s: i32) -> SendRequestState {
227 match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
228 PbSendStatus::Queued => SendRequestState::Queued,
229 PbSendStatus::Processing => SendRequestState::Processing,
230 PbSendStatus::Completed => SendRequestState::Completed,
231 PbSendStatus::Failed => SendRequestState::Failed,
232 PbSendStatus::Unspecified => SendRequestState::Unknown,
233 }
234}
235
236fn i32_to_u32(n: i32) -> u32 {
237 n.max(0) as u32
240}
241
242#[cfg(test)]
243#[allow(clippy::unwrap_used, clippy::expect_used)]
244mod tests {
245 use super::*;
246
247 #[test]
248 fn pb_status_mapping_covers_all_variants() {
249 assert_eq!(pb_status_to_state(0), SendRequestState::Unknown);
250 assert_eq!(pb_status_to_state(1), SendRequestState::Queued);
251 assert_eq!(pb_status_to_state(2), SendRequestState::Processing);
252 assert_eq!(pb_status_to_state(3), SendRequestState::Completed);
253 assert_eq!(pb_status_to_state(4), SendRequestState::Failed);
254 assert_eq!(pb_status_to_state(99), SendRequestState::Unknown);
255 }
256
257 #[test]
258 fn i32_to_u32_saturates_negatives() {
259 assert_eq!(i32_to_u32(-5), 0);
260 assert_eq!(i32_to_u32(0), 0);
261 assert_eq!(i32_to_u32(42), 42);
262 }
263
264 #[test]
265 fn vars_translation_emits_string_values() {
266 let mut vars = std::collections::BTreeMap::new();
267 vars.insert("name".into(), "Daisy".into());
268 let s = vars_to_pb_struct(&vars);
269 assert_eq!(s.fields.len(), 1);
270 let name_v = s.fields.get("name").unwrap();
271 assert!(matches!(&name_v.kind, Some(PbValueKind::StringValue(v)) if v == "Daisy"));
272 }
273}