pdk_data_storage_lib/
lib.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Data Storage Library
6//!
7//! This library provides data storage functionality with support for:
8//!
9//! - Local and distributed data storage
10//! - Support for CAS (Compare-And-Swap) operations
11//! - Configurable storage modes (Always, Absent, CAS)
12//! - Asynchronous API for high-performance applications
13//!
14//! ## Features
15//! - `ll`: USE AT OWN RISK: low-level items that may change without notice. Exposes the underlying implementation of each storage type.
16
17mod local;
18
19mod distributed;
20
/// "Low Level" implementations of each storage type.
///
/// Only compiled when the `ll` feature is enabled. These re-exports expose the crate's
/// internal backend modules directly and may change without notice.
#[cfg(feature = "ll")]
pub mod ll {
    /// External storage implementation.
    pub mod distributed {
        pub use crate::distributed::*;
    }

    /// In memory storage implementation.
    pub mod local {
        pub use crate::local::*;
    }
}
34
35use pdk_core::classy::extract::context::ConfigureContext;
36use pdk_core::classy::extract::{Extract, FromContext};
37use pdk_core::logger;
38use serde::{de::DeserializeOwned, Serialize};
39use std::rc::Rc;
40use thiserror::Error;
41use url::form_urlencoded;
42
43use crate::distributed::DistributedStorage;
44use crate::local::LocalStorage;
45
/// Controls how a store operation treats a key that may already hold a value.
///
/// Pass one of these to [`DataStorage::store`] to choose between an unconditional write,
/// a "set if not exists" write, or an optimistic compare-and-swap write.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StoreMode {
    /// Always write, replacing whatever value is currently stored under the key.
    ///
    /// Use this for unconditional updates where the previous state does not matter.
    Always,
    /// Write only when nothing is currently stored under the key.
    ///
    /// Provides "set if not exists" semantics, so a value is written at most once.
    Absent,
    /// Write only when the stored value's CAS token equals the contained string.
    ///
    /// Provides optimistic concurrency control: the write is rejected if another operation
    /// modified the value after the CAS token was obtained (e.g. from [`DataStorage::get`]).
    Cas(String),
}
68
69/// Errors that can occur during data storage operations.
70///
71/// This enum represents all possible error conditions that can arise when
72/// performing data storage operations.
73#[derive(Debug, Error)]
74#[non_exhaustive]
75pub enum DataStorageError {
76    /// Indicates provided CAS value doesn't match current version of the stored value.
77    #[error("CAS mismatch.")]
78    CasMismatch,
79    /// Indicates the requested operation failed due to a serialization error.
80    #[error("Serialization error: {0}.")]
81    Serialization(#[from] bincode::Error),
82    /// Indicates the requested operation failed due to a CAS parse error.
83    #[error("CAS parse error: {0}.")]
84    CasParseError(#[from] std::num::ParseIntError),
85    /// Indicates the operation timed out.
86    #[error("Timeout.")]
87    Timeout,
88    /// Indicates an error occurred while performing an http client call.
89    #[error("HTTP Client Error.")]
90    HttpClient,
91    /// Indicates an unexpected error occurred.
92    #[error("Unexpected error: {0}.")]
93    Unexpected(String),
94}
95
96/// Errors that can occur when building data storage instances.
97///
98/// This enum represents error conditions that can arise when creating
99/// data storage instances using the builder pattern.
100#[derive(Debug, Error)]
101#[non_exhaustive]
102pub enum DataStorageBuilderError {
103    /// Local storage is required but not available in the current context.
104    #[error("Local storage not available")]
105    LocalStorageRequired,
106
107    /// Policy metadata is required but not available in the current context.
108    #[error("Policy metadata not available")]
109    MetadataRequired,
110}
111
112impl From<crate::local::LocalStorageError> for DataStorageError {
113    fn from(error: crate::local::LocalStorageError) -> Self {
114        match error {
115            crate::local::LocalStorageError::CasMismatch => DataStorageError::CasMismatch,
116            _ => DataStorageError::Unexpected(error.to_string()),
117        }
118    }
119}
120
121impl From<crate::distributed::DistributedStorageError> for DataStorageError {
122    fn from(error: crate::distributed::DistributedStorageError) -> Self {
123        match error {
124            crate::distributed::DistributedStorageError::CasMismatch => {
125                DataStorageError::CasMismatch
126            }
127            crate::distributed::DistributedStorageError::Timeout => DataStorageError::Timeout,
128            crate::distributed::DistributedStorageError::HttpClient(_) => {
129                DataStorageError::HttpClient
130            }
131            error => DataStorageError::Unexpected(error.to_string()),
132        }
133    }
134}
135
136/// A trait for data storage operations that can be implemented by different storage backends.
137///
138/// This trait defines the core interface for data storage operations, allowing
139/// implementations to use different storage backends (local, distributed) while
140/// providing a consistent API.
141#[allow(async_fn_in_trait)]
142pub trait DataStorage {
143    /// Returns all keys currently stored in the store.
144    async fn get_keys(&self) -> Result<Vec<String>, DataStorageError>;
145
146    /// Stores a serializable item for a given key using the provided [`StoreMode`].
147    async fn store<T: Serialize>(
148        &self,
149        key: &str,
150        mode: &StoreMode,
151        item: &T,
152    ) -> Result<(), DataStorageError>;
153
154    /// Retrieves and deserializes the value for a given key, returning the value and its CAS
155    /// string if present. Returns `Ok(None)` when the key does not exist.
156    async fn get<T: DeserializeOwned>(
157        &self,
158        key: &str,
159    ) -> Result<Option<(T, String)>, DataStorageError>;
160
161    /// Removes the item identified by the provided key from this storage instance.
162    async fn delete(&self, key: &str) -> Result<(), DataStorageError>;
163
164    /// Removes all items from this storage instance.
165    async fn delete_all(&self) -> Result<(), DataStorageError>;
166}
167
168/// A local data storage implementation that stores data in memory.
169///
170/// This implementation uses local shared data storage for high-performance
171/// in-memory operations within a single node.
172pub struct LocalDataStorage {
173    storage: crate::local::SharedData,
174    namespace: String,
175}
176
177impl LocalDataStorage {
178    /// Creates a new local data storage instance using the give storage and namespace as unique identifier.
179    pub(crate) fn new(storage: crate::local::SharedData, namespace: String) -> Self {
180        Self { storage, namespace }
181    }
182
183    fn convert_store_mode(
184        &self,
185        mode: &StoreMode,
186    ) -> Result<crate::local::StoreMode, DataStorageError> {
187        match mode {
188            StoreMode::Always => Ok(crate::local::StoreMode::Always),
189            StoreMode::Absent => Ok(crate::local::StoreMode::Absent),
190            StoreMode::Cas(cas_str) => {
191                let cas: u32 = cas_str.parse()?;
192                Ok(crate::local::StoreMode::Cas(cas))
193            }
194        }
195    }
196
197    fn namespaced_key(&self, key: &str) -> String {
198        format!("{}:{}", self.namespace, key)
199    }
200}
201
202impl DataStorage for LocalDataStorage {
203    async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
204        let all_keys = self.storage.keys();
205        let namespace_prefix = format!("{}:", self.namespace);
206
207        // Filter keys to only include those that belong to this namespace
208        let filtered_keys: Vec<String> = all_keys
209            .into_iter()
210            .filter(|key| key.starts_with(&namespace_prefix))
211            .map(|key| {
212                // Remove the namespace prefix to return just the key name
213                key.strip_prefix(&namespace_prefix)
214                    .unwrap_or(&key)
215                    .to_string()
216            })
217            .collect();
218
219        Ok(filtered_keys)
220    }
221
222    async fn store<T: Serialize>(
223        &self,
224        key: &str,
225        mode: &StoreMode,
226        item: &T,
227    ) -> Result<(), DataStorageError> {
228        let serialized = bincode::serialize(item)?;
229        let local_mode = self.convert_store_mode(mode)?;
230        let namespaced_key = self.namespaced_key(key);
231        self.storage.set(&namespaced_key, &serialized, local_mode)?;
232        Ok(())
233    }
234
235    async fn get<T: DeserializeOwned>(
236        &self,
237        key: &str,
238    ) -> Result<Option<(T, String)>, DataStorageError> {
239        let namespaced_key = self.namespaced_key(key);
240        match self.storage.get(&namespaced_key)? {
241            Some((data, cas)) => {
242                let deserialized: T =
243                    bincode::deserialize(&data).map_err(DataStorageError::from)?;
244                Ok(Some((deserialized, cas.to_string())))
245            }
246            None => Ok(None),
247        }
248    }
249
250    async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
251        let namespaced_key = self.namespaced_key(key);
252        self.storage.delete(&namespaced_key)?;
253        Ok(())
254    }
255
256    async fn delete_all(&self) -> Result<(), DataStorageError> {
257        let all_keys = self.storage.keys();
258        let namespace_prefix = format!("{}:", self.namespace);
259
260        // Only delete keys that belong to this namespace
261        for key in all_keys {
262            if key.starts_with(&namespace_prefix) {
263                self.storage.delete(&key)?;
264            }
265        }
266        Ok(())
267    }
268}
269
270/// A distributed data storage implementation that stores data across multiple nodes.
271pub struct RemoteDataStorage {
272    storage: Rc<crate::distributed::DistributedStorageClient>,
273    sanitized_store: String,
274    sanitized_partition: String,
275    ttl_millis: u32,
276}
277
278impl RemoteDataStorage {
279    /// Creates a high level data storage instance for the given `store`/`partition`,
280    /// using `storage` and a default TTL in milliseconds.
281    pub(crate) fn new(
282        storage: Rc<crate::distributed::DistributedStorageClient>,
283        store: String,
284        partition: String,
285        ttl_millis: u32,
286    ) -> Self {
287        // Sanitize store and partition names using form_urlencoded
288        let sanitized_store = form_urlencoded::byte_serialize(store.as_bytes()).collect();
289        let sanitized_partition = form_urlencoded::byte_serialize(partition.as_bytes()).collect();
290        Self {
291            storage,
292            sanitized_store,
293            sanitized_partition,
294            ttl_millis,
295        }
296    }
297
298    fn convert_store_mode(&self, mode: &StoreMode) -> crate::distributed::StoreMode {
299        match mode {
300            StoreMode::Always => crate::distributed::StoreMode::Always,
301            StoreMode::Absent => crate::distributed::StoreMode::Absent,
302            StoreMode::Cas(cas_str) => crate::distributed::StoreMode::Cas(cas_str.clone()),
303        }
304    }
305
306    fn sanitize_key(&self, key: &str) -> String {
307        form_urlencoded::byte_serialize(key.as_bytes()).collect()
308    }
309}
310
311impl DataStorage for RemoteDataStorage {
312    async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
313        // Try to get keys, if store doesn't exist, return empty list
314        match self
315            .storage
316            .get_keys(&self.sanitized_store, &self.sanitized_partition)
317            .await
318        {
319            Ok(keys) => {
320                // Decode the keys to return them in the same format as they were stored
321                let decoded_keys: Vec<String> = keys
322                    .into_iter()
323                    .filter_map(|encoded_key| {
324                        let decoded = form_urlencoded::parse(encoded_key.as_bytes())
325                            .next()
326                            .map(|(key, _)| key.into_owned());
327
328                        if decoded.is_none() {
329                            logger::debug!("Key not URL-encoded or decode failed: {encoded_key}");
330                        }
331
332                        decoded
333                    })
334                    .collect();
335
336                Ok(decoded_keys)
337            }
338            Err(e) => {
339                logger::warn!("Error getting keys: {e}");
340                Ok(vec![])
341            }
342        }
343    }
344
345    async fn store<T: Serialize>(
346        &self,
347        key: &str,
348        mode: &StoreMode,
349        item: &T,
350    ) -> Result<(), DataStorageError> {
351        let serialized = bincode::serialize(item)?;
352        let distributed_mode = self.convert_store_mode(mode);
353        let sanitized_key = self.sanitize_key(key);
354
355        // Try to store first
356        match self
357            .storage
358            .store(
359                &self.sanitized_store,
360                &self.sanitized_partition,
361                &sanitized_key,
362                &distributed_mode,
363                &serialized,
364            )
365            .await
366        {
367            Ok(()) => Ok(()),
368            Err(crate::distributed::DistributedStorageError::StoreNotFound) => {
369                // Store doesn't exist, create it and retry
370                let store = crate::distributed::Store::new(
371                    self.sanitized_store.clone(),
372                    Some(self.ttl_millis),
373                    None,
374                );
375
376                // Try to create the store, ignore if it already exists
377                if let Err(e) = self.storage.upsert_store(&store).await {
378                    logger::warn!("Error creating store: {e}");
379                }
380
381                // Try storing again
382                self.storage
383                    .store(
384                        &self.sanitized_store,
385                        &self.sanitized_partition,
386                        &sanitized_key,
387                        &distributed_mode,
388                        &serialized,
389                    )
390                    .await?;
391                Ok(())
392            }
393            Err(e) => Err(e.into()), // Other errors, propagate them
394        }
395    }
396
397    async fn get<T: DeserializeOwned>(
398        &self,
399        key: &str,
400    ) -> Result<Option<(T, String)>, DataStorageError> {
401        let sanitized_key = self.sanitize_key(key);
402        match self
403            .storage
404            .get(
405                &self.sanitized_store,
406                &self.sanitized_partition,
407                &sanitized_key,
408            )
409            .await
410        {
411            Ok((data, cas)) => {
412                let deserialized: T =
413                    bincode::deserialize(&data).map_err(DataStorageError::from)?;
414                Ok(Some((deserialized, cas)))
415            }
416            Err(crate::distributed::DistributedStorageError::KeyNotFound) => {
417                logger::debug!("Key not found: {key}");
418                Ok(None)
419            }
420            Err(e) => {
421                logger::error!("Error getting value for key {key}: {e:?}");
422                Err(e.into())
423            }
424        }
425    }
426
427    async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
428        // Try to delete, ignore if store doesn't exist
429        let sanitized_key = self.sanitize_key(key);
430        if let Err(e) = self
431            .storage
432            .delete(
433                &self.sanitized_store,
434                &self.sanitized_partition,
435                &sanitized_key,
436            )
437            .await
438        {
439            logger::warn!("Error deleting key {key}: {e}");
440        }
441        Ok(())
442    }
443
444    async fn delete_all(&self) -> Result<(), DataStorageError> {
445        // Try to delete partition, ignore if store doesn't exist
446        if let Err(e) = self
447            .storage
448            .delete_partition(&self.sanitized_store, &self.sanitized_partition)
449            .await
450        {
451            logger::warn!("Error deleting partition: {e}");
452        }
453        Ok(())
454    }
455}
456
457/// Builder for creating data storage instances.
458///
459/// Provides methods to create local and distributed storage instances with
460/// configurable settings. The builder pattern allows for flexible configuration
461/// of storage behavior.
462///
463/// # Examples
464///
465/// ```rust
466/// # use data_storage_lib::{DataStorageBuilder, DataStorage};
467/// # async fn example(builder: DataStorageBuilder) {
468/// // Create a local storage instance
469/// let local_storage = builder.local("my-local-storage");
470///
471/// // Create a distributed storage instance with 60-second TTL
472/// let remote_storage = builder.remote("my-remote-storage", 60000);
473/// # }
474/// ```
475pub struct DataStorageBuilder {
476    prefix: String,
477    shared_data: Rc<crate::local::SharedData>,
478    distributed_storage: Option<Rc<crate::distributed::DistributedStorageClient>>,
479}
480/// DataStorageBuilder can be injected in your configuration function.
481/// ```rust
482/// #[entrypoint]
483/// async fn configure(
484///     launcher: Launcher,
485///     store_builder: DataStorageBuilder,
486///     Configuration(configuration): Configuration,
487/// ) -> anyhow::Result<()> {
488/// }
489/// ```
490impl FromContext<ConfigureContext> for DataStorageBuilder {
491    type Error = DataStorageBuilderError;
492
493    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
494        // Extract local storage (required)
495        let shared_data: crate::local::SharedData = context
496            .extract()
497            .map_err(|_| DataStorageBuilderError::LocalStorageRequired)?;
498        // Extract distributed storage (optional - will be None if not available)
499        let distributed_storage: Result<crate::distributed::DistributedStorageClient, _> =
500            context.extract();
501        // Extract metadata for policy isolation
502        let metadata: pdk_core::policy_context::api::Metadata = context
503            .extract()
504            .map_err(|_| DataStorageBuilderError::MetadataRequired)?;
505
506        let prefix = format!(
507            "isolated-storage-{}-{}",
508            metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
509        );
510
511        pdk_core::logger::info!(
512            "DataStorageBuilder: creating prefix '{}' for policy '{}' in namespace '{}'",
513            prefix,
514            metadata.policy_metadata.policy_name,
515            metadata.policy_metadata.policy_namespace
516        );
517
518        Ok(DataStorageBuilder {
519            prefix,
520            shared_data: Rc::new(shared_data),
521            distributed_storage: distributed_storage.ok().map(Rc::new),
522        })
523    }
524}
525
526impl DataStorageBuilder {
527    /// Indicates that storage should not be isolated to a single policy.
528    /// The resulting state will be shared across policy instances that use
529    /// the same storage ID.
530    pub fn shared(mut self) -> Self {
531        self.prefix = "shared-storage".to_string();
532        self
533    }
534
535    /// Creates a local data storage instance identified by the key.
536    pub fn local<T: Into<String>>(&self, key: T) -> LocalDataStorage {
537        let key_str = key.into();
538        // Create a truly unique namespace by combining prefix and key
539        // This ensures each policy gets its own isolated storage
540        let namespace = format!("{}-{}", self.prefix, key_str);
541
542        pdk_core::logger::info!(
543            "DataStorageBuilder::local: creating namespace '{}' with prefix '{}' and key '{}'",
544            namespace,
545            self.prefix,
546            key_str
547        );
548
549        LocalDataStorage::new((*self.shared_data).clone(), namespace)
550    }
551
552    /// Creates a distributed data storage instance identified by the provided key and ttl in milliseconds.
553    ///
554    /// **Note**: To make use of the remote mode of the data storage your Flex Gateway must have [Shared Storage](https://docs.mulesoft.com/gateway/latest/flex-conn-shared-storage-config) configured.
555    ///
556    /// # Panics
557    ///
558    /// Panics if distributed storage is not available in the current context.
559    /// Ensure that distributed storage is properly configured before calling this method.
560    pub fn remote<T: Into<String>>(&self, key: T, ttl_millis: u32) -> RemoteDataStorage {
561        let key_str = key.into();
562        let storage = self
563            .distributed_storage
564            .as_ref()
565            .expect("Distributed storage not available - check if it's configured");
566        RemoteDataStorage::new(Rc::clone(storage), key_str.clone(), key_str, ttl_millis)
567    }
568}