Skip to main content

vss_client_ng/
client.rs

1use bitreq::Client;
2use prost::Message;
3use std::collections::HashMap;
4use std::default::Default;
5use std::sync::Arc;
6
7use log::trace;
8
9use crate::error::VssError;
10use crate::headers::{FixedHeaders, VssHeaderProvider};
11use crate::types::{
12	DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse,
13	ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
14};
15use crate::util::retry::{retry, RetryPolicy};
16use crate::util::KeyValueVecKeyPrinter;
17
/// Value sent in the `content-type` header of every outgoing request.
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
/// Name of the HTTP header that carries the request's content type.
const CONTENT_TYPE: &str = "content-type";
/// Timeout applied to each individual HTTP request, in seconds.
const DEFAULT_TIMEOUT_SECS: u64 = 10;
/// Upper bound handed to the HTTP layer for response bodies: 2^30 bytes (1 GiB).
const MAX_RESPONSE_BODY_SIZE: usize = 1 << 30;
/// Capacity argument used when this module constructs its own `Client`.
// NOTE(review): presumably the number of pooled connections — confirm against the `bitreq` docs.
const DEFAULT_CLIENT_CAPACITY: usize = 10;
23
24/// Thin-client to access a hosted instance of Versioned Storage Service (VSS).
25/// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API.
26#[derive(Clone)]
27pub struct VssClient<R>
28where
29	R: RetryPolicy<E = VssError>,
30{
31	base_url: String,
32	client: Client,
33	retry_policy: R,
34	header_provider: Arc<dyn VssHeaderProvider>,
35}
36
37impl<R: RetryPolicy<E = VssError>> VssClient<R> {
38	/// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
39	pub fn new(base_url: String, retry_policy: R) -> Self {
40		let client = Client::new(DEFAULT_CLIENT_CAPACITY);
41		Self::from_client(base_url, client, retry_policy)
42	}
43
44	/// Constructs a [`VssClient`] from a given [`bitreq::Client`], using `base_url` as the VSS server endpoint.
45	pub fn from_client(base_url: String, client: Client, retry_policy: R) -> Self {
46		Self {
47			base_url,
48			client,
49			retry_policy,
50			header_provider: Arc::new(FixedHeaders::new(HashMap::new())),
51		}
52	}
53
54	/// Constructs a [`VssClient`] from a given [`bitreq::Client`], using `base_url` as the VSS server endpoint.
55	///
56	/// HTTP headers will be provided by the given `header_provider`.
57	pub fn from_client_and_headers(
58		base_url: String, client: Client, retry_policy: R,
59		header_provider: Arc<dyn VssHeaderProvider>,
60	) -> Self {
61		Self { base_url, client, retry_policy, header_provider }
62	}
63
64	/// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
65	///
66	/// HTTP headers will be provided by the given `header_provider`.
67	pub fn new_with_headers(
68		base_url: String, retry_policy: R, header_provider: Arc<dyn VssHeaderProvider>,
69	) -> Self {
70		let client = Client::new(DEFAULT_CLIENT_CAPACITY);
71		Self { base_url, client, retry_policy, header_provider }
72	}
73
74	/// Returns the underlying base URL.
75	pub fn base_url(&self) -> &str {
76		&self.base_url
77	}
78
79	/// Fetches a value against a given `key` in `request`.
80	/// Makes a service call to the `GetObject` endpoint of the VSS server.
81	/// For API contract/usage, refer to docs for [`GetObjectRequest`] and [`GetObjectResponse`].
82	pub async fn get_object(
83		&self, request: &GetObjectRequest,
84	) -> Result<GetObjectResponse, VssError> {
85		let request_id: u64 = rand::random();
86		trace!("Sending GetObjectRequest {} for key {}.", request_id, request.key);
87		let res = retry(
88			|| async {
89				let url = format!("{}/getObject", self.base_url);
90				self.post_request(request, &url, true).await.and_then(
91					|response: GetObjectResponse| {
92						if response.value.is_none() {
93							Err(VssError::InternalServerError(
94								"VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(),
95							))
96						} else {
97							Ok(response)
98						}
99					},
100				)
101			},
102			&self.retry_policy,
103		)
104		.await;
105		if let Err(ref e) = res {
106			trace!("GetObjectRequest {} failed: {}", request_id, e);
107		}
108		res
109	}
110
111	/// Writes multiple [`PutObjectRequest::transaction_items`] as part of a single transaction.
112	/// Makes a service call to the `PutObject` endpoint of the VSS server, with multiple items.
113	/// Items in the `request` are written in a single all-or-nothing transaction.
114	/// For API contract/usage, refer to docs for [`PutObjectRequest`] and [`PutObjectResponse`].
115	pub async fn put_object(
116		&self, request: &PutObjectRequest,
117	) -> Result<PutObjectResponse, VssError> {
118		let request_id: u64 = rand::random();
119		trace!(
120			"Sending PutObjectRequest {} for transaction_items {} and delete_items {}.",
121			request_id,
122			KeyValueVecKeyPrinter(&request.transaction_items),
123			KeyValueVecKeyPrinter(&request.delete_items),
124		);
125		let res = retry(
126			|| async {
127				let url = format!("{}/putObjects", self.base_url);
128				self.post_request(request, &url, false).await
129			},
130			&self.retry_policy,
131		)
132		.await;
133		if let Err(ref e) = res {
134			trace!("PutObjectRequest {} failed: {}", request_id, e);
135		}
136		res
137	}
138
139	/// Deletes the given `key` and `value` in `request`.
140	/// Makes a service call to the `DeleteObject` endpoint of the VSS server.
141	/// For API contract/usage, refer to docs for [`DeleteObjectRequest`] and [`DeleteObjectResponse`].
142	pub async fn delete_object(
143		&self, request: &DeleteObjectRequest,
144	) -> Result<DeleteObjectResponse, VssError> {
145		let request_id: u64 = rand::random();
146		trace!(
147			"Sending DeleteObjectRequest {} for key {:?}",
148			request_id,
149			request.key_value.as_ref().map(|t| &t.key)
150		);
151		let res = retry(
152			|| async {
153				let url = format!("{}/deleteObject", self.base_url);
154				self.post_request(request, &url, true).await
155			},
156			&self.retry_policy,
157		)
158		.await;
159		if let Err(ref e) = res {
160			trace!("DeleteObjectRequest {} failed: {}", request_id, e);
161		}
162		res
163	}
164
165	/// Lists keys and their corresponding version for a given [`ListKeyVersionsRequest::store_id`].
166	/// Makes a service call to the `ListKeyVersions` endpoint of the VSS server.
167	/// For API contract/usage, refer to docs for [`ListKeyVersionsRequest`] and [`ListKeyVersionsResponse`].
168	pub async fn list_key_versions(
169		&self, request: &ListKeyVersionsRequest,
170	) -> Result<ListKeyVersionsResponse, VssError> {
171		let request_id: u64 = rand::random();
172		trace!(
173			"Sending ListKeyVersionsRequest {} for key_prefix {:?}, page_size {:?}, page_token {:?}",
174			request_id,
175			request.key_prefix,
176			request.page_size,
177			request.page_token
178		);
179		let res = retry(
180			|| async {
181				let url = format!("{}/listKeyVersions", self.base_url);
182				self.post_request(request, &url, true).await
183			},
184			&self.retry_policy,
185		)
186		.await;
187		if let Err(ref e) = res {
188			trace!("ListKeyVersionsRequest {} failed: {}", request_id, e);
189		}
190		res
191	}
192
193	async fn post_request<Rq: Message, Rs: Message + Default>(
194		&self, request: &Rq, url: &str, enable_pipelining: bool,
195	) -> Result<Rs, VssError> {
196		let request_body = request.encode_to_vec();
197		let headers = self
198			.header_provider
199			.get_headers(&request_body)
200			.await
201			.map_err(|e| VssError::AuthError(e.to_string()))?;
202
203		let mut http_request = bitreq::post(url)
204			.with_header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
205			.with_headers(headers)
206			.with_body(request_body)
207			.with_timeout(DEFAULT_TIMEOUT_SECS)
208			.with_max_body_size(Some(MAX_RESPONSE_BODY_SIZE));
209
210		if enable_pipelining {
211			http_request = http_request.with_pipelining();
212		}
213
214		let response = self.client.send_async(http_request).await?;
215
216		let status_code = response.status_code;
217		let payload = response.into_bytes();
218
219		if (200..300).contains(&status_code) {
220			let response = Rs::decode(&payload[..])?;
221			Ok(response)
222		} else {
223			Err(VssError::new(status_code, payload))
224		}
225	}
226}