// pdk-data-storage-lib 1.7.0
//
// PDK Data Storage Library
// Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

//! Default client implementation for the distributed storage.
//!
//! Provides [`DistributedStorageClient`], an injectable client that implements
//! the [`DistributedStorage`] trait to interact with the platform's shared
//! key-value store.

use crate::distributed::error::DistributedStorageError;
use crate::distributed::model::{Keys, Object, Partitions, Store, StoreMode, Stores};
use crate::distributed::DistributedStorage;
use pdk_core::classy::extract::context::ConfigureContext;
use pdk_core::classy::extract::{Extract, FromContext};
use pdk_core::classy::hl::{HttpClient, InvalidUri, Service, Uri};
use pdk_core::classy::Configuration;
use pdk_core::logger::debug;
use serde::Deserialize;
use std::str::FromStr;
use thiserror::Error;

/// Base URL of the key-value store used when the configuration provides no coordinates.
const DEFAULT_LOCAL_STORAGE_URL: &str = "http://127.0.0.1:4000";
/// Service name used for the key-value store when the configuration provides no coordinates.
const DEFAULT_LOCAL_STORAGE_SERVICE: &str = "x-flex-keyvalue-store";

/// Response header whose value `get` returns alongside the stored item.
const ETAG_HEADER: &str = "etag";
/// Request header sent by compare-and-swap writes (`StoreMode::Cas`).
const IF_MATCH_HEADER: &str = "if-match";
/// Request header sent by create-only writes (`StoreMode::Absent`).
const IF_NONE_HEADER: &str = "if-none-match";
/// Prefix shared by every REST path of the key-value store API.
const API_PREFIX: &str = "/api/v1";

/// Injectable client for the platform shared key-value store.
///
/// Built through its [`FromContext`] implementation and used through its
/// [`DistributedStorage`] implementation; every operation is an HTTP request
/// issued with `http_client` against `service`.
pub struct DistributedStorageClient {
    /// HTTP client used to issue every request against the store.
    http_client: HttpClient,
    /// Coordinates (service name and base URI) of the key-value store.
    service: Service,
}

impl DistributedStorageClient {
    fn new(http_client: HttpClient, service: Service) -> Self {
        Self {
            http_client,
            service,
        }
    }
}

#[non_exhaustive]
#[derive(Error, Debug)]
/// Errors that can occur while extracting a [`DistributedStorageClient`] from
/// the configuration context.
pub enum DistributedStorageClientExtractionError {
    #[error("Invalid Uri for shared data coordinates: {0}.")]
    InvalidUri(#[from] InvalidUri),
}

impl FromContext<ConfigureContext> for DistributedStorageClient {
    type Error = DistributedStorageClientExtractionError;

    /// Builds a client from the policy configuration.
    ///
    /// Reads the `sharedStorage` object (`baseUrl`, `service`) from the raw configuration.
    /// When that object cannot be deserialized (missing or malformed), the local defaults
    /// `DEFAULT_LOCAL_STORAGE_URL` / `DEFAULT_LOCAL_STORAGE_SERVICE` are used instead.
    /// A base URL without an explicit scheme is treated as plain `http`.
    ///
    /// # Errors
    ///
    /// Returns [`DistributedStorageClientExtractionError::InvalidUri`] when the resulting
    /// base URL cannot be parsed as a URI.
    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
        let client: HttpClient = context.extract_always();
        let Configuration(bytes): Configuration = context.extract_always();

        let shared_storage = serde_json::from_slice::<SharedStorageConfig>(bytes.as_slice())
            .map(|s| s.shared_storage)
            .unwrap_or_else(|_| {
                // Deliberate best-effort: fall back to the local store coordinates.
                debug!("Could not retrieve coordinates of the shared storage, will use the default coordinates.");
                SharedStorageConfigData {
                    base_url: DEFAULT_LOCAL_STORAGE_URL.to_string(),
                    service: DEFAULT_LOCAL_STORAGE_SERVICE.to_string(),
                }
            });

        let uri = Uri::from_str(with_default_scheme(&shared_storage.base_url).as_str())?;

        let service = Service::new(&shared_storage.service, uri);

        Ok(DistributedStorageClient::new(client, service))
    }
}

/// Returns `base_url` unchanged when it already starts with a URI scheme followed by
/// `://`, otherwise prefixes it with `http://`.
///
/// The scheme is recognised with the RFC 3986 grammar (a letter followed by letters,
/// digits, `+`, `-` or `.`) rather than a bare `http` prefix. Hosts whose name merely
/// begins with "http" (e.g. `http-store:4000`) therefore still receive the default
/// scheme. Explicit schemes in any case (e.g. `HTTPS://host`) are not prefixed twice.
fn with_default_scheme(base_url: &str) -> String {
    let has_scheme = base_url.split_once("://").is_some_and(|(scheme, _)| {
        let mut chars = scheme.chars();
        chars.next().is_some_and(|first| first.is_ascii_alphabetic())
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
    });

    if has_scheme {
        base_url.to_string()
    } else {
        format!("http://{base_url}")
    }
}

