nemo_flow_adaptive/storage/
memory.rs1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::RwLock;
10
11use crate::error::{AdaptiveError, Result};
12use crate::storage::traits::{StorageBackend, StorageBackendDyn};
13use crate::trie::accumulator::AccumulatorState;
14use crate::trie::serialization::TrieEnvelope;
15use crate::types::plan::ExecutionPlan;
16use crate::types::records::RunRecord;
17
18pub struct InMemoryBackend {
23 runs: RwLock<HashMap<String, Vec<RunRecord>>>,
24 plans: RwLock<HashMap<String, ExecutionPlan>>,
25 tries: RwLock<HashMap<String, TrieEnvelope>>,
26 accumulators: RwLock<HashMap<String, AccumulatorState>>,
27 observations: RwLock<HashMap<String, Vec<crate::acg::prompt_ir::PromptIR>>>,
28 stability: RwLock<HashMap<String, crate::acg::stability::StabilityAnalysisResult>>,
29}
30
31impl InMemoryBackend {
32 pub fn new() -> Self {
37 Self {
38 runs: RwLock::new(HashMap::new()),
39 plans: RwLock::new(HashMap::new()),
40 tries: RwLock::new(HashMap::new()),
41 accumulators: RwLock::new(HashMap::new()),
42 observations: RwLock::new(HashMap::new()),
43 stability: RwLock::new(HashMap::new()),
44 }
45 }
46}
47
48impl Default for InMemoryBackend {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl StorageBackend for InMemoryBackend {
55 fn store_run(&self, record: &RunRecord) -> impl Future<Output = Result<()>> + Send {
56 let result = {
57 let mut guard = self
58 .runs
59 .write()
60 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
61 match guard {
62 Ok(ref mut runs) => {
63 runs.entry(record.agent_id.clone())
64 .or_default()
65 .push(record.clone());
66 Ok(())
67 }
68 Err(error) => Err(error),
69 }
70 };
71 async move { result }
72 }
73
74 fn load_plan(
75 &self,
76 agent_id: &str,
77 ) -> impl Future<Output = Result<Option<ExecutionPlan>>> + Send {
78 let result = {
79 let guard = self
80 .plans
81 .read()
82 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
83 match guard {
84 Ok(ref plans) => Ok(plans.get(agent_id).cloned()),
85 Err(error) => Err(error),
86 }
87 };
88 async move { result }
89 }
90
91 fn list_runs(&self, agent_id: &str) -> impl Future<Output = Result<Vec<RunRecord>>> + Send {
92 let result = {
93 let guard = self
94 .runs
95 .read()
96 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
97 match guard {
98 Ok(ref runs) => Ok(runs.get(agent_id).cloned().unwrap_or_default()),
99 Err(error) => Err(error),
100 }
101 };
102 async move { result }
103 }
104}
105
106impl StorageBackendDyn for InMemoryBackend {
107 fn store_run_dyn<'a>(
108 &'a self,
109 record: &'a RunRecord,
110 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
111 Box::pin(self.store_run(record))
112 }
113
114 fn load_plan_dyn<'a>(
115 &'a self,
116 agent_id: &'a str,
117 ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionPlan>>> + Send + 'a>> {
118 Box::pin(self.load_plan(agent_id))
119 }
120
121 fn list_runs_dyn<'a>(
122 &'a self,
123 agent_id: &'a str,
124 ) -> Pin<Box<dyn Future<Output = Result<Vec<RunRecord>>> + Send + 'a>> {
125 Box::pin(self.list_runs(agent_id))
126 }
127
128 fn store_trie<'a>(
129 &'a self,
130 agent_id: &'a str,
131 envelope: &'a TrieEnvelope,
132 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
133 let result = {
134 let mut guard = self
135 .tries
136 .write()
137 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
138 match guard {
139 Ok(ref mut tries) => {
140 tries.insert(agent_id.to_string(), envelope.clone());
141 Ok(())
142 }
143 Err(error) => Err(error),
144 }
145 };
146 Box::pin(async move { result })
147 }
148
149 fn load_trie<'a>(
150 &'a self,
151 agent_id: &'a str,
152 ) -> Pin<Box<dyn Future<Output = Result<Option<TrieEnvelope>>> + Send + 'a>> {
153 let result = {
154 let guard = self
155 .tries
156 .read()
157 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
158 match guard {
159 Ok(ref tries) => Ok(tries.get(agent_id).cloned()),
160 Err(error) => Err(error),
161 }
162 };
163 Box::pin(async move { result })
164 }
165
166 fn store_accumulators<'a>(
167 &'a self,
168 agent_id: &'a str,
169 state: &'a AccumulatorState,
170 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
171 let result = {
172 let mut guard = self
173 .accumulators
174 .write()
175 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
176 match guard {
177 Ok(ref mut accumulators) => {
178 accumulators.insert(agent_id.to_string(), state.clone());
179 Ok(())
180 }
181 Err(error) => Err(error),
182 }
183 };
184 Box::pin(async move { result })
185 }
186
187 fn load_accumulators<'a>(
188 &'a self,
189 agent_id: &'a str,
190 ) -> Pin<Box<dyn Future<Output = Result<Option<AccumulatorState>>> + Send + 'a>> {
191 let result = {
192 let guard = self
193 .accumulators
194 .read()
195 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
196 match guard {
197 Ok(ref accumulators) => Ok(accumulators.get(agent_id).cloned()),
198 Err(error) => Err(error),
199 }
200 };
201 Box::pin(async move { result })
202 }
203
204 fn store_plan(&self, plan: &ExecutionPlan) -> Result<()> {
205 let mut guard = self
206 .plans
207 .write()
208 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")))?;
209 guard.insert(plan.agent_id.clone(), plan.clone());
210 Ok(())
211 }
212
213 fn store_observations<'a>(
214 &'a self,
215 agent_id: &'a str,
216 observations: &'a [crate::acg::prompt_ir::PromptIR],
217 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
218 let result = {
219 let mut guard = self
220 .observations
221 .write()
222 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
223 match guard {
224 Ok(ref mut observations_map) => {
225 observations_map.insert(agent_id.to_string(), observations.to_vec());
226 Ok(())
227 }
228 Err(error) => Err(error),
229 }
230 };
231 Box::pin(async move { result })
232 }
233
234 fn load_observations<'a>(
235 &'a self,
236 agent_id: &'a str,
237 ) -> Pin<
238 Box<dyn Future<Output = Result<Option<Vec<crate::acg::prompt_ir::PromptIR>>>> + Send + 'a>,
239 > {
240 let result = {
241 let guard = self
242 .observations
243 .read()
244 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
245 match guard {
246 Ok(ref observations_map) => Ok(observations_map.get(agent_id).cloned()),
247 Err(error) => Err(error),
248 }
249 };
250 Box::pin(async move { result })
251 }
252
253 fn store_stability<'a>(
254 &'a self,
255 agent_id: &'a str,
256 result: &'a crate::acg::stability::StabilityAnalysisResult,
257 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
258 let store_result = {
259 let mut guard = self
260 .stability
261 .write()
262 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
263 match guard {
264 Ok(ref mut stability_map) => {
265 stability_map.insert(agent_id.to_string(), result.clone());
266 Ok(())
267 }
268 Err(error) => Err(error),
269 }
270 };
271 Box::pin(async move { store_result })
272 }
273
274 fn load_stability<'a>(
275 &'a self,
276 agent_id: &'a str,
277 ) -> Pin<
278 Box<
279 dyn Future<Output = Result<Option<crate::acg::stability::StabilityAnalysisResult>>>
280 + Send
281 + 'a,
282 >,
283 > {
284 let result = {
285 let guard = self
286 .stability
287 .read()
288 .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
289 match guard {
290 Ok(ref stability_map) => Ok(stability_map.get(agent_id).cloned()),
291 Err(error) => Err(error),
292 }
293 };
294 Box::pin(async move { result })
295 }
296}
297
298#[cfg(test)]
299#[path = "../../tests/unit/storage_memory_internal_tests.rs"]
300mod tests;