pcs_external/pcs_port/
adapter.rs1use std::env;
9use std::sync::Arc;
10
11use prost_types::value::Kind as PbValueKind;
12use prost_types::{Struct as PbStruct, Value as PbValue};
13
14use super::failure::{PcsFailure, classify_status, classify_transport};
15use super::port::{PcsExternalPort, RawPcsChannel};
16use super::types::{
17 PollConfig, RecipientList, SendOutcome, SendRequestId, SendRequestState, SendStatus,
18 SendStatusTotals, TemplateId,
19};
20use crate::error::Error;
21use crate::external::proto::external_message_service_client::ExternalMessageServiceClient;
22use crate::external::proto::{
23 CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
24 SendRequestStatus as PbSendStatus,
25};
26#[allow(deprecated)] use crate::external::connect;
28use crate::external::{ExternalChannel, auth_request};
29
30#[derive(Debug, Clone)]
36pub struct GrpcPcsAdapter {
37 channel: ExternalChannel,
38 api_key: Arc<str>,
39}
40
41impl GrpcPcsAdapter {
42 pub async fn connect(endpoint: &str, api_key: impl Into<String>) -> Result<Self, Error> {
54 let api_key = api_key.into();
55 validate_api_key(&api_key)?;
56 #[allow(deprecated)]
57 let channel = connect(endpoint).await?;
58 Ok(Self { channel, api_key: Arc::from(api_key) })
59 }
60
61 pub async fn from_env() -> Result<Self, Error> {
72 let endpoint = env::var("PCS_EXTERNAL_ENDPOINT").map_err(|_| {
73 Error::External("PCS_EXTERNAL_ENDPOINT environment variable is not set".into())
74 })?;
75 let api_key = env::var("PCS_API_KEY")
76 .map_err(|_| Error::External("PCS_API_KEY environment variable is not set".into()))?;
77 Self::connect(&endpoint, api_key).await
78 }
79
80 pub fn from_parts(channel: ExternalChannel, api_key: impl Into<String>) -> Result<Self, Error> {
88 let api_key = api_key.into();
89 validate_api_key(&api_key)?;
90 Ok(Self { channel, api_key: Arc::from(api_key) })
91 }
92}
93
94fn validate_api_key(api_key: &str) -> Result<(), Error> {
95 let value = format!("Bearer {api_key}");
98 value
99 .parse::<tonic::metadata::MetadataValue<_>>()
100 .map_err(|_| Error::InvalidApiKey)?;
101 Ok(())
102}
103
104impl PcsExternalPort for GrpcPcsAdapter {
105 async fn send_alert(
106 &self,
107 template: &TemplateId,
108 recipients: &RecipientList,
109 poll: Option<&PollConfig>,
110 ) -> Result<SendOutcome, PcsFailure> {
111 let req = CreateSendRequestReq {
112 template_id: template.0.clone(),
113 recipients: recipients
114 .iter()
115 .map(|r| RecipientInput {
116 ppnum: r.ppnum.as_str().to_string(),
117 vars: Some(vars_to_pb_struct(&r.vars)),
118 })
119 .collect(),
120 poll_config: poll.map(|p| PbPollConfig {
121 expires_in_hours: p.expires_in_hours,
122 allow_multiple: p.allow_multiple,
123 }),
124 };
125 let request = auth_request(&self.api_key, req).map_err(|e| {
126 classify_transport(format!("auth_request rejected pre-validated key: {e}"))
129 })?;
130 let mut client = ExternalMessageServiceClient::new(self.channel.clone());
131 let resp = client
132 .create_send_request(request)
133 .await
134 .map_err(|s| classify_status(&s))?
135 .into_inner();
136 Ok(SendOutcome {
137 id: SendRequestId(resp.id),
138 state: pb_status_to_state(resp.status),
139 total_recipients: u32_from_i32(resp.total_recipients),
140 })
141 }
142
143 async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, PcsFailure> {
144 let req = GetSendRequestStatusReq { id: id.0.clone() };
145 let request = auth_request(&self.api_key, req).map_err(|e| {
146 classify_transport(format!("auth_request rejected pre-validated key: {e}"))
147 })?;
148 let mut client = ExternalMessageServiceClient::new(self.channel.clone());
149 let resp = client
150 .get_send_request_status(request)
151 .await
152 .map_err(|s| classify_status(&s))?
153 .into_inner();
154 let summary = resp.request.ok_or_else(|| {
155 classify_transport("GetSendRequestStatusResp.request was None")
156 })?;
157 Ok(SendStatus {
158 id: SendRequestId(summary.id),
159 state: pb_status_to_state(summary.status),
160 totals: SendStatusTotals {
161 total: u32_from_i32(summary.total_recipients),
162 delivered: u32_from_i32(summary.delivered),
163 pending_consent: u32_from_i32(summary.pending_consent),
164 failed: u32_from_i32(summary.failed),
165 },
166 })
167 }
168
169 fn raw_channel(&self) -> RawPcsChannel<'_> {
170 RawPcsChannel { channel: self.channel.clone(), api_key: &self.api_key }
171 }
172}
173
174fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
175 let mut fields = std::collections::BTreeMap::new();
176 for (k, v) in vars {
177 fields.insert(
178 k.clone(),
179 PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
180 );
181 }
182 PbStruct { fields: fields.into_iter().collect() }
184}
185
186fn pb_status_to_state(s: i32) -> SendRequestState {
187 match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
189 PbSendStatus::Queued => SendRequestState::Queued,
190 PbSendStatus::Processing => SendRequestState::Processing,
191 PbSendStatus::Completed => SendRequestState::Completed,
192 PbSendStatus::Failed => SendRequestState::Failed,
193 PbSendStatus::Unspecified => SendRequestState::Unknown,
194 }
195}
196
197fn u32_from_i32(n: i32) -> u32 {
198 n.max(0) as u32
201}
202
203#[cfg(test)]
204#[allow(clippy::unwrap_used, clippy::expect_used)]
205mod tests {
206 use super::*;
207
208 #[test]
209 fn pb_status_mapping_covers_all_variants() {
210 assert_eq!(pb_status_to_state(0), SendRequestState::Unknown);
211 assert_eq!(pb_status_to_state(1), SendRequestState::Queued);
212 assert_eq!(pb_status_to_state(2), SendRequestState::Processing);
213 assert_eq!(pb_status_to_state(3), SendRequestState::Completed);
214 assert_eq!(pb_status_to_state(4), SendRequestState::Failed);
215 assert_eq!(pb_status_to_state(99), SendRequestState::Unknown);
218 }
219
220 #[test]
221 fn u32_from_i32_saturates_negatives() {
222 assert_eq!(u32_from_i32(-5), 0);
223 assert_eq!(u32_from_i32(0), 0);
224 assert_eq!(u32_from_i32(42), 42);
225 }
226
227 #[test]
228 fn vars_translation_emits_string_values() {
229 let mut vars = std::collections::BTreeMap::new();
230 vars.insert("name".into(), "Daisy".into());
231 vars.insert("date".into(), "2026-05-01".into());
232 let s = vars_to_pb_struct(&vars);
233 assert_eq!(s.fields.len(), 2);
234 let name_v = s.fields.get("name").unwrap();
235 assert!(matches!(
236 &name_v.kind,
237 Some(PbValueKind::StringValue(v)) if v == "Daisy"
238 ));
239 }
240
241 #[test]
242 fn validate_api_key_rejects_newline() {
243 let err = validate_api_key("pk_live_abc\r\nX-Inj: bad").unwrap_err();
244 assert!(matches!(err, Error::InvalidApiKey));
245 }
246
247 #[test]
248 fn validate_api_key_accepts_normal() {
249 validate_api_key("pk_live_abc123").unwrap();
250 }
251}