use crate::distributed::error::DistributedStorageError;
use crate::distributed::model::{Keys, Object, Partitions, Store, StoreMode, Stores};
use crate::distributed::DistributedStorage;
use pdk_core::classy::extract::context::ConfigureContext;
use pdk_core::classy::extract::{Extract, FromContext};
use pdk_core::classy::hl::{HttpClient, InvalidUri, Service, Uri};
use pdk_core::classy::Configuration;
use pdk_core::logger::debug;
use serde::Deserialize;
use std::str::FromStr;
use thiserror::Error;
/// Base URL used when the shared-storage coordinates cannot be deserialized from the
/// configuration.
const DEFAULT_LOCAL_STORAGE_URL: &str = "http://127.0.0.1:4000";
/// Service name used together with `DEFAULT_LOCAL_STORAGE_URL` as the fallback coordinates.
const DEFAULT_LOCAL_STORAGE_SERVICE: &str = "x-flex-keyvalue-store";
/// Response header whose value `get` returns alongside the item (empty string when absent).
const ETAG_HEADER: &str = "etag";
/// Request header sent by `store` for `StoreMode::Cas`, carrying the CAS value.
const IF_MATCH_HEADER: &str = "if-match";
/// Request header sent by `store` for `StoreMode::Absent`, with the value `*`.
const IF_NONE_HEADER: &str = "if-none-match";
/// Prefix of every request path built by this client.
const API_PREFIX: &str = "/api/v1";
/// HTTP-backed implementation of `DistributedStorage`.
///
/// Every operation builds a request with `http_client` and sends it to `service`.
pub struct DistributedStorageClient {
/// Client used to build and send each request.
http_client: HttpClient,
/// Service the requests are addressed to.
service: Service,
}
impl DistributedStorageClient {
fn new(http_client: HttpClient, service: Service) -> Self {
Self {
http_client,
service,
}
}
}
/// Errors that can occur while extracting a `DistributedStorageClient` from the
/// configure context.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum DistributedStorageClientExtractionError {
/// The shared-storage base URL could not be parsed into a `Uri`.
#[error("Invalid Uri for shared data coordinates: {0}.")]
InvalidUri(#[from] InvalidUri),
}
impl FromContext<ConfigureContext> for DistributedStorageClient {
    type Error = DistributedStorageClientExtractionError;

    /// Builds the client from the policy configuration available in `context`.
    ///
    /// The configuration bytes are deserialized as a `SharedStorageConfig`. When that fails,
    /// the default coordinates (`DEFAULT_LOCAL_STORAGE_URL` / `DEFAULT_LOCAL_STORAGE_SERVICE`)
    /// are used instead and a debug message is logged. A base URL that does not carry an
    /// explicit `http://` or `https://` scheme gets `http://` prepended before parsing.
    ///
    /// # Errors
    ///
    /// Returns [`DistributedStorageClientExtractionError::InvalidUri`] when the resulting
    /// URL cannot be parsed into a `Uri`.
    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
        let client: HttpClient = context.extract_always();
        let Configuration(bytes): Configuration = context.extract_always();

        let shared_storage = serde_json::from_slice::<SharedStorageConfig>(bytes.as_slice())
            .map(|s| s.shared_storage)
            .unwrap_or_else(|_| {
                debug!("Could not retrieve coordinates of the shared storage, will use the default coordinates.");
                SharedStorageConfigData {
                    base_url: DEFAULT_LOCAL_STORAGE_URL.to_string(),
                    service: DEFAULT_LOCAL_STORAGE_SERVICE.to_string(),
                }
            });

        // Test for a real scheme rather than the bare "http" prefix: a host such as
        // `httpbin.org` also starts with "http" but still needs the scheme added.
        let uri = if has_http_scheme(&shared_storage.base_url) {
            Uri::from_str(shared_storage.base_url.as_str())
        } else {
            Uri::from_str(format!("http://{}", shared_storage.base_url).as_str())
        }?;

        let service = Service::new(&shared_storage.service, uri);
        Ok(DistributedStorageClient::new(client, service))
    }
}

