1use prost::Message;
2use reqwest::header::CONTENT_TYPE;
3use reqwest::Client;
4use std::collections::HashMap;
5use std::default::Default;
6use std::sync::Arc;
7
8use crate::error::VssError;
9use crate::headers::{get_headermap, FixedHeaders, VssHeaderProvider};
10use crate::types::{
11 DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse,
12 ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
13};
14use crate::util::retry::{retry, RetryPolicy};
15
/// Value of the `Content-Type` header attached to every request body this
/// module posts (requests carry protobuf-encoded bytes, not text).
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
17
18#[derive(Clone)]
21pub struct VssClient<R>
22where
23 R: RetryPolicy<E = VssError>,
24{
25 base_url: String,
26 client: Client,
27 retry_policy: R,
28 header_provider: Arc<dyn VssHeaderProvider>,
29}
30
31impl<R: RetryPolicy<E = VssError>> VssClient<R> {
32 pub fn new(base_url: String, retry_policy: R) -> Self {
34 let client = Client::new();
35 Self::from_client(base_url, client, retry_policy)
36 }
37
38 pub fn from_client(base_url: String, client: Client, retry_policy: R) -> Self {
40 Self {
41 base_url,
42 client,
43 retry_policy,
44 header_provider: Arc::new(FixedHeaders::new(HashMap::new())),
45 }
46 }
47
48 pub fn new_with_headers(
52 base_url: String, retry_policy: R, header_provider: Arc<dyn VssHeaderProvider>,
53 ) -> Self {
54 let client = Client::new();
55 Self { base_url, client, retry_policy, header_provider }
56 }
57
58 pub fn base_url(&self) -> &str {
60 &self.base_url
61 }
62
63 pub async fn get_object(
67 &self, request: &GetObjectRequest,
68 ) -> Result<GetObjectResponse, VssError> {
69 retry(
70 || async {
71 let url = format!("{}/getObject", self.base_url);
72 self.post_request(request, &url).await.and_then(|response: GetObjectResponse| {
73 if response.value.is_none() {
74 Err(VssError::InternalServerError(
75 "VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(),
76 ))
77 } else {
78 Ok(response)
79 }
80 })
81 },
82 &self.retry_policy,
83 )
84 .await
85 }
86
87 pub async fn put_object(
92 &self, request: &PutObjectRequest,
93 ) -> Result<PutObjectResponse, VssError> {
94 retry(
95 || async {
96 let url = format!("{}/putObjects", self.base_url);
97 self.post_request(request, &url).await
98 },
99 &self.retry_policy,
100 )
101 .await
102 }
103
104 pub async fn delete_object(
108 &self, request: &DeleteObjectRequest,
109 ) -> Result<DeleteObjectResponse, VssError> {
110 retry(
111 || async {
112 let url = format!("{}/deleteObject", self.base_url);
113 self.post_request(request, &url).await
114 },
115 &self.retry_policy,
116 )
117 .await
118 }
119
120 pub async fn list_key_versions(
124 &self, request: &ListKeyVersionsRequest,
125 ) -> Result<ListKeyVersionsResponse, VssError> {
126 retry(
127 || async {
128 let url = format!("{}/listKeyVersions", self.base_url);
129 self.post_request(request, &url).await
130 },
131 &self.retry_policy,
132 )
133 .await
134 }
135
136 async fn post_request<Rq: Message, Rs: Message + Default>(
137 &self, request: &Rq, url: &str,
138 ) -> Result<Rs, VssError> {
139 let request_body = request.encode_to_vec();
140 let headermap = self
141 .header_provider
142 .get_headers(&request_body)
143 .await
144 .and_then(|h| get_headermap(&h))
145 .map_err(|e| VssError::AuthError(e.to_string()))?;
146 let response_raw = self
147 .client
148 .post(url)
149 .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
150 .headers(headermap)
151 .body(request_body)
152 .send()
153 .await?;
154 let status = response_raw.status();
155 let payload = response_raw.bytes().await?;
156
157 if status.is_success() {
158 let response = Rs::decode(&payload[..])?;
159 Ok(response)
160 } else {
161 Err(VssError::new(status, payload))
162 }
163 }
164}