s2/
service.rs

1pub mod account;
2pub mod basin;
3pub mod stream;
4
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures::StreamExt;
11use prost_types::method_options::IdempotencyLevel;
12use secrecy::{ExposeSecret, SecretString};
13use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap};
14
15use crate::{client::ClientError, types};
16
17pub async fn send_request<T: ServiceRequest>(
18    mut service: T,
19    token: &SecretString,
20    basin_header: Option<AsciiMetadataValue>,
21) -> Result<T::Response, ClientError> {
22    let req = prepare_request(&mut service, token, basin_header)?;
23    match service.send(req).await {
24        Ok(resp) => Ok(service.parse_response(resp)?),
25        Err(status) => Err(ClientError::Service(status)),
26    }
27}
28
29fn prepare_request<T: ServiceRequest>(
30    service: &mut T,
31    token: &SecretString,
32    basin_header: Option<AsciiMetadataValue>,
33) -> Result<tonic::Request<T::ApiRequest>, types::ConvertError> {
34    let mut req = service.prepare_request()?;
35    add_authorization_header(req.metadata_mut(), token)?;
36    if let Some(basin) = basin_header {
37        req.metadata_mut()
38            .insert(AsciiMetadataKey::from_static("s2-basin"), basin);
39    }
40    Ok(req)
41}
42
43fn add_authorization_header(
44    meta: &mut MetadataMap,
45    token: &SecretString,
46) -> Result<(), types::ConvertError> {
47    let mut val: AsciiMetadataValue = format!("Bearer {}", token.expose_secret())
48        .try_into()
49        .map_err(|_| "failed to parse token as metadata value")?;
50    val.set_sensitive(true);
51    meta.insert("authorization", val);
52    Ok(())
53}
54
55pub(crate) fn add_s2_request_token_header(
56    meta: &mut MetadataMap,
57    s2_request_token: &str,
58) -> Result<(), types::ConvertError> {
59    let s2_request_token: AsciiMetadataValue = s2_request_token
60        .try_into()
61        .map_err(|_| "failed to parse token as metadata value")?;
62
63    meta.insert("s2-request-token", s2_request_token);
64
65    Ok(())
66}
67
68pub(crate) fn gen_s2_request_token() -> String {
69    uuid::Uuid::new_v4().simple().to_string()
70}
71
72pub trait ServiceRequest: std::fmt::Debug {
73    /// Request parameters generated by prost.
74    type ApiRequest;
75    /// Response to be returned by the RPC.
76    type Response;
77    /// Response generated by prost to be returned.
78    type ApiResponse;
79
80    /// Idempotency level for the underlying service.
81    const IDEMPOTENCY_LEVEL: IdempotencyLevel;
82
83    /// Take the request parameters and generate the corresponding tonic request.
84    fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError>;
85
86    /// Actually send the tonic request to receive a raw response and the parsed error.
87    async fn send(
88        &mut self,
89        req: tonic::Request<Self::ApiRequest>,
90    ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status>;
91
92    /// Return true if the request should be retried based on the error returned.
93    fn should_retry(&self, err: &ClientError) -> bool {
94        if Self::IDEMPOTENCY_LEVEL == IdempotencyLevel::IdempotencyUnknown {
95            return false;
96        };
97
98        // The request is definitely idempotent.
99        if let ClientError::Service(status) = err {
100            matches!(
101                status.code(),
102                tonic::Code::Unavailable
103                    | tonic::Code::DeadlineExceeded
104                    | tonic::Code::Cancelled
105                    | tonic::Code::Unknown
106            )
107        } else {
108            false
109        }
110    }
111
112    /// Take the tonic response and generate the response to be returned.
113    fn parse_response(
114        &self,
115        resp: tonic::Response<Self::ApiResponse>,
116    ) -> Result<Self::Response, types::ConvertError>;
117}
118
119pub trait StreamingRequest: Unpin {
120    type RequestItem;
121    type ApiRequestItem;
122
123    fn prepare_request_item(&self, req: Self::RequestItem) -> Self::ApiRequestItem;
124}
125
126pub struct ServiceStreamingRequest<R, S>
127where
128    R: StreamingRequest,
129    S: futures::Stream<Item = R::RequestItem> + Unpin,
130{
131    req: R,
132    stream: S,
133}
134
135impl<R, S> ServiceStreamingRequest<R, S>
136where
137    R: StreamingRequest,
138    S: futures::Stream<Item = R::RequestItem> + Unpin,
139{
140    pub fn new(req: R, stream: S) -> Self {
141        Self { req, stream }
142    }
143}
144
145impl<R, S> futures::Stream for ServiceStreamingRequest<R, S>
146where
147    R: StreamingRequest,
148    S: futures::Stream<Item = R::RequestItem> + Unpin,
149{
150    type Item = R::ApiRequestItem;
151
152    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
153        match self.stream.poll_next_unpin(cx) {
154            Poll::Pending => Poll::Pending,
155            Poll::Ready(None) => Poll::Ready(None),
156            Poll::Ready(Some(req)) => Poll::Ready(Some(self.req.prepare_request_item(req))),
157        }
158    }
159}
160
161pub trait StreamingResponse: Unpin {
162    /// Response message item to be returned by the RPC stream.
163    type ResponseItem;
164    /// Response message item generated by prost in the stream.
165    type ApiResponseItem;
166
167    /// Take the tonic response message from stream item and generate stream item.
168    fn parse_response_item(
169        &self,
170        resp: Self::ApiResponseItem,
171    ) -> Result<Self::ResponseItem, ClientError>;
172}
173
174pub struct ServiceStreamingResponse<S: StreamingResponse> {
175    req: S,
176    stream: tonic::Streaming<S::ApiResponseItem>,
177}
178
179impl<S: StreamingResponse> ServiceStreamingResponse<S> {
180    pub fn new(req: S, stream: tonic::Streaming<S::ApiResponseItem>) -> Self {
181        Self { req, stream }
182    }
183}
184
185impl<S: StreamingResponse> futures::Stream for ServiceStreamingResponse<S> {
186    type Item = Result<S::ResponseItem, ClientError>;
187
188    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189        match self.stream.poll_next_unpin(cx) {
190            Poll::Pending => Poll::Pending,
191            Poll::Ready(None) => Poll::Ready(None),
192            Poll::Ready(Some(item)) => {
193                let item = match item {
194                    Ok(resp) => self.req.parse_response_item(resp),
195                    Err(status) => Err(ClientError::Service(status)),
196                };
197                Poll::Ready(Some(item))
198            }
199        }
200    }
201}
202
203/// Generic type for streaming response.
204pub type Streaming<R> = Pin<Box<dyn Send + futures::Stream<Item = Result<R, ClientError>>>>;