1pub mod account;
2pub mod basin;
3pub mod stream;
4
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::StreamExt;
11use prost_types::method_options::IdempotencyLevel;
12use secrecy::{ExposeSecret, SecretString};
13use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap};
14
15use crate::{client::ClientError, types};
16
17pub async fn send_request<T: ServiceRequest>(
18 mut service: T,
19 token: &SecretString,
20 basin_header: Option<AsciiMetadataValue>,
21) -> Result<T::Response, ClientError> {
22 let req = prepare_request(&mut service, token, basin_header)?;
23 match service.send(req).await {
24 Ok(resp) => Ok(service.parse_response(resp)?),
25 Err(status) => Err(ClientError::Service(status)),
26 }
27}
28
29fn prepare_request<T: ServiceRequest>(
30 service: &mut T,
31 token: &SecretString,
32 basin_header: Option<AsciiMetadataValue>,
33) -> Result<tonic::Request<T::ApiRequest>, types::ConvertError> {
34 let mut req = service.prepare_request()?;
35 add_authorization_header(req.metadata_mut(), token)?;
36 if let Some(basin) = basin_header {
37 req.metadata_mut()
38 .insert(AsciiMetadataKey::from_static("s2-basin"), basin);
39 }
40 Ok(req)
41}
42
43fn add_authorization_header(
44 meta: &mut MetadataMap,
45 token: &SecretString,
46) -> Result<(), types::ConvertError> {
47 let mut val: AsciiMetadataValue = format!("Bearer {}", token.expose_secret())
48 .try_into()
49 .map_err(|_| "failed to parse token as metadata value")?;
50 val.set_sensitive(true);
51 meta.insert("authorization", val);
52 Ok(())
53}
54
55pub(crate) fn add_s2_request_token_header(
56 meta: &mut MetadataMap,
57 s2_request_token: &str,
58) -> Result<(), types::ConvertError> {
59 let s2_request_token: AsciiMetadataValue = s2_request_token
60 .try_into()
61 .map_err(|_| "failed to parse token as metadata value")?;
62
63 meta.insert("s2-request-token", s2_request_token);
64
65 Ok(())
66}
67
68pub(crate) fn gen_s2_request_token() -> String {
69 uuid::Uuid::new_v4().simple().to_string()
70}
71
72pub trait ServiceRequest: std::fmt::Debug {
73 type ApiRequest;
75 type Response;
77 type ApiResponse;
79
80 const IDEMPOTENCY_LEVEL: IdempotencyLevel;
82
83 fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError>;
85
86 async fn send(
88 &mut self,
89 req: tonic::Request<Self::ApiRequest>,
90 ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status>;
91
92 fn should_retry(&self, err: &ClientError) -> bool {
94 if Self::IDEMPOTENCY_LEVEL == IdempotencyLevel::IdempotencyUnknown {
95 return false;
96 };
97
98 if let ClientError::Service(status) = err {
100 matches!(
101 status.code(),
102 tonic::Code::Unavailable
103 | tonic::Code::DeadlineExceeded
104 | tonic::Code::Cancelled
105 | tonic::Code::Unknown
106 | tonic::Code::ResourceExhausted
107 )
108 } else {
109 false
110 }
111 }
112
113 fn parse_response(
115 &self,
116 resp: tonic::Response<Self::ApiResponse>,
117 ) -> Result<Self::Response, types::ConvertError>;
118}
119
120pub trait StreamingRequest: Unpin {
121 type RequestItem;
122 type ApiRequestItem;
123
124 fn prepare_request_item(&self, req: Self::RequestItem) -> Self::ApiRequestItem;
125}
126
127pub struct ServiceStreamingRequest<R, S>
128where
129 R: StreamingRequest,
130 S: futures::Stream<Item = R::RequestItem> + Unpin,
131{
132 req: R,
133 stream: S,
134}
135
136impl<R, S> ServiceStreamingRequest<R, S>
137where
138 R: StreamingRequest,
139 S: futures::Stream<Item = R::RequestItem> + Unpin,
140{
141 pub fn new(req: R, stream: S) -> Self {
142 Self { req, stream }
143 }
144}
145
146impl<R, S> futures::Stream for ServiceStreamingRequest<R, S>
147where
148 R: StreamingRequest,
149 S: futures::Stream<Item = R::RequestItem> + Unpin,
150{
151 type Item = R::ApiRequestItem;
152
153 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
154 match self.stream.poll_next_unpin(cx) {
155 Poll::Pending => Poll::Pending,
156 Poll::Ready(None) => Poll::Ready(None),
157 Poll::Ready(Some(req)) => Poll::Ready(Some(self.req.prepare_request_item(req))),
158 }
159 }
160}
161
162pub trait StreamingResponse: Unpin {
163 type ResponseItem;
165 type ApiResponseItem;
167
168 fn parse_response_item(
170 &self,
171 resp: Self::ApiResponseItem,
172 ) -> Result<Self::ResponseItem, ClientError>;
173}
174
175pub struct ServiceStreamingResponse<S: StreamingResponse> {
176 req: S,
177 stream: tonic::Streaming<S::ApiResponseItem>,
178}
179
180impl<S: StreamingResponse> ServiceStreamingResponse<S> {
181 pub fn new(req: S, stream: tonic::Streaming<S::ApiResponseItem>) -> Self {
182 Self { req, stream }
183 }
184}
185
186impl<S: StreamingResponse> futures::Stream for ServiceStreamingResponse<S> {
187 type Item = Result<S::ResponseItem, ClientError>;
188
189 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190 match self.stream.poll_next_unpin(cx) {
191 Poll::Pending => Poll::Pending,
192 Poll::Ready(None) => Poll::Ready(None),
193 Poll::Ready(Some(item)) => {
194 let item = match item {
195 Ok(resp) => self.req.parse_response_item(resp),
196 Err(status) => Err(ClientError::Service(status)),
197 };
198 Poll::Ready(Some(item))
199 }
200 }
201 }
202}
203
204pub type Streaming<R> = Pin<Box<dyn Send + futures::Stream<Item = Result<R, ClientError>>>>;