node_flow/context/storage/shared_storage/
design.rs

1use std::ops::{Deref, DerefMut};
2
3/// Provides type-based shared storage for concurrent environments.
4///
5/// `SharedStorage` is a type-based shared storage.
6/// Items in this storage are be shared with all other branches.
7/// It allows storing and retrieving values by their type.
8/// Each type `T` has at most **one instance** stored at a time.
9///
10/// This trait is designed for use in systems that require shared, concurrent
11/// access to arbitrary state.
12///
13/// Unlike [`LocalStorage`](crate::context::storage::LocalStorage) implementors of this trait
14/// are able to safely manage data shared across branches.
15///
16/// # Design
17///
18/// Implementations are expected to handle concurrency correctly (for example via
19/// `RwLock`, `Mutex`, or some lock-free data structure) and to return appropriate
20/// guard objects implementing [`Deref`] or [`DerefMut`] for temporary access.
21///
22/// # Examples
23/// ```
24/// use std::sync::{Arc, RwLock};
25/// use std::future::Future;
26/// use std::ops::{Deref, DerefMut};
27/// use node_flow::context::storage::SharedStorage;
28///
29/// // Always empty storage (example only)
30/// struct ExampleSharedStorage;
31/// // Guard for get and get_mut
32/// struct Guard<T>(T);
33/// impl<T> Deref for Guard<T> // ...
34/// # {
35/// #     type Target = T;
36/// #
37/// #     fn deref(&self) -> &Self::Target {
38/// #         unreachable!()
39/// #     }
40/// # }
41/// impl<T> DerefMut for Guard<T> // ...
42/// # {
43/// #     fn deref_mut(&mut self) -> &mut Self::Target {
44/// #         unreachable!()
45/// #     }
46/// # }
47///
48/// impl SharedStorage for ExampleSharedStorage {
49///     fn get<T>(&self) -> impl Future<Output = Option<impl Deref<Target = T>>> + Send {
50///         async { None::<Guard<T>> }
51///     }
52///
53///     fn get_mut<T>(&mut self) -> impl Future<Output = Option<impl DerefMut<Target = T>>> + Send {
54///         async { None::<Guard<T>> }
55///     }
56///
57///     fn insert<T>(&mut self, _val: T) -> impl Future<Output = Option<T>> + Send {
58///         async { None }
59///     }
60///
61///     fn insert_with_if_absent<T, E>(
62///         &self,
63///         fut: impl Future<Output = Result<T, E>> + Send,
64///     ) -> impl Future<Output = Result<(), E>> + Send {
65///         async {
66///             let _ = fut.await;
67///             Ok(())
68///         }
69///     }
70///
71///     fn remove<T>(&mut self) -> impl Future<Output = Option<T>> + Send {
72///         async { None }
73///     }
74/// }
75/// ```
76pub trait SharedStorage {
77    /// Gets reference of a value with type `T` from storage if it is present.
78    ///
79    /// # Examples
80    /// ```
81    /// # tokio::runtime::Builder::new_current_thread()
82    /// #     .enable_all()
83    /// #     .build()
84    /// #     .unwrap()
85    /// #     .block_on(async {
86    /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
87    /// # type ExampleStorage = SharedStorageImpl;
88    /// use std::ops::Deref;
89    /// #[derive(Debug, PartialEq, Eq)]
90    /// struct ExampleValue(u8);
91    /// let mut storage = ExampleStorage::new();
92    ///
93    /// let _ = storage.insert(ExampleValue(5u8)).await;
94    /// let guard = storage.get().await;
95    /// let result: Option<&ExampleValue> = guard.as_deref();
96    /// assert_eq!(result, Some(&ExampleValue(5u8)));
97    /// let guard = storage.get().await;
98    /// let result: Option<&u16> = guard.as_deref();
99    /// assert_eq!(result, None);
100    /// # });
101    /// ```
102    fn get<T>(&self) -> impl Future<Output = Option<impl Deref<Target = T>>> + Send
103    where
104        T: 'static;
105
106    /// Gets mutable reference of a value with type `T` from storage if it is present.
107    ///
108    /// # Examples
109    /// ```
110    /// # tokio::runtime::Builder::new_current_thread()
111    /// #     .enable_all()
112    /// #     .build()
113    /// #     .unwrap()
114    /// #     .block_on(async {
115    /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
116    /// # type ExampleStorage = SharedStorageImpl;
117    /// use std::ops::Deref;
118    /// #[derive(Debug, PartialEq, Eq)]
119    /// struct ExampleValue(u8);
120    /// let mut storage = ExampleStorage::new();
121    ///
122    /// let _ = storage.insert(ExampleValue(5u8)).await;
123    /// if let Some(mut val) = storage.get_mut::<ExampleValue>().await {
124    ///     val.0 = 15u8;
125    /// }
126    /// let guard = storage.get().await;
127    /// let result: Option<&ExampleValue> = guard.as_deref();
128    /// assert_eq!(result, Some(&ExampleValue(15u8)));
129    /// # });
130    /// ```
131    fn get_mut<T>(&mut self) -> impl Future<Output = Option<impl DerefMut<Target = T>>> + Send
132    where
133        T: 'static;
134
135    /// Inserts value with type `T` to storage and returns the value that was there previously if it was there.
136    ///
137    /// # Examples
138    /// ```
139    /// # tokio::runtime::Builder::new_current_thread()
140    /// #     .enable_all()
141    /// #     .build()
142    /// #     .unwrap()
143    /// #     .block_on(async {
144    /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
145    /// # type ExampleStorage = SharedStorageImpl;
146    /// use std::ops::Deref;
147    /// #[derive(Debug, PartialEq, Eq)]
148    /// struct ExampleValue(u8);
149    /// let mut storage = ExampleStorage::new();
150    ///
151    /// let result = storage.insert(ExampleValue(5u8)).await;
152    /// assert_eq!(result, None);
153    /// let _ = storage.insert(ExampleValue(15u8)).await;
154    /// let result = storage.insert(ExampleValue(25u8)).await;
155    /// assert_eq!(result, Some(ExampleValue(15u8)));
156    /// let guard = storage.get().await;
157    /// let result: Option<&ExampleValue> = guard.as_deref();
158    /// assert_eq!(result, Some(&ExampleValue(25u8)));
159    /// # });
160    /// ```
161    fn insert<T>(&mut self, val: T) -> impl Future<Output = Option<T>> + Send
162    where
163        T: Send + Sync + 'static;
164
165    /// Inserts value with type `T` to storage if it doesn't contain it.
166    ///
167    /// # Examples
168    /// ```
169    /// # tokio::runtime::Builder::new_current_thread()
170    /// #     .enable_all()
171    /// #     .build()
172    /// #     .unwrap()
173    /// #     .block_on(async {
174    /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
175    /// # type ExampleStorage = SharedStorageImpl;
176    /// use std::ops::Deref;
177    /// #[derive(Debug, PartialEq, Eq)]
178    /// struct ExampleValue(u8);
179    /// let mut storage = ExampleStorage::new();
180    ///
181    /// storage.insert_with_if_absent(async { Ok::<_, ()>(ExampleValue(5u8)) }).await.unwrap();
182    /// storage.insert_with_if_absent(async { Ok::<_, ()>(ExampleValue(15u8)) }).await.unwrap();
183    /// let result = storage.remove().await;
184    /// assert_eq!(result, Some(ExampleValue(5u8)));
185    /// storage.insert_with_if_absent(async { Ok::<_, ()>(ExampleValue(25u8)) }).await.unwrap();
186    /// let result = storage.remove().await;
187    /// assert_eq!(result, Some(ExampleValue(25u8)));
188    /// # });
189    /// ```
190    fn insert_with_if_absent<T, E>(
191        &self,
192        fut: impl Future<Output = Result<T, E>> + Send,
193    ) -> impl Future<Output = Result<(), E>> + Send
194    where
195        T: Send + Sync + 'static,
196        E: Send;
197
198    /// Removes and returns value with type `T` from storage if it is present.
199    ///
200    /// # Examples
201    /// ```
202    /// # tokio::runtime::Builder::new_current_thread()
203    /// #     .enable_all()
204    /// #     .build()
205    /// #     .unwrap()
206    /// #     .block_on(async {
207    /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
208    /// # type ExampleStorage = SharedStorageImpl;
209    /// use std::ops::Deref;
210    /// #[derive(Debug, PartialEq, Eq)]
211    /// struct ExampleValue(u8);
212    /// let mut storage = ExampleStorage::new();
213    ///
214    /// let result = storage.insert(ExampleValue(5u8)).await;
215    /// assert_eq!(result, None);
216    /// let result = storage.remove().await;
217    /// assert_eq!(result, Some(ExampleValue(5u8)));
218    /// let result = storage.remove::<ExampleValue>().await;
219    /// assert_eq!(result, None);
220    /// # });
221    /// ```
222    fn remove<T>(&mut self) -> impl Future<Output = Option<T>> + Send
223    where
224        T: 'static;
225}