s2/
service.rs

1pub mod account;
2pub mod basin;
3pub mod stream;
4
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures::StreamExt;
11use prost_types::method_options::IdempotencyLevel;
12use secrecy::{ExposeSecret, SecretString};
13use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap};
14
15use crate::{client::ClientError, types};
16
17pub async fn send_request<T: ServiceRequest>(
18    mut service: T,
19    token: &SecretString,
20    basin_header: Option<AsciiMetadataValue>,
21) -> Result<T::Response, ClientError> {
22    let req = prepare_request(&mut service, token, basin_header)?;
23    match service.send(req).await {
24        Ok(resp) => Ok(service.parse_response(resp)?),
25        Err(status) => Err(ClientError::Service(status)),
26    }
27}
28
29fn prepare_request<T: ServiceRequest>(
30    service: &mut T,
31    token: &SecretString,
32    basin_header: Option<AsciiMetadataValue>,
33) -> Result<tonic::Request<T::ApiRequest>, types::ConvertError> {
34    let mut req = service.prepare_request()?;
35    add_authorization_header(req.metadata_mut(), token)?;
36    if let Some(basin) = basin_header {
37        req.metadata_mut()
38            .insert(AsciiMetadataKey::from_static("s2-basin"), basin);
39    }
40    Ok(req)
41}
42
43fn add_authorization_header(
44    meta: &mut MetadataMap,
45    token: &SecretString,
46) -> Result<(), types::ConvertError> {
47    let mut val: AsciiMetadataValue = format!("Bearer {}", token.expose_secret())
48        .try_into()
49        .map_err(|_| "failed to parse token as metadata value")?;
50    val.set_sensitive(true);
51    meta.insert("authorization", val);
52    Ok(())
53}
54
55pub(crate) fn add_s2_request_token_header(
56    meta: &mut MetadataMap,
57    s2_request_token: &str,
58) -> Result<(), types::ConvertError> {
59    let s2_request_token: AsciiMetadataValue = s2_request_token
60        .try_into()
61        .map_err(|_| "failed to parse token as metadata value")?;
62
63    meta.insert("s2-request-token", s2_request_token);
64
65    Ok(())
66}
67
68pub(crate) fn gen_s2_request_token() -> String {
69    uuid::Uuid::new_v4().simple().to_string()
70}
71
72pub trait ServiceRequest: std::fmt::Debug {
73    /// Request parameters generated by prost.
74    type ApiRequest;
75    /// Response to be returned by the RPC.
76    type Response;
77    /// Response generated by prost to be returned.
78    type ApiResponse;
79
80    /// Idempotency level for the underlying service.
81    const IDEMPOTENCY_LEVEL: IdempotencyLevel;
82
83    /// Take the request parameters and generate the corresponding tonic request.
84    fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError>;
85
86    /// Actually send the tonic request to receive a raw response and the parsed error.
87    async fn send(
88        &mut self,
89        req: tonic::Request<Self::ApiRequest>,
90    ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status>;
91
92    /// Return true if the request should be retried based on the error returned.
93    fn should_retry(&self, err: &ClientError) -> bool {
94        if Self::IDEMPOTENCY_LEVEL == IdempotencyLevel::IdempotencyUnknown {
95            return false;
96        };
97
98        // The request is definitely idempotent.
99        if let ClientError::Service(status) = err {
100            matches!(
101                status.code(),
102                tonic::Code::Unavailable
103                    | tonic::Code::DeadlineExceeded
104                    | tonic::Code::Cancelled
105                    | tonic::Code::Unknown
106                    | tonic::Code::ResourceExhausted
107            )
108        } else {
109            false
110        }
111    }
112
113    /// Take the tonic response and generate the response to be returned.
114    fn parse_response(
115        &self,
116        resp: tonic::Response<Self::ApiResponse>,
117    ) -> Result<Self::Response, types::ConvertError>;
118}
119
120pub trait StreamingRequest: Unpin {
121    type RequestItem;
122    type ApiRequestItem;
123
124    fn prepare_request_item(&self, req: Self::RequestItem) -> Self::ApiRequestItem;
125}
126
127pub struct ServiceStreamingRequest<R, S>
128where
129    R: StreamingRequest,
130    S: futures::Stream<Item = R::RequestItem> + Unpin,
131{
132    req: R,
133    stream: S,
134}
135
136impl<R, S> ServiceStreamingRequest<R, S>
137where
138    R: StreamingRequest,
139    S: futures::Stream<Item = R::RequestItem> + Unpin,
140{
141    pub fn new(req: R, stream: S) -> Self {
142        Self { req, stream }
143    }
144}
145
146impl<R, S> futures::Stream for ServiceStreamingRequest<R, S>
147where
148    R: StreamingRequest,
149    S: futures::Stream<Item = R::RequestItem> + Unpin,
150{
151    type Item = R::ApiRequestItem;
152
153    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
154        match self.stream.poll_next_unpin(cx) {
155            Poll::Pending => Poll::Pending,
156            Poll::Ready(None) => Poll::Ready(None),
157            Poll::Ready(Some(req)) => Poll::Ready(Some(self.req.prepare_request_item(req))),
158        }
159    }
160}
161
162pub trait StreamingResponse: Unpin {
163    /// Response message item to be returned by the RPC stream.
164    type ResponseItem;
165    /// Response message item generated by prost in the stream.
166    type ApiResponseItem;
167
168    /// Take the tonic response message from stream item and generate stream item.
169    fn parse_response_item(
170        &self,
171        resp: Self::ApiResponseItem,
172    ) -> Result<Self::ResponseItem, ClientError>;
173}
174
175pub struct ServiceStreamingResponse<S: StreamingResponse> {
176    req: S,
177    stream: tonic::Streaming<S::ApiResponseItem>,
178}
179
180impl<S: StreamingResponse> ServiceStreamingResponse<S> {
181    pub fn new(req: S, stream: tonic::Streaming<S::ApiResponseItem>) -> Self {
182        Self { req, stream }
183    }
184}
185
186impl<S: StreamingResponse> futures::Stream for ServiceStreamingResponse<S> {
187    type Item = Result<S::ResponseItem, ClientError>;
188
189    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190        match self.stream.poll_next_unpin(cx) {
191            Poll::Pending => Poll::Pending,
192            Poll::Ready(None) => Poll::Ready(None),
193            Poll::Ready(Some(item)) => {
194                let item = match item {
195                    Ok(resp) => self.req.parse_response_item(resp),
196                    Err(status) => Err(ClientError::Service(status)),
197                };
198                Poll::Ready(Some(item))
199            }
200        }
201    }
202}
203
204/// Generic type for streaming response.
205pub type Streaming<R> = Pin<Box<dyn Send + futures::Stream<Item = Result<R, ClientError>>>>;