Skip to main content

pcs_external/pcs_port/
adapter.rs

1//! `GrpcPcsAdapter` — production adapter implementing
2//! [`PcsExternalPort`](super::PcsExternalPort) over gRPC.
3//!
4//! Owns the path-prefix-aware [`ExternalChannel`] and the API key. All
5//! `tonic` / `prost` types stay inside this file — port methods return
6//! domain types only.
7
8use std::env;
9use std::sync::Arc;
10
11use prost_types::value::Kind as PbValueKind;
12use prost_types::{Struct as PbStruct, Value as PbValue};
13
14use super::failure::{PcsFailure, classify_status, classify_transport};
15use super::port::{PcsExternalPort, RawPcsChannel};
16use super::types::{
17    PollConfig, RecipientList, SendOutcome, SendRequestId, SendRequestState, SendStatus,
18    SendStatusTotals, TemplateId,
19};
20use crate::error::Error;
21use crate::external::proto::external_message_service_client::ExternalMessageServiceClient;
22use crate::external::proto::{
23    CreateSendRequestReq, GetSendRequestStatusReq, PollConfig as PbPollConfig, RecipientInput,
24    SendRequestStatus as PbSendStatus,
25};
26#[allow(deprecated)] // Internal use; escape-hatch consumers go through `connect` themselves.
27use crate::external::connect;
28use crate::external::{ExternalChannel, auth_request};
29
30/// Production [`PcsExternalPort`] adapter wrapping an [`ExternalChannel`].
31///
32/// Cheap-cloneable (the inner [`ExternalChannel`] holds an `Arc<Channel>`
33/// and the API key is `Arc<str>`); intended to live in shared application
34/// state.
35#[derive(Debug, Clone)]
36pub struct GrpcPcsAdapter {
37    channel: ExternalChannel,
38    api_key: Arc<str>,
39}
40
41impl GrpcPcsAdapter {
42    /// Build an adapter from explicit endpoint + API key.
43    ///
44    /// Validates that `api_key` parses as an HTTP header value (no
45    /// CR/LF/NUL) at construction; calls onto the resulting adapter
46    /// then never re-validate.
47    ///
48    /// # Errors
49    ///
50    /// Returns [`Error::InvalidApiKey`] if `api_key` contains characters
51    /// that cannot be sent as an HTTP `Authorization` value, plus the
52    /// transport-construction errors of [`connect`].
53    pub async fn connect(endpoint: &str, api_key: impl Into<String>) -> Result<Self, Error> {
54        let api_key = api_key.into();
55        validate_api_key(&api_key)?;
56        #[allow(deprecated)]
57        let channel = connect(endpoint).await?;
58        Ok(Self { channel, api_key: Arc::from(api_key) })
59    }
60
61    /// Default constructor reading `PCS_EXTERNAL_ENDPOINT` and
62    /// `PCS_API_KEY` from the environment.
63    ///
64    /// Covers the 80% configuration path documented in the workspace
65    /// `CLAUDE.md` Quick Start. Use [`Self::connect`] to override.
66    ///
67    /// # Errors
68    ///
69    /// Returns [`Error::External`] when either env var is unset, plus
70    /// the construction errors of [`Self::connect`].
71    pub async fn from_env() -> Result<Self, Error> {
72        let endpoint = env::var("PCS_EXTERNAL_ENDPOINT").map_err(|_| {
73            Error::External("PCS_EXTERNAL_ENDPOINT environment variable is not set".into())
74        })?;
75        let api_key = env::var("PCS_API_KEY")
76            .map_err(|_| Error::External("PCS_API_KEY environment variable is not set".into()))?;
77        Self::connect(&endpoint, api_key).await
78    }
79
80    /// Construct from a pre-built [`ExternalChannel`] (advanced — useful
81    /// when the caller has already configured the channel via
82    /// [`connect`] and wants to share it across adapters).
83    ///
84    /// # Errors
85    ///
86    /// Returns [`Error::InvalidApiKey`] for invalid header characters.
87    pub fn from_parts(channel: ExternalChannel, api_key: impl Into<String>) -> Result<Self, Error> {
88        let api_key = api_key.into();
89        validate_api_key(&api_key)?;
90        Ok(Self { channel, api_key: Arc::from(api_key) })
91    }
92}
93
94fn validate_api_key(api_key: &str) -> Result<(), Error> {
95    // Same shape as `auth_request` — surface the failure at construction
96    // rather than per-call.
97    let value = format!("Bearer {api_key}");
98    value
99        .parse::<tonic::metadata::MetadataValue<_>>()
100        .map_err(|_| Error::InvalidApiKey)?;
101    Ok(())
102}
103
104impl PcsExternalPort for GrpcPcsAdapter {
105    async fn send_alert(
106        &self,
107        template: &TemplateId,
108        recipients: &RecipientList,
109        poll: Option<&PollConfig>,
110    ) -> Result<SendOutcome, PcsFailure> {
111        let req = CreateSendRequestReq {
112            template_id: template.0.clone(),
113            recipients: recipients
114                .iter()
115                .map(|r| RecipientInput {
116                    ppnum: r.ppnum.as_str().to_string(),
117                    vars: Some(vars_to_pb_struct(&r.vars)),
118                })
119                .collect(),
120            poll_config: poll.map(|p| PbPollConfig {
121                expires_in_hours: p.expires_in_hours,
122                allow_multiple: p.allow_multiple,
123            }),
124        };
125        let request = auth_request(&self.api_key, req).map_err(|e| {
126            // Constructor pre-validated the key, so reaching here implies
127            // tonic header semantics drifted — surface as transport.
128            classify_transport(format!("auth_request rejected pre-validated key: {e}"))
129        })?;
130        let mut client = ExternalMessageServiceClient::new(self.channel.clone());
131        let resp = client
132            .create_send_request(request)
133            .await
134            .map_err(|s| classify_status(&s))?
135            .into_inner();
136        Ok(SendOutcome {
137            id: SendRequestId(resp.id),
138            state: pb_status_to_state(resp.status),
139            total_recipients: u32_from_i32(resp.total_recipients),
140        })
141    }
142
143    async fn get_send_status(&self, id: &SendRequestId) -> Result<SendStatus, PcsFailure> {
144        let req = GetSendRequestStatusReq { id: id.0.clone() };
145        let request = auth_request(&self.api_key, req).map_err(|e| {
146            classify_transport(format!("auth_request rejected pre-validated key: {e}"))
147        })?;
148        let mut client = ExternalMessageServiceClient::new(self.channel.clone());
149        let resp = client
150            .get_send_request_status(request)
151            .await
152            .map_err(|s| classify_status(&s))?
153            .into_inner();
154        let summary = resp.request.ok_or_else(|| {
155            classify_transport("GetSendRequestStatusResp.request was None")
156        })?;
157        Ok(SendStatus {
158            id: SendRequestId(summary.id),
159            state: pb_status_to_state(summary.status),
160            totals: SendStatusTotals {
161                total: u32_from_i32(summary.total_recipients),
162                delivered: u32_from_i32(summary.delivered),
163                pending_consent: u32_from_i32(summary.pending_consent),
164                failed: u32_from_i32(summary.failed),
165            },
166        })
167    }
168
169    fn raw_channel(&self) -> RawPcsChannel<'_> {
170        RawPcsChannel { channel: self.channel.clone(), api_key: &self.api_key }
171    }
172}
173
174fn vars_to_pb_struct(vars: &std::collections::BTreeMap<String, String>) -> PbStruct {
175    let mut fields = std::collections::BTreeMap::new();
176    for (k, v) in vars {
177        fields.insert(
178            k.clone(),
179            PbValue { kind: Some(PbValueKind::StringValue(v.clone())) },
180        );
181    }
182    // prost_types::Struct wants HashMap, not BTreeMap; convert.
183    PbStruct { fields: fields.into_iter().collect() }
184}
185
186fn pb_status_to_state(s: i32) -> SendRequestState {
187    // i32 is the prost-emitted enum carrier; map via the generated enum.
188    match PbSendStatus::try_from(s).unwrap_or(PbSendStatus::Unspecified) {
189        PbSendStatus::Queued => SendRequestState::Queued,
190        PbSendStatus::Processing => SendRequestState::Processing,
191        PbSendStatus::Completed => SendRequestState::Completed,
192        PbSendStatus::Failed => SendRequestState::Failed,
193        PbSendStatus::Unspecified => SendRequestState::Unknown,
194    }
195}
196
/// Convert a signed protobuf counter to `u32`, clamping negatives to zero.
///
/// The PCS proto carries counts as `i32` even though they are never meant
/// to be negative; a negative value would indicate a server bug rather
/// than a real count, so it is reported as `0`.
fn u32_from_i32(n: i32) -> u32 {
    if n.is_negative() {
        0
    } else {
        // Non-negative, so the magnitude is the value itself.
        n.unsigned_abs()
    }
}
202
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)] // A panic on a failed expectation is the point of a test.
mod tests {
    use super::*;