/// Portion of the policy configuration read by this client: only the `sharedStorage`
/// key is deserialized, and any other keys are ignored.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SharedStorageConfig {
    /// Coordinates of the shared key-value store (`sharedStorage` in JSON).
    shared_storage: SharedStorageConfigData,
}

/// Coordinates of the shared key-value store as written in the configuration.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SharedStorageConfigData {
    /// Base URL of the store (`baseUrl` in JSON); the scheme may be omitted.
    base_url: String,
    /// Name of the service the store is reached through (`service` in JSON).
    service: String,
}

impl DistributedStorage for DistributedStorageClient {
    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn upsert_store(&self, store: &Store) -> Result<(), DistributedStorageError> {
        debug!("Creating store {}.", store.store_id());

        let request_path = format!("{}/stores/{}", API_PREFIX, store.store_id());

        let json = serde_json::to_string(&store).unwrap();

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .body(json.as_bytes())
            .put()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() != 201 {
            Err(DistributedStorageError::from(response))
        } else {
            Ok(())
        }
    }

    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn get_stores(&self) -> Result<Vec<Store>, DistributedStorageError> {
        debug!("Getting stores.");

        let request_path = format!("{API_PREFIX}/stores");

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .get()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() == 200 {
            if let Ok(stores) = serde_json::from_slice::<Stores>(response.body()) {
                return Ok(stores.values);
            }
        }
        Err(DistributedStorageError::from(response))
    }

    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn get_keys(
        &self,
        store: &str,
        partition: &str,
    ) -> Result<Vec<String>, DistributedStorageError> {
        debug!("Getting keys in store '{store}' partition '{partition}'.");

        let request_path = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys");

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .get()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() == 200 {
            if let Ok(keys) = serde_json::from_slice::<Keys>(response.body()) {
                return Ok(keys.values.into_iter().map(|k| k.key_id).collect());
            }
        }
        Err(DistributedStorageError::from(response))
    }

    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn get_partitions(&self, store: &str) -> Result<Vec<String>, DistributedStorageError> {
        debug!("Getting partitions for store '{store}'.");
        let request_path = format!("{API_PREFIX}/stores/{store}/partitions");

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .get()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() == 200 {
            if let Ok(partitions) = serde_json::from_slice::<Partitions>(response.body()) {
                return Ok(partitions.values);
            }
        }
        Err(DistributedStorageError::from(response))
    }

    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn store(
        &self,
        store: &str,
        partition: &str,
        key: &str,
        mode: &StoreMode,
        item: &[u8],
    ) -> Result<(), DistributedStorageError> {
        debug!("Storing item: store '{store}' partition '{partition}' key '{key}'.");
        let headers = match &mode {
            StoreMode::Always => vec![],
            StoreMode::Absent => vec![(IF_NONE_HEADER, "*")],
            StoreMode::Cas(cas) => vec![(IF_MATCH_HEADER, cas.as_str())],
        };

        let request_path = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}");

        let json = serde_json::to_string(&Object::new_binary(item)).unwrap();

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .body(json.as_bytes())
            .headers(headers)
            .put()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() != 201 {
            Err(DistributedStorageError::from(response))
        } else {
            Ok(())
        }
    }

    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn get(
        &self,
        store: &str,
        partition: &str,
        key: &str,
    ) -> Result<(Vec<u8>, String), DistributedStorageError> {
        debug!("Retrieving item: store '{store}' partition '{partition}' key '{key}'.");

        let request_path = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}");

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .get()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() == 200 {
            if let Ok(obj) = serde_json::from_slice::<Object>(response.body()) {
                if let Ok(obj) = obj.get_binary() {
                    return Ok((
                        obj,
                        response
                            .headers()
                            .get(ETAG_HEADER)
                            .cloned()
                            .unwrap_or_default(),
                    ));
                }
            }
        }
        Err(DistributedStorageError::from(response))
    }

    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn delete(
        &self,
        store: &str,
        partition: &str,
        key: &str,
    ) -> Result<(), DistributedStorageError> {
        debug!("Deleting item: store '{store}' partition '{partition}' key '{key}'.");
        let request_path = format!("{API_PREFIX}/stores/{store}/partitions/{partition}/keys/{key}");

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .delete()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() != 204 {
            Err(DistributedStorageError::from(response))
        } else {
            Ok(())
        }
    }

    /// WARNING: all store, partition and key ids MUST be url safe. To improve performance this
    /// should be done by the caller.
    async fn delete_partition(
        &self,
        store: &str,
        partition: &str,
    ) -> Result<(), DistributedStorageError> {
        debug!("Deleting partition: store '{store}' partition '{partition}'.");

        let request_path = format!("{API_PREFIX}/stores/{store}/partitions/{partition}");

        let response = self
            .http_client
            .request(&self.service)
            .path(request_path.as_str())
            .delete()
            .await
            .map_err(DistributedStorageError::from)?;

        if response.status_code() != 204 {
            Err(DistributedStorageError::from(response))
        } else {
            Ok(())
        }
    }
}