vss_client/
client.rs

1use prost::Message;
2use reqwest::header::CONTENT_TYPE;
3use reqwest::Client;
4use std::collections::HashMap;
5use std::default::Default;
6use std::sync::Arc;
7
8use crate::error::VssError;
9use crate::headers::{get_headermap, FixedHeaders, VssHeaderProvider};
10use crate::types::{
11	DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse,
12	ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
13};
14use crate::util::retry::{retry, RetryPolicy};
15
16const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
17
18/// Thin-client to access a hosted instance of Versioned Storage Service (VSS).
19/// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API.
20#[derive(Clone)]
21pub struct VssClient<R>
22where
23	R: RetryPolicy<E = VssError>,
24{
25	base_url: String,
26	client: Client,
27	retry_policy: R,
28	header_provider: Arc<dyn VssHeaderProvider>,
29}
30
31impl<R: RetryPolicy<E = VssError>> VssClient<R> {
32	/// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
33	pub fn new(base_url: String, retry_policy: R) -> Self {
34		let client = Client::new();
35		Self::from_client(base_url, client, retry_policy)
36	}
37
38	/// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint.
39	pub fn from_client(base_url: String, client: Client, retry_policy: R) -> Self {
40		Self {
41			base_url,
42			client,
43			retry_policy,
44			header_provider: Arc::new(FixedHeaders::new(HashMap::new())),
45		}
46	}
47
48	/// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
49	///
50	/// HTTP headers will be provided by the given `header_provider`.
51	pub fn new_with_headers(
52		base_url: String, retry_policy: R, header_provider: Arc<dyn VssHeaderProvider>,
53	) -> Self {
54		let client = Client::new();
55		Self { base_url, client, retry_policy, header_provider }
56	}
57
58	/// Returns the underlying base URL.
59	pub fn base_url(&self) -> &str {
60		&self.base_url
61	}
62
63	/// Fetches a value against a given `key` in `request`.
64	/// Makes a service call to the `GetObject` endpoint of the VSS server.
65	/// For API contract/usage, refer to docs for [`GetObjectRequest`] and [`GetObjectResponse`].
66	pub async fn get_object(
67		&self, request: &GetObjectRequest,
68	) -> Result<GetObjectResponse, VssError> {
69		retry(
70			|| async {
71				let url = format!("{}/getObject", self.base_url);
72				self.post_request(request, &url).await.and_then(|response: GetObjectResponse| {
73					if response.value.is_none() {
74						Err(VssError::InternalServerError(
75							"VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(),
76						))
77					} else {
78						Ok(response)
79					}
80				})
81			},
82			&self.retry_policy,
83		)
84		.await
85	}
86
87	/// Writes multiple [`PutObjectRequest::transaction_items`] as part of a single transaction.
88	/// Makes a service call to the `PutObject` endpoint of the VSS server, with multiple items.
89	/// Items in the `request` are written in a single all-or-nothing transaction.
90	/// For API contract/usage, refer to docs for [`PutObjectRequest`] and [`PutObjectResponse`].
91	pub async fn put_object(
92		&self, request: &PutObjectRequest,
93	) -> Result<PutObjectResponse, VssError> {
94		retry(
95			|| async {
96				let url = format!("{}/putObjects", self.base_url);
97				self.post_request(request, &url).await
98			},
99			&self.retry_policy,
100		)
101		.await
102	}
103
104	/// Deletes the given `key` and `value` in `request`.
105	/// Makes a service call to the `DeleteObject` endpoint of the VSS server.
106	/// For API contract/usage, refer to docs for [`DeleteObjectRequest`] and [`DeleteObjectResponse`].
107	pub async fn delete_object(
108		&self, request: &DeleteObjectRequest,
109	) -> Result<DeleteObjectResponse, VssError> {
110		retry(
111			|| async {
112				let url = format!("{}/deleteObject", self.base_url);
113				self.post_request(request, &url).await
114			},
115			&self.retry_policy,
116		)
117		.await
118	}
119
120	/// Lists keys and their corresponding version for a given [`ListKeyVersionsRequest::store_id`].
121	/// Makes a service call to the `ListKeyVersions` endpoint of the VSS server.
122	/// For API contract/usage, refer to docs for [`ListKeyVersionsRequest`] and [`ListKeyVersionsResponse`].
123	pub async fn list_key_versions(
124		&self, request: &ListKeyVersionsRequest,
125	) -> Result<ListKeyVersionsResponse, VssError> {
126		retry(
127			|| async {
128				let url = format!("{}/listKeyVersions", self.base_url);
129				self.post_request(request, &url).await
130			},
131			&self.retry_policy,
132		)
133		.await
134	}
135
136	async fn post_request<Rq: Message, Rs: Message + Default>(
137		&self, request: &Rq, url: &str,
138	) -> Result<Rs, VssError> {
139		let request_body = request.encode_to_vec();
140		let headermap = self
141			.header_provider
142			.get_headers(&request_body)
143			.await
144			.and_then(|h| get_headermap(&h))
145			.map_err(|e| VssError::AuthError(e.to_string()))?;
146		let response_raw = self
147			.client
148			.post(url)
149			.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
150			.headers(headermap)
151			.body(request_body)
152			.send()
153			.await?;
154		let status = response_raw.status();
155		let payload = response_raw.bytes().await?;
156
157		if status.is_success() {
158			let response = Rs::decode(&payload[..])?;
159			Ok(response)
160		} else {
161			Err(VssError::new(status, payload))
162		}
163	}
164}