1use bitreq::Client;
2use prost::Message;
3use std::collections::HashMap;
4use std::default::Default;
5use std::sync::Arc;
6
7use log::trace;
8
9use crate::error::VssError;
10use crate::headers::{FixedHeaders, VssHeaderProvider};
11use crate::types::{
12 DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse,
13 ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
14};
15use crate::util::retry::{retry, RetryPolicy};
16use crate::util::KeyValueVecKeyPrinter;
17
18const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
19const CONTENT_TYPE: &str = "content-type";
20const DEFAULT_TIMEOUT_SECS: u64 = 10;
21const MAX_RESPONSE_BODY_SIZE: usize = 1024 * 1024 * 1024; const DEFAULT_CLIENT_CAPACITY: usize = 10;
23
24#[derive(Clone)]
27pub struct VssClient<R>
28where
29 R: RetryPolicy<E = VssError>,
30{
31 base_url: String,
32 client: Client,
33 retry_policy: R,
34 header_provider: Arc<dyn VssHeaderProvider>,
35}
36
37impl<R: RetryPolicy<E = VssError>> VssClient<R> {
38 pub fn new(base_url: String, retry_policy: R) -> Self {
40 let client = Client::new(DEFAULT_CLIENT_CAPACITY);
41 Self::from_client(base_url, client, retry_policy)
42 }
43
44 pub fn from_client(base_url: String, client: Client, retry_policy: R) -> Self {
46 Self {
47 base_url,
48 client,
49 retry_policy,
50 header_provider: Arc::new(FixedHeaders::new(HashMap::new())),
51 }
52 }
53
54 pub fn from_client_and_headers(
58 base_url: String, client: Client, retry_policy: R,
59 header_provider: Arc<dyn VssHeaderProvider>,
60 ) -> Self {
61 Self { base_url, client, retry_policy, header_provider }
62 }
63
64 pub fn new_with_headers(
68 base_url: String, retry_policy: R, header_provider: Arc<dyn VssHeaderProvider>,
69 ) -> Self {
70 let client = Client::new(DEFAULT_CLIENT_CAPACITY);
71 Self { base_url, client, retry_policy, header_provider }
72 }
73
74 pub fn base_url(&self) -> &str {
76 &self.base_url
77 }
78
79 pub async fn get_object(
83 &self, request: &GetObjectRequest,
84 ) -> Result<GetObjectResponse, VssError> {
85 let request_id: u64 = rand::random();
86 trace!("Sending GetObjectRequest {} for key {}.", request_id, request.key);
87 let res = retry(
88 || async {
89 let url = format!("{}/getObject", self.base_url);
90 self.post_request(request, &url, true).await.and_then(
91 |response: GetObjectResponse| {
92 if response.value.is_none() {
93 Err(VssError::InternalServerError(
94 "VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(),
95 ))
96 } else {
97 Ok(response)
98 }
99 },
100 )
101 },
102 &self.retry_policy,
103 )
104 .await;
105 if let Err(ref e) = res {
106 trace!("GetObjectRequest {} failed: {}", request_id, e);
107 }
108 res
109 }
110
111 pub async fn put_object(
116 &self, request: &PutObjectRequest,
117 ) -> Result<PutObjectResponse, VssError> {
118 let request_id: u64 = rand::random();
119 trace!(
120 "Sending PutObjectRequest {} for transaction_items {} and delete_items {}.",
121 request_id,
122 KeyValueVecKeyPrinter(&request.transaction_items),
123 KeyValueVecKeyPrinter(&request.delete_items),
124 );
125 let res = retry(
126 || async {
127 let url = format!("{}/putObjects", self.base_url);
128 self.post_request(request, &url, false).await
129 },
130 &self.retry_policy,
131 )
132 .await;
133 if let Err(ref e) = res {
134 trace!("PutObjectRequest {} failed: {}", request_id, e);
135 }
136 res
137 }
138
139 pub async fn delete_object(
143 &self, request: &DeleteObjectRequest,
144 ) -> Result<DeleteObjectResponse, VssError> {
145 let request_id: u64 = rand::random();
146 trace!(
147 "Sending DeleteObjectRequest {} for key {:?}",
148 request_id,
149 request.key_value.as_ref().map(|t| &t.key)
150 );
151 let res = retry(
152 || async {
153 let url = format!("{}/deleteObject", self.base_url);
154 self.post_request(request, &url, true).await
155 },
156 &self.retry_policy,
157 )
158 .await;
159 if let Err(ref e) = res {
160 trace!("DeleteObjectRequest {} failed: {}", request_id, e);
161 }
162 res
163 }
164
165 pub async fn list_key_versions(
169 &self, request: &ListKeyVersionsRequest,
170 ) -> Result<ListKeyVersionsResponse, VssError> {
171 let request_id: u64 = rand::random();
172 trace!(
173 "Sending ListKeyVersionsRequest {} for key_prefix {:?}, page_size {:?}, page_token {:?}",
174 request_id,
175 request.key_prefix,
176 request.page_size,
177 request.page_token
178 );
179 let res = retry(
180 || async {
181 let url = format!("{}/listKeyVersions", self.base_url);
182 self.post_request(request, &url, true).await
183 },
184 &self.retry_policy,
185 )
186 .await;
187 if let Err(ref e) = res {
188 trace!("ListKeyVersionsRequest {} failed: {}", request_id, e);
189 }
190 res
191 }
192
193 async fn post_request<Rq: Message, Rs: Message + Default>(
194 &self, request: &Rq, url: &str, enable_pipelining: bool,
195 ) -> Result<Rs, VssError> {
196 let request_body = request.encode_to_vec();
197 let headers = self
198 .header_provider
199 .get_headers(&request_body)
200 .await
201 .map_err(|e| VssError::AuthError(e.to_string()))?;
202
203 let mut http_request = bitreq::post(url)
204 .with_header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
205 .with_headers(headers)
206 .with_body(request_body)
207 .with_timeout(DEFAULT_TIMEOUT_SECS)
208 .with_max_body_size(Some(MAX_RESPONSE_BODY_SIZE));
209
210 if enable_pipelining {
211 http_request = http_request.with_pipelining();
212 }
213
214 let response = self.client.send_async(http_request).await?;
215
216 let status_code = response.status_code;
217 let payload = response.into_bytes();
218
219 if (200..300).contains(&status_code) {
220 let response = Rs::decode(&payload[..])?;
221 Ok(response)
222 } else {
223 Err(VssError::new(status_code, payload))
224 }
225 }
226}