1mod local;
18
19mod distributed;
20
/// Low-level access to the storage backends used by this crate.
///
/// This module is compiled only when the `ll` feature is enabled.
#[cfg(feature = "ll")]
pub mod ll {
    /// Publicly re-exports everything from the private `distributed` module.
    pub mod distributed {
        pub use crate::distributed::*;
    }

    /// Publicly re-exports everything from the private `local` module.
    pub mod local {
        pub use crate::local::*;
    }
}
34
35use pdk_core::classy::extract::context::ConfigureContext;
36use pdk_core::classy::extract::{Extract, FromContext};
37use pdk_core::logger;
38use serde::{de::DeserializeOwned, Serialize};
39use std::rc::Rc;
40use thiserror::Error;
41use url::form_urlencoded;
42
43use crate::distributed::DistributedStorage;
44use crate::local::LocalStorage;
45
/// Write policy passed to [`DataStorage::store`].
///
/// Each variant is translated one-to-one into the equally named mode of the
/// backend (local or distributed) that performs the write.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StoreMode {
    /// Translated to the backend's `Always` mode
    /// (presumably an unconditional write; the backend defines the semantics).
    Always,
    /// Translated to the backend's `Absent` mode
    /// (presumably "write only if the key does not exist yet"; the backend defines the semantics).
    Absent,
    /// Compare-and-swap write carrying a CAS token as text.
    ///
    /// The local backend parses the token as a `u32`; the distributed backend
    /// receives the string unchanged.
    Cas(String),
}
68
69#[derive(Debug, Error)]
74#[non_exhaustive]
75pub enum DataStorageError {
76 #[error("CAS mismatch.")]
78 CasMismatch,
79 #[error("Serialization error: {0}.")]
81 Serialization(#[from] bincode::Error),
82 #[error("CAS parse error: {0}.")]
84 CasParseError(#[from] std::num::ParseIntError),
85 #[error("Timeout.")]
87 Timeout,
88 #[error("HTTP Client Error.")]
90 HttpClient,
91 #[error("Unexpected error: {0}.")]
93 Unexpected(String),
94}
95
96#[derive(Debug, Error)]
101#[non_exhaustive]
102pub enum DataStorageBuilderError {
103 #[error("Local storage not available")]
105 LocalStorageRequired,
106
107 #[error("Policy metadata not available")]
109 MetadataRequired,
110}
111
112impl From<crate::local::LocalStorageError> for DataStorageError {
113 fn from(error: crate::local::LocalStorageError) -> Self {
114 match error {
115 crate::local::LocalStorageError::CasMismatch => DataStorageError::CasMismatch,
116 _ => DataStorageError::Unexpected(error.to_string()),
117 }
118 }
119}
120
121impl From<crate::distributed::DistributedStorageError> for DataStorageError {
122 fn from(error: crate::distributed::DistributedStorageError) -> Self {
123 match error {
124 crate::distributed::DistributedStorageError::CasMismatch => {
125 DataStorageError::CasMismatch
126 }
127 crate::distributed::DistributedStorageError::Timeout => DataStorageError::Timeout,
128 crate::distributed::DistributedStorageError::HttpClient(_) => {
129 DataStorageError::HttpClient
130 }
131 error => DataStorageError::Unexpected(error.to_string()),
132 }
133 }
134}
135
136#[allow(async_fn_in_trait)]
142pub trait DataStorage {
143 async fn get_keys(&self) -> Result<Vec<String>, DataStorageError>;
145
146 async fn store<T: Serialize>(
148 &self,
149 key: &str,
150 mode: &StoreMode,
151 item: &T,
152 ) -> Result<(), DataStorageError>;
153
154 async fn get<T: DeserializeOwned>(
157 &self,
158 key: &str,
159 ) -> Result<Option<(T, String)>, DataStorageError>;
160
161 async fn delete(&self, key: &str) -> Result<(), DataStorageError>;
163
164 async fn delete_all(&self) -> Result<(), DataStorageError>;
166}
167
168pub struct LocalDataStorage {
173 storage: crate::local::SharedData,
174 namespace: String,
175}
176
177impl LocalDataStorage {
178 pub(crate) fn new(storage: crate::local::SharedData, namespace: String) -> Self {
180 Self { storage, namespace }
181 }
182
183 fn convert_store_mode(
184 &self,
185 mode: &StoreMode,
186 ) -> Result<crate::local::StoreMode, DataStorageError> {
187 match mode {
188 StoreMode::Always => Ok(crate::local::StoreMode::Always),
189 StoreMode::Absent => Ok(crate::local::StoreMode::Absent),
190 StoreMode::Cas(cas_str) => {
191 let cas: u32 = cas_str.parse()?;
192 Ok(crate::local::StoreMode::Cas(cas))
193 }
194 }
195 }
196
197 fn namespaced_key(&self, key: &str) -> String {
198 format!("{}:{}", self.namespace, key)
199 }
200}
201
202impl DataStorage for LocalDataStorage {
203 async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
204 let all_keys = self.storage.keys();
205 let namespace_prefix = format!("{}:", self.namespace);
206
207 let filtered_keys: Vec<String> = all_keys
209 .into_iter()
210 .filter(|key| key.starts_with(&namespace_prefix))
211 .map(|key| {
212 key.strip_prefix(&namespace_prefix)
214 .unwrap_or(&key)
215 .to_string()
216 })
217 .collect();
218
219 Ok(filtered_keys)
220 }
221
222 async fn store<T: Serialize>(
223 &self,
224 key: &str,
225 mode: &StoreMode,
226 item: &T,
227 ) -> Result<(), DataStorageError> {
228 let serialized = bincode::serialize(item)?;
229 let local_mode = self.convert_store_mode(mode)?;
230 let namespaced_key = self.namespaced_key(key);
231 self.storage.set(&namespaced_key, &serialized, local_mode)?;
232 Ok(())
233 }
234
235 async fn get<T: DeserializeOwned>(
236 &self,
237 key: &str,
238 ) -> Result<Option<(T, String)>, DataStorageError> {
239 let namespaced_key = self.namespaced_key(key);
240 match self.storage.get(&namespaced_key)? {
241 Some((data, cas)) => {
242 let deserialized: T =
243 bincode::deserialize(&data).map_err(DataStorageError::from)?;
244 Ok(Some((deserialized, cas.to_string())))
245 }
246 None => Ok(None),
247 }
248 }
249
250 async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
251 let namespaced_key = self.namespaced_key(key);
252 self.storage.delete(&namespaced_key)?;
253 Ok(())
254 }
255
256 async fn delete_all(&self) -> Result<(), DataStorageError> {
257 let all_keys = self.storage.keys();
258 let namespace_prefix = format!("{}:", self.namespace);
259
260 for key in all_keys {
262 if key.starts_with(&namespace_prefix) {
263 self.storage.delete(&key)?;
264 }
265 }
266 Ok(())
267 }
268}
269
270pub struct RemoteDataStorage {
272 storage: Rc<crate::distributed::DistributedStorageClient>,
273 sanitized_store: String,
274 sanitized_partition: String,
275 ttl_millis: u32,
276}
277
278impl RemoteDataStorage {
279 pub(crate) fn new(
282 storage: Rc<crate::distributed::DistributedStorageClient>,
283 store: String,
284 partition: String,
285 ttl_millis: u32,
286 ) -> Self {
287 let sanitized_store = form_urlencoded::byte_serialize(store.as_bytes()).collect();
289 let sanitized_partition = form_urlencoded::byte_serialize(partition.as_bytes()).collect();
290 Self {
291 storage,
292 sanitized_store,
293 sanitized_partition,
294 ttl_millis,
295 }
296 }
297
298 fn convert_store_mode(&self, mode: &StoreMode) -> crate::distributed::StoreMode {
299 match mode {
300 StoreMode::Always => crate::distributed::StoreMode::Always,
301 StoreMode::Absent => crate::distributed::StoreMode::Absent,
302 StoreMode::Cas(cas_str) => crate::distributed::StoreMode::Cas(cas_str.clone()),
303 }
304 }
305
306 fn sanitize_key(&self, key: &str) -> String {
307 form_urlencoded::byte_serialize(key.as_bytes()).collect()
308 }
309}
310
311impl DataStorage for RemoteDataStorage {
312 async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
313 match self
315 .storage
316 .get_keys(&self.sanitized_store, &self.sanitized_partition)
317 .await
318 {
319 Ok(keys) => {
320 let decoded_keys: Vec<String> = keys
322 .into_iter()
323 .filter_map(|encoded_key| {
324 let decoded = form_urlencoded::parse(encoded_key.as_bytes())
325 .next()
326 .map(|(key, _)| key.into_owned());
327
328 if decoded.is_none() {
329 logger::debug!("Key not URL-encoded or decode failed: {encoded_key}");
330 }
331
332 decoded
333 })
334 .collect();
335
336 Ok(decoded_keys)
337 }
338 Err(e) => {
339 logger::warn!("Error getting keys: {e}");
340 Ok(vec![])
341 }
342 }
343 }
344
345 async fn store<T: Serialize>(
346 &self,
347 key: &str,
348 mode: &StoreMode,
349 item: &T,
350 ) -> Result<(), DataStorageError> {
351 let serialized = bincode::serialize(item)?;
352 let distributed_mode = self.convert_store_mode(mode);
353 let sanitized_key = self.sanitize_key(key);
354
355 match self
357 .storage
358 .store(
359 &self.sanitized_store,
360 &self.sanitized_partition,
361 &sanitized_key,
362 &distributed_mode,
363 &serialized,
364 )
365 .await
366 {
367 Ok(()) => Ok(()),
368 Err(crate::distributed::DistributedStorageError::StoreNotFound) => {
369 let store = crate::distributed::Store::new(
371 self.sanitized_store.clone(),
372 Some(self.ttl_millis),
373 None,
374 );
375
376 if let Err(e) = self.storage.upsert_store(&store).await {
378 logger::warn!("Error creating store: {e}");
379 }
380
381 self.storage
383 .store(
384 &self.sanitized_store,
385 &self.sanitized_partition,
386 &sanitized_key,
387 &distributed_mode,
388 &serialized,
389 )
390 .await?;
391 Ok(())
392 }
393 Err(e) => Err(e.into()), }
395 }
396
397 async fn get<T: DeserializeOwned>(
398 &self,
399 key: &str,
400 ) -> Result<Option<(T, String)>, DataStorageError> {
401 let sanitized_key = self.sanitize_key(key);
402 match self
403 .storage
404 .get(
405 &self.sanitized_store,
406 &self.sanitized_partition,
407 &sanitized_key,
408 )
409 .await
410 {
411 Ok((data, cas)) => {
412 let deserialized: T =
413 bincode::deserialize(&data).map_err(DataStorageError::from)?;
414 Ok(Some((deserialized, cas)))
415 }
416 Err(crate::distributed::DistributedStorageError::StoreNotFound) => {
417 logger::debug!("Store not found for key {key}, returning None");
418 Ok(None)
419 }
420 Err(crate::distributed::DistributedStorageError::KeyNotFound) => {
421 logger::debug!("Key not found: {key}");
422 Ok(None)
423 }
424 Err(e) => {
425 logger::error!("Error getting value for key {key}: {e:?}");
426 Err(e.into())
427 }
428 }
429 }
430
431 async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
432 let sanitized_key = self.sanitize_key(key);
434 if let Err(e) = self
435 .storage
436 .delete(
437 &self.sanitized_store,
438 &self.sanitized_partition,
439 &sanitized_key,
440 )
441 .await
442 {
443 logger::warn!("Error deleting key {key}: {e}");
444 }
445 Ok(())
446 }
447
448 async fn delete_all(&self) -> Result<(), DataStorageError> {
449 if let Err(e) = self
451 .storage
452 .delete_partition(&self.sanitized_store, &self.sanitized_partition)
453 .await
454 {
455 logger::warn!("Error deleting partition: {e}");
456 }
457 Ok(())
458 }
459}
460
461pub struct DataStorageBuilder {
480 prefix: String,
481 shared_data: Rc<crate::local::SharedData>,
482 distributed_storage: Option<Rc<crate::distributed::DistributedStorageClient>>,
483}
484impl FromContext<ConfigureContext> for DataStorageBuilder {
495 type Error = DataStorageBuilderError;
496
497 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
498 let shared_data: crate::local::SharedData = context
500 .extract()
501 .map_err(|_| DataStorageBuilderError::LocalStorageRequired)?;
502 let distributed_storage: Result<crate::distributed::DistributedStorageClient, _> =
504 context.extract();
505 let metadata: pdk_core::policy_context::api::Metadata = context
507 .extract()
508 .map_err(|_| DataStorageBuilderError::MetadataRequired)?;
509
510 let prefix = format!(
511 "isolated-storage-{}-{}",
512 metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
513 );
514
515 pdk_core::logger::info!(
516 "DataStorageBuilder: creating prefix '{}' for policy '{}' in namespace '{}'",
517 prefix,
518 metadata.policy_metadata.policy_name,
519 metadata.policy_metadata.policy_namespace
520 );
521
522 Ok(DataStorageBuilder {
523 prefix,
524 shared_data: Rc::new(shared_data),
525 distributed_storage: distributed_storage.ok().map(Rc::new),
526 })
527 }
528}
529
530impl DataStorageBuilder {
531 pub fn shared(mut self) -> Self {
535 self.prefix = "shared-storage".to_string();
536 self
537 }
538
539 pub fn local<T: Into<String>>(&self, key: T) -> LocalDataStorage {
541 let key_str = key.into();
542 let namespace = format!("{}-{}", self.prefix, key_str);
545
546 pdk_core::logger::info!(
547 "DataStorageBuilder::local: creating namespace '{}' with prefix '{}' and key '{}'",
548 namespace,
549 self.prefix,
550 key_str
551 );
552
553 LocalDataStorage::new((*self.shared_data).clone(), namespace)
554 }
555
556 pub fn remote<T: Into<String>>(&self, key: T, ttl_millis: u32) -> RemoteDataStorage {
565 let key_str = key.into();
566 let storage = self
567 .distributed_storage
568 .as_ref()
569 .expect("Distributed storage not available - check if it's configured");
570 RemoteDataStorage::new(Rc::clone(storage), key_str.clone(), key_str, ttl_millis)
571 }
572}