1mod local;
18
19mod distributed;
20
/// Optional public window onto the crate's private `local` and `distributed`
/// modules; only compiled when the `ll` cargo feature is enabled.
#[cfg(feature = "ll")]
pub mod ll {
    /// Glob re-export of everything public in `crate::local`.
    pub mod local {
        pub use crate::local::*;
    }

    /// Glob re-export of everything public in `crate::distributed`.
    pub mod distributed {
        pub use crate::distributed::*;
    }
}
34
35use pdk_core::classy::extract::context::ConfigureContext;
36use pdk_core::classy::extract::{Extract, FromContext};
37use pdk_core::logger;
38use serde::{de::DeserializeOwned, Serialize};
39use std::rc::Rc;
40use thiserror::Error;
41use url::form_urlencoded;
42
43use crate::distributed::DistributedStorage;
44use crate::local::LocalStorage;
45
/// Write condition a caller passes to `DataStorage::store`.
///
/// Each backend in this file translates the value into its own store-mode
/// type before writing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StoreMode {
    /// Translated to the backend's `Always` mode (presumably an unconditional
    /// write — confirm against the backend).
    Always,
    /// Translated to the backend's `Absent` mode (presumably "write only when
    /// the key has no value yet" — confirm against the backend).
    Absent,
    /// Compare-and-swap write carrying the CAS token as text.
    ///
    /// The local backend parses the token as a `u32`; the remote backend
    /// forwards the string unchanged.
    Cas(String),
}
68
69#[derive(Debug, Error)]
74#[non_exhaustive]
75pub enum DataStorageError {
76 #[error("CAS mismatch.")]
78 CasMismatch,
79 #[error("Serialization error: {0}.")]
81 Serialization(#[from] bincode::Error),
82 #[error("CAS parse error: {0}.")]
84 CasParseError(#[from] std::num::ParseIntError),
85 #[error("Timeout.")]
87 Timeout,
88 #[error("HTTP Client Error.")]
90 HttpClient,
91 #[error("Unexpected error: {0}.")]
93 Unexpected(String),
94}
95
96#[derive(Debug, Error)]
101#[non_exhaustive]
102pub enum DataStorageBuilderError {
103 #[error("Local storage not available")]
105 LocalStorageRequired,
106
107 #[error("Policy metadata not available")]
109 MetadataRequired,
110}
111
112impl From<crate::local::LocalStorageError> for DataStorageError {
113 fn from(error: crate::local::LocalStorageError) -> Self {
114 match error {
115 crate::local::LocalStorageError::CasMismatch => DataStorageError::CasMismatch,
116 _ => DataStorageError::Unexpected(error.to_string()),
117 }
118 }
119}
120
121impl From<crate::distributed::DistributedStorageError> for DataStorageError {
122 fn from(error: crate::distributed::DistributedStorageError) -> Self {
123 match error {
124 crate::distributed::DistributedStorageError::CasMismatch => {
125 DataStorageError::CasMismatch
126 }
127 crate::distributed::DistributedStorageError::Timeout => DataStorageError::Timeout,
128 crate::distributed::DistributedStorageError::HttpClient(_) => {
129 DataStorageError::HttpClient
130 }
131 error => DataStorageError::Unexpected(error.to_string()),
132 }
133 }
134}
135
136#[allow(async_fn_in_trait)]
142pub trait DataStorage {
143 async fn get_keys(&self) -> Result<Vec<String>, DataStorageError>;
145
146 async fn store<T: Serialize>(
148 &self,
149 key: &str,
150 mode: &StoreMode,
151 item: &T,
152 ) -> Result<(), DataStorageError>;
153
154 async fn get<T: DeserializeOwned>(
157 &self,
158 key: &str,
159 ) -> Result<Option<(T, String)>, DataStorageError>;
160
161 async fn delete(&self, key: &str) -> Result<(), DataStorageError>;
163
164 async fn delete_all(&self) -> Result<(), DataStorageError>;
166}
167
168pub struct LocalDataStorage {
173 storage: crate::local::SharedData,
174 namespace: String,
175}
176
177impl LocalDataStorage {
178 pub(crate) fn new(storage: crate::local::SharedData, namespace: String) -> Self {
180 Self { storage, namespace }
181 }
182
183 fn convert_store_mode(
184 &self,
185 mode: &StoreMode,
186 ) -> Result<crate::local::StoreMode, DataStorageError> {
187 match mode {
188 StoreMode::Always => Ok(crate::local::StoreMode::Always),
189 StoreMode::Absent => Ok(crate::local::StoreMode::Absent),
190 StoreMode::Cas(cas_str) => {
191 let cas: u32 = cas_str.parse()?;
192 Ok(crate::local::StoreMode::Cas(cas))
193 }
194 }
195 }
196
197 fn namespaced_key(&self, key: &str) -> String {
198 format!("{}:{}", self.namespace, key)
199 }
200}
201
202impl DataStorage for LocalDataStorage {
203 async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
204 let all_keys = self.storage.keys();
205 let namespace_prefix = format!("{}:", self.namespace);
206
207 let filtered_keys: Vec<String> = all_keys
209 .into_iter()
210 .filter(|key| key.starts_with(&namespace_prefix))
211 .map(|key| {
212 key.strip_prefix(&namespace_prefix)
214 .unwrap_or(&key)
215 .to_string()
216 })
217 .collect();
218
219 Ok(filtered_keys)
220 }
221
222 async fn store<T: Serialize>(
223 &self,
224 key: &str,
225 mode: &StoreMode,
226 item: &T,
227 ) -> Result<(), DataStorageError> {
228 let serialized = bincode::serialize(item)?;
229 let local_mode = self.convert_store_mode(mode)?;
230 let namespaced_key = self.namespaced_key(key);
231 self.storage.set(&namespaced_key, &serialized, local_mode)?;
232 Ok(())
233 }
234
235 async fn get<T: DeserializeOwned>(
236 &self,
237 key: &str,
238 ) -> Result<Option<(T, String)>, DataStorageError> {
239 let namespaced_key = self.namespaced_key(key);
240 match self.storage.get(&namespaced_key)? {
241 Some((data, cas)) => {
242 let deserialized: T =
243 bincode::deserialize(&data).map_err(DataStorageError::from)?;
244 Ok(Some((deserialized, cas.to_string())))
245 }
246 None => Ok(None),
247 }
248 }
249
250 async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
251 let namespaced_key = self.namespaced_key(key);
252 self.storage.delete(&namespaced_key)?;
253 Ok(())
254 }
255
256 async fn delete_all(&self) -> Result<(), DataStorageError> {
257 let all_keys = self.storage.keys();
258 let namespace_prefix = format!("{}:", self.namespace);
259
260 for key in all_keys {
262 if key.starts_with(&namespace_prefix) {
263 self.storage.delete(&key)?;
264 }
265 }
266 Ok(())
267 }
268}
269
270pub struct RemoteDataStorage {
272 storage: Rc<crate::distributed::DistributedStorageClient>,
273 sanitized_store: String,
274 sanitized_partition: String,
275 ttl_millis: u32,
276}
277
278impl RemoteDataStorage {
279 pub(crate) fn new(
282 storage: Rc<crate::distributed::DistributedStorageClient>,
283 store: String,
284 partition: String,
285 ttl_millis: u32,
286 ) -> Self {
287 let sanitized_store = form_urlencoded::byte_serialize(store.as_bytes()).collect();
289 let sanitized_partition = form_urlencoded::byte_serialize(partition.as_bytes()).collect();
290 Self {
291 storage,
292 sanitized_store,
293 sanitized_partition,
294 ttl_millis,
295 }
296 }
297
298 fn convert_store_mode(&self, mode: &StoreMode) -> crate::distributed::StoreMode {
299 match mode {
300 StoreMode::Always => crate::distributed::StoreMode::Always,
301 StoreMode::Absent => crate::distributed::StoreMode::Absent,
302 StoreMode::Cas(cas_str) => crate::distributed::StoreMode::Cas(cas_str.clone()),
303 }
304 }
305
306 fn sanitize_key(&self, key: &str) -> String {
307 form_urlencoded::byte_serialize(key.as_bytes()).collect()
308 }
309}
310
311impl DataStorage for RemoteDataStorage {
312 async fn get_keys(&self) -> Result<Vec<String>, DataStorageError> {
313 match self
315 .storage
316 .get_keys(&self.sanitized_store, &self.sanitized_partition)
317 .await
318 {
319 Ok(keys) => {
320 let decoded_keys: Vec<String> = keys
322 .into_iter()
323 .filter_map(|encoded_key| {
324 let decoded = form_urlencoded::parse(encoded_key.as_bytes())
325 .next()
326 .map(|(key, _)| key.into_owned());
327
328 if decoded.is_none() {
329 logger::debug!("Key not URL-encoded or decode failed: {encoded_key}");
330 }
331
332 decoded
333 })
334 .collect();
335
336 Ok(decoded_keys)
337 }
338 Err(e) => {
339 logger::warn!("Error getting keys: {e}");
340 Ok(vec![])
341 }
342 }
343 }
344
345 async fn store<T: Serialize>(
346 &self,
347 key: &str,
348 mode: &StoreMode,
349 item: &T,
350 ) -> Result<(), DataStorageError> {
351 let serialized = bincode::serialize(item)?;
352 let distributed_mode = self.convert_store_mode(mode);
353 let sanitized_key = self.sanitize_key(key);
354
355 match self
357 .storage
358 .store(
359 &self.sanitized_store,
360 &self.sanitized_partition,
361 &sanitized_key,
362 &distributed_mode,
363 &serialized,
364 )
365 .await
366 {
367 Ok(()) => Ok(()),
368 Err(crate::distributed::DistributedStorageError::StoreNotFound) => {
369 let store = crate::distributed::Store::new(
371 self.sanitized_store.clone(),
372 Some(self.ttl_millis),
373 None,
374 );
375
376 if let Err(e) = self.storage.upsert_store(&store).await {
378 logger::warn!("Error creating store: {e}");
379 }
380
381 self.storage
383 .store(
384 &self.sanitized_store,
385 &self.sanitized_partition,
386 &sanitized_key,
387 &distributed_mode,
388 &serialized,
389 )
390 .await?;
391 Ok(())
392 }
393 Err(e) => Err(e.into()), }
395 }
396
397 async fn get<T: DeserializeOwned>(
398 &self,
399 key: &str,
400 ) -> Result<Option<(T, String)>, DataStorageError> {
401 let sanitized_key = self.sanitize_key(key);
402 match self
403 .storage
404 .get(
405 &self.sanitized_store,
406 &self.sanitized_partition,
407 &sanitized_key,
408 )
409 .await
410 {
411 Ok((data, cas)) => {
412 let deserialized: T =
413 bincode::deserialize(&data).map_err(DataStorageError::from)?;
414 Ok(Some((deserialized, cas)))
415 }
416 Err(crate::distributed::DistributedStorageError::KeyNotFound) => {
417 logger::debug!("Key not found: {key}");
418 Ok(None)
419 }
420 Err(e) => {
421 logger::error!("Error getting value for key {key}: {e:?}");
422 Err(e.into())
423 }
424 }
425 }
426
427 async fn delete(&self, key: &str) -> Result<(), DataStorageError> {
428 let sanitized_key = self.sanitize_key(key);
430 if let Err(e) = self
431 .storage
432 .delete(
433 &self.sanitized_store,
434 &self.sanitized_partition,
435 &sanitized_key,
436 )
437 .await
438 {
439 logger::warn!("Error deleting key {key}: {e}");
440 }
441 Ok(())
442 }
443
444 async fn delete_all(&self) -> Result<(), DataStorageError> {
445 if let Err(e) = self
447 .storage
448 .delete_partition(&self.sanitized_store, &self.sanitized_partition)
449 .await
450 {
451 logger::warn!("Error deleting partition: {e}");
452 }
453 Ok(())
454 }
455}
456
457pub struct DataStorageBuilder {
476 prefix: String,
477 shared_data: Rc<crate::local::SharedData>,
478 distributed_storage: Option<Rc<crate::distributed::DistributedStorageClient>>,
479}
480impl FromContext<ConfigureContext> for DataStorageBuilder {
491 type Error = DataStorageBuilderError;
492
493 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
494 let shared_data: crate::local::SharedData = context
496 .extract()
497 .map_err(|_| DataStorageBuilderError::LocalStorageRequired)?;
498 let distributed_storage: Result<crate::distributed::DistributedStorageClient, _> =
500 context.extract();
501 let metadata: pdk_core::policy_context::api::Metadata = context
503 .extract()
504 .map_err(|_| DataStorageBuilderError::MetadataRequired)?;
505
506 let prefix = format!(
507 "isolated-storage-{}-{}",
508 metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
509 );
510
511 pdk_core::logger::info!(
512 "DataStorageBuilder: creating prefix '{}' for policy '{}' in namespace '{}'",
513 prefix,
514 metadata.policy_metadata.policy_name,
515 metadata.policy_metadata.policy_namespace
516 );
517
518 Ok(DataStorageBuilder {
519 prefix,
520 shared_data: Rc::new(shared_data),
521 distributed_storage: distributed_storage.ok().map(Rc::new),
522 })
523 }
524}
525
526impl DataStorageBuilder {
527 pub fn shared(mut self) -> Self {
531 self.prefix = "shared-storage".to_string();
532 self
533 }
534
535 pub fn local<T: Into<String>>(&self, key: T) -> LocalDataStorage {
537 let key_str = key.into();
538 let namespace = format!("{}-{}", self.prefix, key_str);
541
542 pdk_core::logger::info!(
543 "DataStorageBuilder::local: creating namespace '{}' with prefix '{}' and key '{}'",
544 namespace,
545 self.prefix,
546 key_str
547 );
548
549 LocalDataStorage::new((*self.shared_data).clone(), namespace)
550 }
551
552 pub fn remote<T: Into<String>>(&self, key: T, ttl_millis: u32) -> RemoteDataStorage {
561 let key_str = key.into();
562 let storage = self
563 .distributed_storage
564 .as_ref()
565 .expect("Distributed storage not available - check if it's configured");
566 RemoteDataStorage::new(Rc::clone(storage), key_str.clone(), key_str, ttl_millis)
567 }
568}