/// Returns `true` when `url` begins with an `http://` or `https://` scheme.
///
/// The comparison ignores ASCII case because URI schemes are case-insensitive.
fn has_http_scheme(url: &str) -> bool {
    ["http://", "https://"].iter().any(|&scheme| {
        // `get` yields `None` when `url` is shorter than the scheme or the cut would fall
        // inside a multi-byte character, so this never panics.
        url.get(..scheme.len())
            .is_some_and(|prefix| prefix.eq_ignore_ascii_case(scheme))
    })
}
/// Top-level shape of the configuration as far as this client is concerned.
///
/// Field names are mapped to camelCase, so the JSON key is `sharedStorage`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct SharedStorageConfig {
/// Coordinates of the shared storage service.
shared_storage: SharedStorageConfigData,
}
/// Coordinates of the shared storage service.
///
/// Field names are mapped to camelCase, so the JSON keys are `baseUrl` and `service`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct SharedStorageConfigData {
/// Base URL of the storage service; it is parsed into a `Uri` when the client is extracted.
base_url: String,
/// Name passed to `Service::new` together with the parsed base URL.
service: String,
}
impl DistributedStorage for DistributedStorageClient {
/// Creates or replaces `store` on the storage service.
///
/// Sends the JSON-serialized store with a `PUT` to `{API_PREFIX}/stores/{store_id}`.
/// Only a `201` status counts as success.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `201`.
///
/// # Panics
///
/// Panics if `store` cannot be serialized to JSON.
async fn upsert_store(&self, store: &Store) -> Result<(), DistributedStorageError> {
    let store_id = store.store_id();
    debug!("Creating store {store_id}.");

    let endpoint = format!("{API_PREFIX}/stores/{store_id}");
    let payload = serde_json::to_string(store).unwrap();

    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .body(payload.as_bytes())
        .put()
        .await?;

    if response.status_code() == 201 {
        Ok(())
    } else {
        Err(DistributedStorageError::from(response))
    }
}
/// Lists the stores known to the storage service.
///
/// Issues a `GET` to `{API_PREFIX}/stores` and returns the `values` of the decoded
/// `Stores` document.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `200` or its body cannot
/// be decoded.
async fn get_stores(&self) -> Result<Vec<Store>, DistributedStorageError> {
    debug!("Getting stores.");

    let endpoint = format!("{API_PREFIX}/stores");
    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .get()
        .await?;

    // A wrong status and an undecodable body share the same error path below.
    let decoded = if response.status_code() == 200 {
        serde_json::from_slice::<Stores>(response.body()).ok()
    } else {
        None
    };

    match decoded {
        Some(stores) => Ok(stores.values),
        None => Err(DistributedStorageError::from(response)),
    }
}
/// Lists the key ids stored in `partition` of `store`.
///
/// Issues a `GET` to `{API_PREFIX}/stores/{store}/partitions/{partition}/keys` and
/// returns the `key_id` of every entry in the decoded `Keys` document. `store` and
/// `partition` are interpolated into the path verbatim.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `200` or its body cannot
/// be decoded.
async fn get_keys(
    &self,
    store: &str,
    partition: &str,
) -> Result<Vec<String>, DistributedStorageError> {
    debug!("Getting keys in store '{store}' partition '{partition}'.");

    let endpoint = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys");
    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .get()
        .await?;

    // A wrong status and an undecodable body share the same error path below.
    let decoded = if response.status_code() == 200 {
        serde_json::from_slice::<Keys>(response.body()).ok()
    } else {
        None
    };

    match decoded {
        Some(listing) => Ok(listing
            .values
            .into_iter()
            .map(|entry| entry.key_id)
            .collect()),
        None => Err(DistributedStorageError::from(response)),
    }
}
/// Lists the partitions of `store`.
///
/// Issues a `GET` to `{API_PREFIX}/stores/{store}/partitions` and returns the `values`
/// of the decoded `Partitions` document. `store` is interpolated into the path verbatim.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `200` or its body cannot
/// be decoded.
async fn get_partitions(&self, store: &str) -> Result<Vec<String>, DistributedStorageError> {
    debug!("Getting partitions for store '{store}'.");

    let endpoint = format!("{API_PREFIX}/stores/{store}/partitions");
    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .get()
        .await?;

    // A wrong status and an undecodable body share the same error path below.
    let decoded = if response.status_code() == 200 {
        serde_json::from_slice::<Partitions>(response.body()).ok()
    } else {
        None
    };

    match decoded {
        Some(partitions) => Ok(partitions.values),
        None => Err(DistributedStorageError::from(response)),
    }
}
/// Writes `item` under `key` in `partition` of `store`.
///
/// The bytes are wrapped with `Object::new_binary`, serialized to JSON and sent with a
/// `PUT` to `{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}`. The path
/// segments are interpolated verbatim. `mode` selects the conditional header:
///
/// * `StoreMode::Always` sends no extra header.
/// * `StoreMode::Absent` sends `if-none-match: *`.
/// * `StoreMode::Cas(value)` sends `if-match: <value>`.
///
/// Only a `201` status counts as success.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `201`.
///
/// # Panics
///
/// Panics if the wrapped item cannot be serialized to JSON.
async fn store(
    &self,
    store: &str,
    partition: &str,
    key: &str,
    mode: &StoreMode,
    item: &[u8],
) -> Result<(), DistributedStorageError> {
    debug!("Storing item: store '{store}' partition '{partition}' key '{key}'.");

    let conditional_headers = match mode {
        StoreMode::Always => Vec::new(),
        StoreMode::Absent => vec![(IF_NONE_HEADER, "*")],
        StoreMode::Cas(cas) => vec![(IF_MATCH_HEADER, cas.as_str())],
    };

    let endpoint = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}");
    let envelope = Object::new_binary(item);
    let payload = serde_json::to_string(&envelope).unwrap();

    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .body(payload.as_bytes())
        .headers(conditional_headers)
        .put()
        .await?;

