mod local;
mod distributed;
#[cfg(feature = "ll")]
pub mod ll {
pub mod local {
pub use crate::local::*;
}
pub mod distributed {
pub use crate::distributed::*;
}
}
use pdk_core::classy::extract::context::ConfigureContext;
use pdk_core::classy::extract::{Extract, FromContext};
use pdk_core::logger;
use serde::{de::DeserializeOwned, Serialize};
use std::rc::Rc;
use thiserror::Error;
use url::form_urlencoded;
use crate::distributed::DistributedStorage;
use crate::local::LocalStorage;
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum StoreMode {
Always,
Absent,
Cas(String),
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum DataStorageError {
#[error("CAS mismatch.")]
CasMismatch,
#[error("Serialization error: {0}.")]
Serialization(#[from] bincode::Error),
#[error("CAS parse error: {0}.")]
CasParseError(#[from] std::num::ParseIntError),
#[error("Timeout.")]
Timeout,
#[error("HTTP Client Error.")]
HttpClient,
#[error("Unexpected error: {0}.")]
Unexpected(String),
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum DataStorageBuilderError {
#[error("Local storage not available")]
LocalStorageRequired,
#[error("Policy metadata not available")]
MetadataRequired,
}
impl From<crate::local::LocalStorageError> for DataStorageError {
fn from(error: crate::local::LocalStorageError) -> Self {
match error {
crate::local::LocalStorageError::CasMismatch => DataStorageError::CasMismatch,
_ => DataStorageError::Unexpected(error.to_string()),
}
}
}
impl From<crate::distributed::DistributedStorageError> for DataStorageError {
fn from(error: crate::distributed::DistributedStorageError) -> Self {
match error {
crate::distributed::DistributedStorageError::CasMismatch => {
DataStorageError::CasMismatch
}
crate::distributed::DistributedStorageError::Timeout => DataStorageError::Timeout,
crate::distributed::DistributedStorageError::HttpClient(_) => {
DataStorageError::HttpClient
}
error => DataStorageError::Unexpected(error.to_string()),
}
}
}
#[allow(async_fn_in_trait)]
pub trait DataStorage {
async fn get_keys(&self) -> Result<Vec<String>, DataStorageError>;
async fn store<T: Serialize>(
&self,
key: &str,
mode: &StoreMode,
item: &T,
) -> Result<(), DataStorageError>;
async fn get<T: DeserializeOwned>(
&self,
key: &str,
) -> Result<Option<(T, String)>, DataStorageError>;
async fn delete(&self, key: &str) -> Result<(), DataStorageError>;
async fn delete_all(&self) -> Result<(), DataStorageError>;
}
pub struct LocalDataStorage {
storage: crate::local::SharedData,
namespace: String,
}
impl LocalDataStorage {
pub(crate) fn new(storage: crate::local::SharedData, namespace: String) -> Self {
Self { storage, namespace }
}
fn convert_store_mode(
&self,
mode: &StoreMode,
) -> Result<crate::local::StoreMode, DataStorageError> {
match mode {
StoreMode::Always => Ok(crate::local::StoreMode::Always),
StoreMode::Absent => Ok(crate::local::StoreMode::Absent),
StoreMode::Cas(cas_str) => {
let cas: u32 = cas_str.parse()?;
Ok(crate::local::StoreMode::Cas(cas))
}
}
}
fn namespaced_key(&self, key: &str) -> String {
format!("{}:{}", self.namespace, key)
}
}
impl DataStorage for LocalDataStorage {
async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
let all_keys = self.storage.keys();
let namespace_prefix = format!("{}:", self.namespace);
let filtered_keys: Vec<String> = all_keys
.into_iter()
.filter(|key| key.starts_with(&namespace_prefix))
.map(|key| {
key.strip_prefix(&namespace_prefix)
.unwrap_or(&key)
.to_string()
})
.collect();
Ok(filtered_keys)
}
async fn store<T: Serialize>(
&self,
key: &str,
mode: &StoreMode,
item: &T,
) -> Result<(), DataStorageError> {
let serialized = bincode::serialize(item)?;
let local_mode = self.convert_store_mode(mode)?;
let namespaced_key = self.namespaced_key(key);
self.storage.set(&namespaced_key, &serialized, local_mode)?;
Ok(())
}
async fn get<T: DeserializeOwned>(
&self,
key: &str,
) -> Result<Option<(T, String)>, DataStorageError> {
let namespaced_key = self.namespaced_key(key);
match self.storage.get(&namespaced_key)? {
Some((data, cas)) => {
let deserialized: T =
bincode::deserialize(&data).map_err(DataStorageError::from)?;
Ok(Some((deserialized, cas.to_string())))
}
None => Ok(None),
}
}
async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
let namespaced_key = self.namespaced_key(key);
self.storage.delete(&namespaced_key)?;
Ok(())
}
async fn delete_all(&self) -> Result<(), DataStorageError> {
let all_keys = self.storage.keys();
let namespace_prefix = format!("{}:", self.namespace);
for key in all_keys {
if key.starts_with(&namespace_prefix) {
self.storage.delete(&key)?;
}
}
Ok(())
}
}
pub struct RemoteDataStorage {
storage: Rc<crate::distributed::DistributedStorageClient>,
sanitized_store: String,
sanitized_partition: String,
ttl_millis: u32,
}
impl RemoteDataStorage {
pub(crate) fn new(
storage: Rc<crate::distributed::DistributedStorageClient>,
store: String,
partition: String,
ttl_millis: u32,
) -> Self {
let sanitized_store = form_urlencoded::byte_serialize(store.as_bytes()).collect();
let sanitized_partition = form_urlencoded::byte_serialize(partition.as_bytes()).collect();
Self {
storage,
sanitized_store,
sanitized_partition,
ttl_millis,
}
}
fn convert_store_mode(&self, mode: &StoreMode) -> crate::distributed::StoreMode {
match mode {
StoreMode::Always => crate::distributed::StoreMode::Always,
StoreMode::Absent => crate::distributed::StoreMode::Absent,
StoreMode::Cas(cas_str) => crate::distributed::StoreMode::Cas(cas_str.clone()),
}
}
fn sanitize_key(&self, key: &str) -> String {
form_urlencoded::byte_serialize(key.as_bytes()).collect()
}
}
impl DataStorage for RemoteDataStorage {
async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
match self
.storage
.get_keys(&self.sanitized_store, &self.sanitized_partition)
.await
{
Ok(keys) => {
let decoded_keys: Vec<String> = keys
.into_iter()
.filter_map(|encoded_key| {
let decoded = form_urlencoded::parse(encoded_key.as_bytes())
.next()
.map(|(key, _)| key.into_owned());
if decoded.is_none() {
logger::debug!("Key not URL-encoded or decode failed: {encoded_key}");
}
decoded
})
.collect();
Ok(decoded_keys)
}
Err(e) => {
logger::warn!("Error getting keys: {e}");
Ok(vec![])
}
}
}
async fn store<T: Serialize>(
&self,
key: &str,
mode: &StoreMode,
item: &T,
) -> Result<(), DataStorageError> {
let serialized = bincode::serialize(item)?;
let distributed_mode = self.convert_store_mode(mode);
let sanitized_key = self.sanitize_key(key);
match self
.storage
.store(
&self.sanitized_store,
&self.sanitized_partition,
&sanitized_key,
&distributed_mode,
&serialized,
)
.await
{
Ok(()) => Ok(()),
Err(crate::distributed::DistributedStorageError::StoreNotFound) => {
let store = crate::distributed::Store::new(
self.sanitized_store.clone(),
Some(self.ttl_millis),
None,
);
if let Err(e) = self.storage.upsert_store(&store).await {
logger::warn!("Error creating store: {e}");
}
self.storage
.store(
&self.sanitized_store,
&self.sanitized_partition,
&sanitized_key,
&distributed_mode,
&serialized,
)
.await?;
Ok(())
}
Err(e) => Err(e.into()), }
}
async fn get<T: DeserializeOwned>(
&self,
key: &str,
) -> Result<Option<(T, String)>, DataStorageError> {
let sanitized_key = self.sanitize_key(key);
match self
.storage
.get(
&self.sanitized_store,
&self.sanitized_partition,
&sanitized_key,
)
.await
{
Ok((data, cas)) => {
let deserialized: T =
bincode::deserialize(&data).map_err(DataStorageError::from)?;
Ok(Some((deserialized, cas)))
}
Err(crate::distributed::DistributedStorageError::KeyNotFound) => {
logger::debug!("Key not found: {key}");
Ok(None)
}
Err(e) => {
logger::error!("Error getting value for key {key}: {e:?}");
Err(e.into())
}
}
}
async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
let sanitized_key = self.sanitize_key(key);
if let Err(e) = self
.storage
.delete(
&self.sanitized_store,
&self.sanitized_partition,
&sanitized_key,
)
.await
{
logger::warn!("Error deleting key {key}: {e}");
}
Ok(())
}
async fn delete_all(&self) -> Result<(), DataStorageError> {
if let Err(e) = self
.storage
.delete_partition(&self.sanitized_store, &self.sanitized_partition)
.await
{
logger::warn!("Error deleting partition: {e}");
}
Ok(())
}
}
pub struct DataStorageBuilder {
prefix: String,
shared_data: Rc<crate::local::SharedData>,
distributed_storage: Option<Rc<crate::distributed::DistributedStorageClient>>,
}
impl FromContext<ConfigureContext> for DataStorageBuilder {
type Error = DataStorageBuilderError;
fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
let shared_data: crate::local::SharedData = context
.extract()
.map_err(|_| DataStorageBuilderError::LocalStorageRequired)?;
let distributed_storage: Result<crate::distributed::DistributedStorageClient, _> =
context.extract();
let metadata: pdk_core::policy_context::api::Metadata = context
.extract()
.map_err(|_| DataStorageBuilderError::MetadataRequired)?;
let prefix = format!(
"isolated-storage-{}-{}",
metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
);
pdk_core::logger::info!(
"DataStorageBuilder: creating prefix '{}' for policy '{}' in namespace '{}'",
prefix,
metadata.policy_metadata.policy_name,
metadata.policy_metadata.policy_namespace
);
Ok(DataStorageBuilder {
prefix,
shared_data: Rc::new(shared_data),
distributed_storage: distributed_storage.ok().map(Rc::new),
})
}
}
impl DataStorageBuilder {
pub fn shared(mut self) -> Self {
self.prefix = "shared-storage".to_string();
self
}
pub fn local<T: Into<String>>(&self, key: T) -> LocalDataStorage {
let key_str = key.into();
let namespace = format!("{}-{}", self.prefix, key_str);
pdk_core::logger::info!(
"DataStorageBuilder::local: creating namespace '{}' with prefix '{}' and key '{}'",
namespace,
self.prefix,
key_str
);
LocalDataStorage::new((*self.shared_data).clone(), namespace)
}
pub fn remote<T: Into<String>>(&self, key: T, ttl_millis: u32) -> RemoteDataStorage {
let key_str = key.into();
let storage = self
.distributed_storage
.as_ref()
.expect("Distributed storage not available - check if it's configured");
RemoteDataStorage::new(Rc::clone(storage), key_str.clone(), key_str, ttl_millis)
}
}