Skip to main content

nemo_flow_adaptive/storage/
memory.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! In-memory adaptive storage backend.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::RwLock;
10
11use crate::error::{AdaptiveError, Result};
12use crate::storage::traits::{StorageBackend, StorageBackendDyn};
13use crate::trie::accumulator::AccumulatorState;
14use crate::trie::serialization::TrieEnvelope;
15use crate::types::plan::ExecutionPlan;
16use crate::types::records::RunRecord;
17
18/// In-memory backend storing adaptive state in process-local hash maps.
19///
20/// This backend is primarily useful for tests, examples, or deployments where
21/// adaptive state does not need to survive process restarts.
22pub struct InMemoryBackend {
23    runs: RwLock<HashMap<String, Vec<RunRecord>>>,
24    plans: RwLock<HashMap<String, ExecutionPlan>>,
25    tries: RwLock<HashMap<String, TrieEnvelope>>,
26    accumulators: RwLock<HashMap<String, AccumulatorState>>,
27    observations: RwLock<HashMap<String, Vec<crate::acg::prompt_ir::PromptIR>>>,
28    stability: RwLock<HashMap<String, crate::acg::stability::StabilityAnalysisResult>>,
29}
30
31impl InMemoryBackend {
32    /// Create an empty in-memory backend.
33    ///
34    /// # Returns
35    /// A new [`InMemoryBackend`] with empty storage maps.
36    pub fn new() -> Self {
37        Self {
38            runs: RwLock::new(HashMap::new()),
39            plans: RwLock::new(HashMap::new()),
40            tries: RwLock::new(HashMap::new()),
41            accumulators: RwLock::new(HashMap::new()),
42            observations: RwLock::new(HashMap::new()),
43            stability: RwLock::new(HashMap::new()),
44        }
45    }
46}
47
48impl Default for InMemoryBackend {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl StorageBackend for InMemoryBackend {
55    fn store_run(&self, record: &RunRecord) -> impl Future<Output = Result<()>> + Send {
56        let result = {
57            let mut guard = self
58                .runs
59                .write()
60                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
61            match guard {
62                Ok(ref mut runs) => {
63                    runs.entry(record.agent_id.clone())
64                        .or_default()
65                        .push(record.clone());
66                    Ok(())
67                }
68                Err(error) => Err(error),
69            }
70        };
71        async move { result }
72    }
73
74    fn load_plan(
75        &self,
76        agent_id: &str,
77    ) -> impl Future<Output = Result<Option<ExecutionPlan>>> + Send {
78        let result = {
79            let guard = self
80                .plans
81                .read()
82                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
83            match guard {
84                Ok(ref plans) => Ok(plans.get(agent_id).cloned()),
85                Err(error) => Err(error),
86            }
87        };
88        async move { result }
89    }
90
91    fn list_runs(&self, agent_id: &str) -> impl Future<Output = Result<Vec<RunRecord>>> + Send {
92        let result = {
93            let guard = self
94                .runs
95                .read()
96                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
97            match guard {
98                Ok(ref runs) => Ok(runs.get(agent_id).cloned().unwrap_or_default()),
99                Err(error) => Err(error),
100            }
101        };
102        async move { result }
103    }
104}
105
106impl StorageBackendDyn for InMemoryBackend {
107    fn store_run_dyn<'a>(
108        &'a self,
109        record: &'a RunRecord,
110    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
111        Box::pin(self.store_run(record))
112    }
113
114    fn load_plan_dyn<'a>(
115        &'a self,
116        agent_id: &'a str,
117    ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionPlan>>> + Send + 'a>> {
118        Box::pin(self.load_plan(agent_id))
119    }
120
121    fn list_runs_dyn<'a>(
122        &'a self,
123        agent_id: &'a str,
124    ) -> Pin<Box<dyn Future<Output = Result<Vec<RunRecord>>> + Send + 'a>> {
125        Box::pin(self.list_runs(agent_id))
126    }
127
128    fn store_trie<'a>(
129        &'a self,
130        agent_id: &'a str,
131        envelope: &'a TrieEnvelope,
132    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
133        let result = {
134            let mut guard = self
135                .tries
136                .write()
137                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
138            match guard {
139                Ok(ref mut tries) => {
140                    tries.insert(agent_id.to_string(), envelope.clone());
141                    Ok(())
142                }
143                Err(error) => Err(error),
144            }
145        };
146        Box::pin(async move { result })
147    }
148
149    fn load_trie<'a>(
150        &'a self,
151        agent_id: &'a str,
152    ) -> Pin<Box<dyn Future<Output = Result<Option<TrieEnvelope>>> + Send + 'a>> {
153        let result = {
154            let guard = self
155                .tries
156                .read()
157                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
158            match guard {
159                Ok(ref tries) => Ok(tries.get(agent_id).cloned()),
160                Err(error) => Err(error),
161            }
162        };
163        Box::pin(async move { result })
164    }
165
166    fn store_accumulators<'a>(
167        &'a self,
168        agent_id: &'a str,
169        state: &'a AccumulatorState,
170    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
171        let result = {
172            let mut guard = self
173                .accumulators
174                .write()
175                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
176            match guard {
177                Ok(ref mut accumulators) => {
178                    accumulators.insert(agent_id.to_string(), state.clone());
179                    Ok(())
180                }
181                Err(error) => Err(error),
182            }
183        };
184        Box::pin(async move { result })
185    }
186
187    fn load_accumulators<'a>(
188        &'a self,
189        agent_id: &'a str,
190    ) -> Pin<Box<dyn Future<Output = Result<Option<AccumulatorState>>> + Send + 'a>> {
191        let result = {
192            let guard = self
193                .accumulators
194                .read()
195                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
196            match guard {
197                Ok(ref accumulators) => Ok(accumulators.get(agent_id).cloned()),
198                Err(error) => Err(error),
199            }
200        };
201        Box::pin(async move { result })
202    }
203
204    fn store_plan(&self, plan: &ExecutionPlan) -> Result<()> {
205        let mut guard = self
206            .plans
207            .write()
208            .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")))?;
209        guard.insert(plan.agent_id.clone(), plan.clone());
210        Ok(())
211    }
212
213    fn store_observations<'a>(
214        &'a self,
215        agent_id: &'a str,
216        observations: &'a [crate::acg::prompt_ir::PromptIR],
217    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
218        let result = {
219            let mut guard = self
220                .observations
221                .write()
222                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
223            match guard {
224                Ok(ref mut observations_map) => {
225                    observations_map.insert(agent_id.to_string(), observations.to_vec());
226                    Ok(())
227                }
228                Err(error) => Err(error),
229            }
230        };
231        Box::pin(async move { result })
232    }
233
234    fn load_observations<'a>(
235        &'a self,
236        agent_id: &'a str,
237    ) -> Pin<
238        Box<dyn Future<Output = Result<Option<Vec<crate::acg::prompt_ir::PromptIR>>>> + Send + 'a>,
239    > {
240        let result = {
241            let guard = self
242                .observations
243                .read()
244                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
245            match guard {
246                Ok(ref observations_map) => Ok(observations_map.get(agent_id).cloned()),
247                Err(error) => Err(error),
248            }
249        };
250        Box::pin(async move { result })
251    }
252
253    fn store_stability<'a>(
254        &'a self,
255        agent_id: &'a str,
256        result: &'a crate::acg::stability::StabilityAnalysisResult,
257    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
258        let store_result = {
259            let mut guard = self
260                .stability
261                .write()
262                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
263            match guard {
264                Ok(ref mut stability_map) => {
265                    stability_map.insert(agent_id.to_string(), result.clone());
266                    Ok(())
267                }
268                Err(error) => Err(error),
269            }
270        };
271        Box::pin(async move { store_result })
272    }
273
274    fn load_stability<'a>(
275        &'a self,
276        agent_id: &'a str,
277    ) -> Pin<
278        Box<
279            dyn Future<Output = Result<Option<crate::acg::stability::StabilityAnalysisResult>>>
280                + Send
281                + 'a,
282        >,
283    > {
284        let result = {
285            let guard = self
286                .stability
287                .read()
288                .map_err(|error| AdaptiveError::Internal(format!("lock poisoned: {error}")));
289            match guard {
290                Ok(ref stability_map) => Ok(stability_map.get(agent_id).cloned()),
291                Err(error) => Err(error),
292            }
293        };
294        Box::pin(async move { result })
295    }
296}
297
298#[cfg(test)]
299#[path = "../../tests/unit/storage_memory_internal_tests.rs"]
300mod tests;