use std::marker::PhantomData;
use std::sync::Arc;
use prost_types::value::Kind as PbValueKind;
use prost_types::{Struct as PbStruct, Value as PbValue};
use ppoppo_sdk_core::interceptor::{AuthInterceptor, BearerCredential};
use ppoppo_sdk_core::token_cache::TokenCache;
use crate::error::{Error, classify_status};
use crate::proto::{
CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
SendRequestEvent as PbSendRequestEvent, SendRequestEventType as PbEventType,
SendRequestStatus as PbSendStatus, StreamSendRequestEventsReq,
external_message_service_client::ExternalMessageServiceClient,
};
use crate::scopes::{GetSendStatusCapable, PcsExternalScopeSet, SendAlertCapable};
use crate::transport::ExternalChannel;
use crate::types::{
DeliveryEvent, DeliveryEventKind, DeliveryStream, PollConfig, RecipientList, SendOutcome,
SendRequestId, SendRequestState, SendStatus, SendStatusTotals, TemplateId,
};
pub struct PcsExternalClient<S: PcsExternalScopeSet> {
pub(crate) channel: ExternalChannel,
pub(crate) cache: Arc<TokenCache>,
pub(crate) _scope: PhantomData<S>,
}
impl<S: PcsExternalScopeSet> Clone for PcsExternalClient<S> {
fn clone(&self) -> Self {
Self {
channel: self.channel.clone(),
cache: Arc::clone(&self.cache),
_scope: PhantomData,
}
}
}
impl<S: SendAlertCapable> PcsExternalClient<S> {
pub async fn send_alert(
&self,
template: &TemplateId,
recipients: &RecipientList,
poll: Option<&PollConfig>,
) -> Result<SendOutcome, Error> {
let token = self.cache.get().await?;
let interceptor = AuthInterceptor::new(BearerCredential::new(token));
let mut client =
ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
let req = CreateSendRequestReq {
template_id: template.0.clone(),
recipients: recipients
.iter()
.map(|r| RecipientInput {
ppnum: r.ppnum.as_str().to_string(),
vars: Some(vars_to_pb_struct(&r.vars)),
})
.collect(),
poll_config: poll.map(|p| PbPollConfig {
expires_in_hours: p.expires_in_hours,
allow_multiple: p.allow_multiple,
}),
};
let resp = client
.create_send_request(tonic::Request::new(req))
.await
.map_err(|s| classify_status(&s))?
.into_inner();
Ok(SendOutcome {
id: SendRequestId(resp.id),
state: pb_status_to_state(resp.status),
total_recipients: i32_to_u32(resp.total_recipients),
})
}
}
impl<S: GetSendStatusCapable> PcsExternalClient<S> {
pub async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, Error> {
let token = self.cache.get().await?;
let interceptor = AuthInterceptor::new(BearerCredential::new(token));
let mut client =
ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
let req = GetSendRequestStatusReq { id: id.0.clone() };
let resp = client
.get_send_request_status(tonic::Request::new(req))
.await
.map_err(|s| classify_status(&s))?
.into_inner();
let summary = resp
.request
.ok_or_else(|| Error::ProtoMismatch("GetSendRequestStatusResp.request was None".into()))?;
Ok(SendStatus {
id: SendRequestId(summary.id),
state: pb_status_to_state(summary.status),
totals: SendStatusTotals {
total: i32_to_u32(summary.total_recipients),
delivered: i32_to_u32(summary.delivered),
pending_consent: i32_to_u32(summary.pending_consent),
failed: i32_to_u32(summary.failed),
},
})
}
pub async fn stream_send_request_events(
&self,
send_request_id: Option<&SendRequestId>,
after_event_id: Option<&str>,
) -> Result<DeliveryStream, crate::Error> {
let token = self.cache.get().await?;
let interceptor = AuthInterceptor::new(BearerCredential::new(token));
let mut client =
ExternalMessageServiceClient::with_interceptor(self.channel.clone(), interceptor);
let req = StreamSendRequestEventsReq {
send_request_id: send_request_id.map(|id| id.0.clone()),
after_event_id: after_event_id.map(String::from),
};
let stream = client
.stream_send_request_events(tonic::Request::new(req))
.await
.map_err(|s| classify_status(&s))?
.into_inner();
use futures_util::StreamExt as _;
let mapped = stream
.map(|item| item.map(proto_event_to_delivery).map_err(|s| classify_status(&s)));
Ok(DeliveryStream::new(mapped))
}
}
fn proto_event_to_delivery(evt: PbSendRequestEvent) -> DeliveryEvent {
let kind = match PbEventType::try_from(evt.event_type).unwrap_or(PbEventType::Unspecified) {
PbEventType::RecipientDelivered => DeliveryEventKind::RecipientDelivered {
ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
message_id: evt.recipient.as_ref().and_then(|r| r.message_id.clone()),
},
PbEventType::RecipientFailed => DeliveryEventKind::RecipientFailed {
ppnum: evt.recipient.as_ref().map(|r| r.ppnum.clone()).unwrap_or_default(),
error_code: evt.recipient.as_ref().and_then(|r| r.error_code.clone()),
},
PbEventType::RecipientPendingConsent => DeliveryEventKind::RecipientPendingConsent {
ppnum: evt.recipient.map(|r| r.ppnum).unwrap_or_default(),
},
PbEventType::ConsentGranted => DeliveryEventKind::ConsentGranted,
PbEventType::ConsentDenied => DeliveryEventKind::ConsentDenied,
PbEventType::RequestCompleted => DeliveryEventKind::RequestCompleted,
PbEventType::PollResponseReceived => DeliveryEventKind::PollResponseReceived,
PbEventType::Unspecified => DeliveryEventKind::Unknown,
};
DeliveryEvent {
event_id: evt.event_id,
send_request_id: SendRequestId::new(evt.send_request_id),
kind,
occurred_at: evt.occurred_at,
}
}
fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
let fields = vars
.iter()
.map(|(k, v)| {
(
k.clone(),
PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
)
})
.collect();
PbStruct { fields }
}
fn pb_status_to_state(s: i32) -> SendRequestState {
match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
PbSendStatus::Queued => SendRequestState::Queued,
PbSendStatus::Processing => SendRequestState::Processing,
PbSendStatus::Completed => SendRequestState::Completed,
PbSendStatus::Failed => SendRequestState::Failed,
PbSendStatus::Unspecified => SendRequestState::Unknown,
}
}
fn i32_to_u32(n: i32) -> u32 {
n.max(0) as u32
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
#[test]
fn pb_status_mapping_covers_all_variants() {
assert_eq!(pb_status_to_state(0), SendRequestState::Unknown);
assert_eq!(pb_status_to_state(1), SendRequestState::Queued);
assert_eq!(pb_status_to_state(2), SendRequestState::Processing);
assert_eq!(pb_status_to_state(3), SendRequestState::Completed);
assert_eq!(pb_status_to_state(4), SendRequestState::Failed);
assert_eq!(pb_status_to_state(99), SendRequestState::Unknown);
}
#[test]
fn i32_to_u32_saturates_negatives() {
assert_eq!(i32_to_u32(-5), 0);
assert_eq!(i32_to_u32(0), 0);
assert_eq!(i32_to_u32(42), 42);
}
#[test]
fn vars_translation_emits_string_values() {
let mut vars = std::collections::BTreeMap::new();
vars.insert("name".into(), "Daisy".into());
let s = vars_to_pb_struct(&vars);
assert_eq!(s.fields.len(), 1);
let name_v = s.fields.get("name").unwrap();
assert!(matches!(&name_v.kind, Some(PbValueKind::StringValue(v)) if v == "Daisy"));
}
}