    if response.status_code() == 201 {
        Ok(())
    } else {
        Err(DistributedStorageError::from(response))
    }
}
/// Reads the item stored under `key` in `partition` of `store`.
///
/// Issues a `GET` to `{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}`
/// (path segments interpolated verbatim), decodes the body as an `Object` and extracts
/// its binary content with `get_binary`.
///
/// Returns the item bytes together with the value of the `etag` response header, or an
/// empty string when that header is missing.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `200`, its body cannot be
/// decoded, or the binary content cannot be extracted.
async fn get(
    &self,
    store: &str,
    partition: &str,
    key: &str,
) -> Result<(Vec<u8>, String), DistributedStorageError> {
    debug!("Retrieving item: store '{store}' partition '{partition}' key '{key}'.");

    let endpoint = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}");
    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .get()
        .await?;

    if response.status_code() == 200 {
        // Both decoding steps must succeed; either failure falls through to the error below.
        let content = serde_json::from_slice::<Object>(response.body())
            .ok()
            .and_then(|object| object.get_binary().ok());

        if let Some(bytes) = content {
            let etag = response
                .headers()
                .get(ETAG_HEADER)
                .cloned()
                .unwrap_or_default();
            return Ok((bytes, etag));
        }
    }

    Err(DistributedStorageError::from(response))
}
/// Deletes the item stored under `key` in `partition` of `store`.
///
/// Issues a `DELETE` to `{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}`
/// (path segments interpolated verbatim). Only a `204` status counts as success.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `204`.
async fn delete(
    &self,
    store: &str,
    partition: &str,
    key: &str,
) -> Result<(), DistributedStorageError> {
    debug!("Deleting item: store '{store}' partition '{partition}' key '{key}'.");

    let endpoint = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}");
    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .delete()
        .await?;

    if response.status_code() != 204 {
        return Err(DistributedStorageError::from(response));
    }
    Ok(())
}
/// Deletes `partition` of `store`.
///
/// Issues a `DELETE` to `{API_PREFIX}/stores/{store}/partitions/{partition}` (path
/// segments interpolated verbatim). Only a `204` status counts as success.
///
/// # Errors
///
/// Returns a [`DistributedStorageError`] converted from the HTTP client error when the
/// request fails, or from the response when its status is not `204`.
async fn delete_partition(
    &self,
    store: &str,
    partition: &str,
) -> Result<(), DistributedStorageError> {
    debug!("Deleting partition: store '{store}' partition '{partition}'.");

    let endpoint = format!("{API_PREFIX}/stores/{store}/partitions/{partition}");
    let response = self
        .http_client
        .request(&self.service)
        .path(endpoint.as_str())
        .delete()
        .await?;

    if response.status_code() != 204 {
        return Err(DistributedStorageError::from(response));
    }
    Ok(())
}
}