node_flow/context/storage/shared_storage/design.rs
1use std::ops::{Deref, DerefMut};
2
3/// Provides type-based shared storage for concurrent environments.
4///
5/// `SharedStorage` is a type-based shared storage.
6/// Items in this storage are be shared with all other branches.
7/// It allows storing and retrieving values by their type.
8/// Each type `T` has at most **one instance** stored at a time.
9///
10/// This trait is designed for use in systems that require shared, concurrent
11/// access to arbitrary state.
12///
13/// Unlike [`LocalStorage`](crate::context::storage::LocalStorage) implementors of this trait
14/// are able to safely manage data shared across branches.
15///
16/// # Design
17///
18/// Implementations are expected to handle concurrency correctly (for example via
19/// `RwLock`, `Mutex`, or some lock-free data structure) and to return appropriate
20/// guard objects implementing [`Deref`] or [`DerefMut`] for temporary access.
21///
22/// # Examples
23/// ```
24/// use std::sync::{Arc, RwLock};
25/// use std::future::Future;
26/// use std::ops::{Deref, DerefMut};
27/// use node_flow::context::storage::SharedStorage;
28///
29/// // Always empty storage (example only)
30/// struct ExampleSharedStorage;
31/// // Guard for get and get_mut
32/// struct Guard<T>(T);
33/// impl<T> Deref for Guard<T> // ...
34/// # {
35/// # type Target = T;
36/// #
37/// # fn deref(&self) -> &Self::Target {
38/// # unreachable!()
39/// # }
40/// # }
41/// impl<T> DerefMut for Guard<T> // ...
42/// # {
43/// # fn deref_mut(&mut self) -> &mut Self::Target {
44/// # unreachable!()
45/// # }
46/// # }
47///
48/// impl SharedStorage for ExampleSharedStorage {
49/// fn get<T>(&self) -> impl Future<Output = Option<impl Deref<Target = T>>> + Send {
50/// async { None::<Guard<T>> }
51/// }
52///
53/// fn get_mut<T>(&mut self) -> impl Future<Output = Option<impl DerefMut<Target = T>>> + Send {
54/// async { None::<Guard<T>> }
55/// }
56///
57/// fn insert<T>(&mut self, _val: T) -> impl Future<Output = Option<T>> + Send {
58/// async { None }
59/// }
60///
61/// fn insert_with_if_absent<T, E>(
62/// &self,
63/// fut: impl Future<Output = Result<T, E>> + Send,
64/// ) -> impl Future<Output = Result<(), E>> + Send {
65/// async {
66/// let _ = fut.await;
67/// Ok(())
68/// }
69/// }
70///
71/// fn remove<T>(&mut self) -> impl Future<Output = Option<T>> + Send {
72/// async { None }
73/// }
74/// }
75/// ```
76pub trait SharedStorage {
77 /// Gets reference of a value with type `T` from storage if it is present.
78 ///
79 /// # Examples
80 /// ```
81 /// # tokio::runtime::Builder::new_current_thread()
82 /// # .enable_all()
83 /// # .build()
84 /// # .unwrap()
85 /// # .block_on(async {
86 /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
87 /// # type ExampleStorage = SharedStorageImpl;
88 /// use std::ops::Deref;
89 /// #[derive(Debug, PartialEq, Eq)]
90 /// struct ExampleValue(u8);
91 /// let mut storage = ExampleStorage::new();
92 ///
93 /// let _ = storage.insert(ExampleValue(5u8)).await;
94 /// let guard = storage.get().await;
95 /// let result: Option<&ExampleValue> = guard.as_deref();
96 /// assert_eq!(result, Some(&ExampleValue(5u8)));
97 /// let guard = storage.get().await;
98 /// let result: Option<&u16> = guard.as_deref();
99 /// assert_eq!(result, None);
100 /// # });
101 /// ```
102 fn get<T>(&self) -> impl Future<Output = Option<impl Deref<Target = T>>> + Send
103 where
104 T: 'static;
105
106 /// Gets mutable reference of a value with type `T` from storage if it is present.
107 ///
108 /// # Examples
109 /// ```
110 /// # tokio::runtime::Builder::new_current_thread()
111 /// # .enable_all()
112 /// # .build()
113 /// # .unwrap()
114 /// # .block_on(async {
115 /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
116 /// # type ExampleStorage = SharedStorageImpl;
117 /// use std::ops::Deref;
118 /// #[derive(Debug, PartialEq, Eq)]
119 /// struct ExampleValue(u8);
120 /// let mut storage = ExampleStorage::new();
121 ///
122 /// let _ = storage.insert(ExampleValue(5u8)).await;
123 /// if let Some(mut val) = storage.get_mut::<ExampleValue>().await {
124 /// val.0 = 15u8;
125 /// }
126 /// let guard = storage.get().await;
127 /// let result: Option<&ExampleValue> = guard.as_deref();
128 /// assert_eq!(result, Some(&ExampleValue(15u8)));
129 /// # });
130 /// ```
131 fn get_mut<T>(&mut self) -> impl Future<Output = Option<impl DerefMut<Target = T>>> + Send
132 where
133 T: 'static;
134
135 /// Inserts value with type `T` to storage and returns the value that was there previously if it was there.
136 ///
137 /// # Examples
138 /// ```
139 /// # tokio::runtime::Builder::new_current_thread()
140 /// # .enable_all()
141 /// # .build()
142 /// # .unwrap()
143 /// # .block_on(async {
144 /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
145 /// # type ExampleStorage = SharedStorageImpl;
146 /// use std::ops::Deref;
147 /// #[derive(Debug, PartialEq, Eq)]
148 /// struct ExampleValue(u8);
149 /// let mut storage = ExampleStorage::new();
150 ///
151 /// let result = storage.insert(ExampleValue(5u8)).await;
152 /// assert_eq!(result, None);
153 /// let _ = storage.insert(ExampleValue(15u8)).await;
154 /// let result = storage.insert(ExampleValue(25u8)).await;
155 /// assert_eq!(result, Some(ExampleValue(15u8)));
156 /// let guard = storage.get().await;
157 /// let result: Option<&ExampleValue> = guard.as_deref();
158 /// assert_eq!(result, Some(&ExampleValue(25u8)));
159 /// # });
160 /// ```
161 fn insert<T>(&mut self, val: T) -> impl Future<Output = Option<T>> + Send
162 where
163 T: Send + Sync + 'static;
164
165 /// Inserts value with type `T` to storage if it doesn't contain it.
166 ///
167 /// # Examples
168 /// ```
169 /// # tokio::runtime::Builder::new_current_thread()
170 /// # .enable_all()
171 /// # .build()
172 /// # .unwrap()
173 /// # .block_on(async {
174 /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
175 /// # type ExampleStorage = SharedStorageImpl;
176 /// use std::ops::Deref;
177 /// #[derive(Debug, PartialEq, Eq)]
178 /// struct ExampleValue(u8);
179 /// let mut storage = ExampleStorage::new();
180 ///
181 /// storage.insert_with_if_absent(async { Ok::<_, ()>(ExampleValue(5u8)) }).await.unwrap();
182 /// storage.insert_with_if_absent(async { Ok::<_, ()>(ExampleValue(15u8)) }).await.unwrap();
183 /// let result = storage.remove().await;
184 /// assert_eq!(result, Some(ExampleValue(5u8)));
185 /// storage.insert_with_if_absent(async { Ok::<_, ()>(ExampleValue(25u8)) }).await.unwrap();
186 /// let result = storage.remove().await;
187 /// assert_eq!(result, Some(ExampleValue(25u8)));
188 /// # });
189 /// ```
190 fn insert_with_if_absent<T, E>(
191 &self,
192 fut: impl Future<Output = Result<T, E>> + Send,
193 ) -> impl Future<Output = Result<(), E>> + Send
194 where
195 T: Send + Sync + 'static,
196 E: Send;
197
198 /// Removes and returns value with type `T` from storage if it is present.
199 ///
200 /// # Examples
201 /// ```
202 /// # tokio::runtime::Builder::new_current_thread()
203 /// # .enable_all()
204 /// # .build()
205 /// # .unwrap()
206 /// # .block_on(async {
207 /// # use node_flow::context::storage::{SharedStorage, shared_storage::SharedStorageImpl};
208 /// # type ExampleStorage = SharedStorageImpl;
209 /// use std::ops::Deref;
210 /// #[derive(Debug, PartialEq, Eq)]
211 /// struct ExampleValue(u8);
212 /// let mut storage = ExampleStorage::new();
213 ///
214 /// let result = storage.insert(ExampleValue(5u8)).await;
215 /// assert_eq!(result, None);
216 /// let result = storage.remove().await;
217 /// assert_eq!(result, Some(ExampleValue(5u8)));
218 /// let result = storage.remove::<ExampleValue>().await;
219 /// assert_eq!(result, None);
220 /// # });
221 /// ```
222 fn remove<T>(&mut self) -> impl Future<Output = Option<T>> + Send
223 where
224 T: 'static;
225}