    /// Return the text stored under `key` if that field is a string value.
    fn string_field<'a>(s: &'a PbStruct, key: &str) -> Option<&'a str> {
        match s.fields.get(key)?.kind.as_ref()? {
            PbValueKind::StringValue(text) => Some(text.as_str()),
            _ => None,
        }
    }

    #[test]
    fn pb_status_mapping_covers_all_variants() {
        assert_eq!(pb_status_to_state(0), SendRequestState::Unknown);
        assert_eq!(pb_status_to_state(1), SendRequestState::Queued);
        assert_eq!(pb_status_to_state(2), SendRequestState::Processing);
        assert_eq!(pb_status_to_state(3), SendRequestState::Completed);
        assert_eq!(pb_status_to_state(4), SendRequestState::Failed);
        // Future-proof: an enum value the SDK doesn't recognize collapses
        // to Unknown rather than panicking.
        assert_eq!(pb_status_to_state(99), SendRequestState::Unknown);
        // Negative discriminants are equally unrecognized.
        assert_eq!(pb_status_to_state(-1), SendRequestState::Unknown);
    }

    #[test]
    fn u32_from_i32_saturates_negatives() {
        assert_eq!(u32_from_i32(-5), 0);
        assert_eq!(u32_from_i32(0), 0);
        assert_eq!(u32_from_i32(42), 42);
    }

    #[test]
    fn u32_from_i32_handles_extremes() {
        assert_eq!(u32_from_i32(i32::MIN), 0);
        assert_eq!(u32_from_i32(i32::MAX), 2_147_483_647);
    }

    #[test]
    fn vars_translation_emits_string_values() {
        let mut vars = std::collections::BTreeMap::new();
        vars.insert("name".into(), "Daisy".into());
        vars.insert("date".into(), "2026-05-01".into());
        let s = vars_to_pb_struct(&vars);
        assert_eq!(s.fields.len(), 2);
        // Every inserted key must survive with its exact text.
        assert_eq!(string_field(&s, "name"), Some("Daisy"));
        assert_eq!(string_field(&s, "date"), Some("2026-05-01"));
        assert_eq!(string_field(&s, "missing"), None);
    }

    #[test]
    fn vars_translation_of_empty_map_is_empty() {
        let vars = std::collections::BTreeMap::new();
        let s = vars_to_pb_struct(&vars);
        assert!(s.fields.is_empty());
    }

    #[test]
    fn validate_api_key_rejects_newline() {
        let err = validate_api_key("pk_live_abc\r\nX-Inj: bad").unwrap_err();
        assert!(matches!(err, Error::InvalidApiKey));
    }

    #[test]
    fn validate_api_key_accepts_normal() {
        validate_api_key("pk_live_abc123").unwrap();
    }
}
251}