use std::env;
use std::sync::Arc;
use prost_types::value::Kind as PbValueKind;
use prost_types::{Struct as PbStruct, Value as PbValue};
use super::failure::{PcsFailure, classify_status, classify_transport};
use super::port::{PcsExternalPort, RawPcsChannel};
use super::types::{
PollConfig, RecipientList, SendOutcome, SendRequestId, SendRequestState, SendStatus,
SendStatusTotals, TemplateId,
};
use crate::error::Error;
use crate::external::proto::external_message_service_client::ExternalMessageServiceClient;
use crate::external::proto::{
CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
SendRequestStatus as PbSendStatus,
};
#[allow(deprecated)] use crate::external::connect;
use crate::external::{ExternalChannel, auth_request};
#[derive(Debug, Clone)]
pub struct GrpcPcsAdapter {
channel: ExternalChannel,
api_key: Arc<str>,
}
impl GrpcPcsAdapter {
pub async fn connect(endpoint: &str, api_key: impl Into<String>) -> Result<Self, Error> {
let api_key = api_key.into();
validate_api_key(&api_key)?;
#[allow(deprecated)]
let channel = connect(endpoint).await?;
Ok(Self { channel, api_key: Arc::from(api_key) })
}
pub async fn from_env() -> Result<Self, Error> {
let endpoint = env::var("PCS_EXTERNAL_ENDPOINT").map_err(|_| {
Error::External("PCS_EXTERNAL_ENDPOINT environment variable is not set".into())
})?;
let api_key = env::var("PCS_API_KEY")
.map_err(|_| Error::External("PCS_API_KEY environment variable is not set".into()))?;
Self::connect(&endpoint, api_key).await
}
pub fn from_parts(channel: ExternalChannel, api_key: impl Into<String>) -> Result<Self, Error> {
let api_key = api_key.into();
validate_api_key(&api_key)?;
Ok(Self { channel, api_key: Arc::from(api_key) })
}
}
fn validate_api_key(api_key: &str) -> Result<(), Error> {
let value = format!("Bearer {api_key}");
value
.parse::<tonic::metadata::MetadataValue<_>>()
.map_err(|_| Error::InvalidApiKey)?;
Ok(())
}
impl PcsExternalPort for GrpcPcsAdapter {
async fn send_alert(
&self,
template: &TemplateId,
recipients: &RecipientList,
poll: Option<&PollConfig>,
) -> Result<SendOutcome, PcsFailure> {
let req = CreateSendRequestReq {
template_id: template.0.clone(),
recipients: recipients
.iter()
.map(|r| RecipientInput {
ppnum: r.ppnum.as_str().to_string(),
vars: Some(vars_to_pb_struct(&r.vars)),
})
.collect(),
poll_config: poll.map(|p| PbPollConfig {
expires_in_hours: p.expires_in_hours,
allow_multiple: p.allow_multiple,
}),
};
let request = auth_request(&self.api_key, req).map_err(|e| {
classify_transport(format!("auth_request rejected pre-validated key: {e}"))
})?;
let mut client = ExternalMessageServiceClient::new(self.channel.clone());
let resp = client
.create_send_request(request)
.await
.map_err(|s| classify_status(&s))?
.into_inner();
Ok(SendOutcome {
id: SendRequestId(resp.id),
state: pb_status_to_state(resp.status),
total_recipients: u32_from_i32(resp.total_recipients),
})
}
async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, PcsFailure> {
let req = GetSendRequestStatusReq { id: id.0.clone() };
let request = auth_request(&self.api_key, req).map_err(|e| {
classify_transport(format!("auth_request rejected pre-validated key: {e}"))
})?;
let mut client = ExternalMessageServiceClient::new(self.channel.clone());
let resp = client
.get_send_request_status(request)
.await
.map_err(|s| classify_status(&s))?
.into_inner();
let summary = resp.request.ok_or_else(|| {
classify_transport("GetSendRequestStatusResp.request was None")
})?;
Ok(SendStatus {
id: SendRequestId(summary.id),
state: pb_status_to_state(summary.status),
totals: SendStatusTotals {
total: u32_from_i32(summary.total_recipients),
delivered: u32_from_i32(summary.delivered),
pending_consent: u32_from_i32(summary.pending_consent),
failed: u32_from_i32(summary.failed),
},
})
}
fn raw_channel(&self) -> RawPcsChannel<'_> {
RawPcsChannel { channel: self.channel.clone(), api_key: &self.api_key }
}
}
fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
let mut fields = std::collections::BTreeMap::new();
for (k, v) in vars {
fields.insert(
k.clone(),
PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
);
}
PbStruct { fields: fields.into_iter().collect() }
}
fn pb_status_to_state(s: i32) -> SendRequestState {
match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
PbSendStatus::Queued => SendRequestState::Queued,
PbSendStatus::Processing => SendRequestState::Processing,
PbSendStatus::Completed => SendRequestState::Completed,
PbSendStatus::Failed => SendRequestState::Failed,
PbSendStatus::Unspecified => SendRequestState::Unknown,
}
}
fn u32_from_i32(n: i32) -> u32 {
n.max(0) as u32
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
#[test]
fn pb_status_mapping_covers_all_variants() {
assert_eq!(pb_status_to_state(0), SendRequestState::Unknown);
assert_eq!(pb_status_to_state(1), SendRequestState::Queued);
assert_eq!(pb_status_to_state(2), SendRequestState::Processing);
assert_eq!(pb_status_to_state(3), SendRequestState::Completed);
assert_eq!(pb_status_to_state(4), SendRequestState::Failed);
assert_eq!(pb_status_to_state(99), SendRequestState::Unknown);
}
#[test]
fn u32_from_i32_saturates_negatives() {
assert_eq!(u32_from_i32(-5), 0);
assert_eq!(u32_from_i32(0), 0);
assert_eq!(u32_from_i32(42), 42);
}
#[test]
fn vars_translation_emits_string_values() {
let mut vars = std::collections::BTreeMap::new();
vars.insert("name".into(), "Daisy".into());
vars.insert("date".into(), "2026-05-01".into());
let s = vars_to_pb_struct(&vars);
assert_eq!(s.fields.len(), 2);
let name_v = s.fields.get("name").unwrap();
assert!(matches!(
&name_v.kind,
Some(PbValueKind::StringValue(v)) if v == "Daisy"
));
}
#[test]
fn validate_api_key_rejects_newline() {
let err = validate_api_key("pk_live_abc\r\nX-Inj: bad").unwrap_err();
assert!(matches!(err, Error::InvalidApiKey));
}
#[test]
fn validate_api_key_accepts_normal() {
validate_api_key("pk_live_abc123").unwrap();
}
}