1use std::collections::HashMap;
2use std::path::Path;
3
4use infinite_db::InfiniteDb;
5use infinite_db::infinitedb_core::address::DimensionVector;
6
7use frp_loom::error::StoreError;
8use frp_loom::query::{Query, QueryResult};
9use frp_loom::store::{AtomStore, BlockStore, EdgeStore};
10use frp_domain::atom::Atom;
11use frp_domain::block::Block;
12use frp_domain::edge::HyperEdge;
13use frp_plexus::{AtomId, BlockId, EdgeId};
14
15use crate::error::PersistenceError;
16use crate::spaces::{register_spaces, SPACE_ATOMS, SPACE_BLOCKS, SPACE_EDGES};
17
18pub struct InfiniteDbStore {
31 db: InfiniteDb,
32 atoms: HashMap<AtomId, Atom>,
33 blocks: HashMap<BlockId, Block>,
34 edges: HashMap<EdgeId, HyperEdge>,
35}
36
37impl InfiniteDbStore {
38 pub fn open(dir: impl AsRef<Path>) -> Result<Self, StoreError> {
41 let mut db = InfiniteDb::open(dir)
42 .map_err(|e| StoreError::Io(format!("failed to open infinite-db: {e}")))?;
43
44 register_spaces(&mut db).map_err(StoreError::from)?;
45
46 let mut store = Self {
47 db,
48 atoms: HashMap::new(),
49 blocks: HashMap::new(),
50 edges: HashMap::new(),
51 };
52
53 store.warm_cache()?;
54 Ok(store)
55 }
56
57 pub fn flush(&mut self) -> Result<(), StoreError> {
61 for space in [SPACE_ATOMS, SPACE_BLOCKS, SPACE_EDGES] {
62 self.db
63 .flush(space)
64 .map_err(|e| StoreError::Io(format!("flush failed: {e}")))?;
65 }
66 Ok(())
67 }
68
69 fn warm_cache(&mut self) -> Result<(), StoreError> {
75 let records = self.db
77 .query(SPACE_ATOMS, None)
78 .map_err(|e| StoreError::Io(format!("cache warm (atoms): {e}")))?;
79 for rec in records {
80 let atom: Atom = serde_json::from_slice(&rec.data)
81 .map_err(|e| StoreError::Io(format!("deserialize atom: {e}")))?;
82 self.atoms.insert(atom.id, atom);
83 }
84
85 let records = self.db
87 .query(SPACE_BLOCKS, None)
88 .map_err(|e| StoreError::Io(format!("cache warm (blocks): {e}")))?;
89 for rec in records {
90 let block: Block = serde_json::from_slice(&rec.data)
91 .map_err(|e| StoreError::Io(format!("deserialize block: {e}")))?;
92 self.blocks.insert(block.id, block);
93 }
94
95 let records = self.db
97 .query(SPACE_EDGES, None)
98 .map_err(|e| StoreError::Io(format!("cache warm (edges): {e}")))?;
99 for rec in records {
100 let edge: HyperEdge = serde_json::from_slice(&rec.data)
101 .map_err(|e| StoreError::Io(format!("deserialize edge: {e}")))?;
102 self.edges.insert(edge.id, edge);
103 }
104
105 Ok(())
106 }
107
108 #[inline]
109 fn id_point(kind: &str, raw: u64) -> Result<DimensionVector, StoreError> {
110 let id = u32::try_from(raw).map_err(|_| {
111 StoreError::Io(format!(
112 "{kind} id {raw} exceeds 1D coordinate limit (u32); refusing lossy u64->u32 mapping"
113 ))
114 })?;
115 Ok(DimensionVector::new(vec![id]))
116 }
117}
118
119impl AtomStore for InfiniteDbStore {
124 type Atom = Atom;
125
126 fn get_atom(&self, id: AtomId) -> Result<&Atom, StoreError> {
127 self.atoms.get(&id).ok_or_else(|| StoreError::not_found(id.value()))
128 }
129
130 fn put_atom(&mut self, atom: Atom) -> Result<(), StoreError> {
131 let bytes = serde_json::to_vec(&atom)
132 .map_err(|e| StoreError::Io(PersistenceError::Serialize(e.to_string()).to_string()))?;
133 let point = Self::id_point("atom", atom.id.value())?;
134 self.db
135 .insert(SPACE_ATOMS, point, bytes)
136 .map_err(|e| StoreError::Io(format!("db insert atom: {e}")))?;
137 self.atoms.insert(atom.id, atom);
138 Ok(())
139 }
140
141 fn delete_atom(&mut self, id: AtomId) -> Result<(), StoreError> {
142 if !self.atoms.contains_key(&id) {
143 return Err(StoreError::not_found(id.value()));
144 }
145 let point = Self::id_point("atom", id.value())?;
146 self.db
147 .delete(SPACE_ATOMS, point)
148 .map_err(|e| StoreError::Io(format!("db delete atom: {e}")))?;
149 self.atoms.remove(&id);
150 Ok(())
151 }
152
153 fn query_atoms(&self, query: &Query) -> Result<QueryResult<&Atom>, StoreError> {
154 let filtered: Vec<&Atom> = self
155 .atoms
156 .values()
157 .filter(|a| {
158 if let Some(k) = &query.kind_filter {
159 if a.kind.to_string() != *k {
160 return false;
161 }
162 }
163 for tag in &query.tag_filter {
164 if !a.meta.tags.contains(tag) {
165 return false;
166 }
167 }
168 true
169 })
170 .collect();
171
172 let total = filtered.len();
173 let items = filtered
174 .into_iter()
175 .skip(query.offset)
176 .take(query.limit.unwrap_or(usize::MAX))
177 .collect();
178
179 Ok(QueryResult::new(items, total, query.offset))
180 }
181}
182
183impl BlockStore for InfiniteDbStore {
188 type Block = Block;
189
190 fn get_block(&self, id: BlockId) -> Result<&Block, StoreError> {
191 self.blocks.get(&id).ok_or_else(|| StoreError::not_found(id.value()))
192 }
193
194 fn put_block(&mut self, block: Block) -> Result<(), StoreError> {
195 let bytes = serde_json::to_vec(&block)
196 .map_err(|e| StoreError::Io(PersistenceError::Serialize(e.to_string()).to_string()))?;
197 let point = Self::id_point("block", block.id.value())?;
198 self.db
199 .insert(SPACE_BLOCKS, point, bytes)
200 .map_err(|e| StoreError::Io(format!("db insert block: {e}")))?;
201 self.blocks.insert(block.id, block);
202 Ok(())
203 }
204
205 fn delete_block(&mut self, id: BlockId) -> Result<(), StoreError> {
206 if !self.blocks.contains_key(&id) {
207 return Err(StoreError::not_found(id.value()));
208 }
209 let point = Self::id_point("block", id.value())?;
210 self.db
211 .delete(SPACE_BLOCKS, point)
212 .map_err(|e| StoreError::Io(format!("db delete block: {e}")))?;
213 self.blocks.remove(&id);
214 Ok(())
215 }
216
217 fn query_blocks(&self, query: &Query) -> Result<QueryResult<&Block>, StoreError> {
218 let filtered: Vec<&Block> = self
219 .blocks
220 .values()
221 .filter(|b| {
222 for tag in &query.tag_filter {
224 if !b.meta.labels.contains_key(tag.as_str()) {
225 return false;
226 }
227 }
228 true
229 })
230 .collect();
231
232 let total = filtered.len();
233 let items = filtered
234 .into_iter()
235 .skip(query.offset)
236 .take(query.limit.unwrap_or(usize::MAX))
237 .collect();
238
239 Ok(QueryResult::new(items, total, query.offset))
240 }
241}
242
243impl EdgeStore for InfiniteDbStore {
248 type Edge = HyperEdge;
249
250 fn get_edge(&self, id: EdgeId) -> Result<&HyperEdge, StoreError> {
251 self.edges.get(&id).ok_or_else(|| StoreError::not_found(id.value()))
252 }
253
254 fn put_edge(&mut self, edge: HyperEdge) -> Result<(), StoreError> {
255 let bytes = serde_json::to_vec(&edge)
256 .map_err(|e| StoreError::Io(PersistenceError::Serialize(e.to_string()).to_string()))?;
257 let point = Self::id_point("edge", edge.id.value())?;
258 self.db
259 .insert(SPACE_EDGES, point, bytes)
260 .map_err(|e| StoreError::Io(format!("db insert edge: {e}")))?;
261 self.edges.insert(edge.id, edge);
262 Ok(())
263 }
264
265 fn delete_edge(&mut self, id: EdgeId) -> Result<(), StoreError> {
266 if !self.edges.contains_key(&id) {
267 return Err(StoreError::not_found(id.value()));
268 }
269 let point = Self::id_point("edge", id.value())?;
270 self.db
271 .delete(SPACE_EDGES, point)
272 .map_err(|e| StoreError::Io(format!("db delete edge: {e}")))?;
273 self.edges.remove(&id);
274 Ok(())
275 }
276
277 fn query_edges(&self, query: &Query) -> Result<QueryResult<&HyperEdge>, StoreError> {
278 let all: Vec<&HyperEdge> = self.edges.values().collect();
280 let total = all.len();
281 let items = all
282 .into_iter()
283 .skip(query.offset)
284 .take(query.limit.unwrap_or(usize::MAX))
285 .collect();
286 Ok(QueryResult::new(items, total, query.offset))
287 }
288}
289
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use frp_domain::{
        Atom, AtomKind, AtomMeta, Block, BlockSchema, EdgeSchedule, EdgeTransform, HyperEdge, Meta,
    };
    use frp_loom::store::{AtomStore, BlockStore, EdgeStore};
    use frp_plexus::{AtomId, BlockId, EdgeId, LayerTag, PortId};

    use super::InfiniteDbStore;

    /// Opens a store inside a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned next to the store so the caller keeps
    /// it alive for as long as the store is in use.
    fn open_tmp_store() -> (TempDir, InfiniteDbStore) {
        let tmp = TempDir::new().expect("temp dir");
        let opened = InfiniteDbStore::open(tmp.path()).expect("open store");
        (tmp, opened)
    }

    /// Puts one atom, one block and one edge, flushes, and reads each of
    /// them back by id from the same store instance.
    #[test]
    fn round_trip_atom_block_edge_through_store() {
        let (_dir, mut store) = open_tmp_store();

        let atom_id = AtomId::new(7);
        let block_id = BlockId::new(11);
        let edge_id = EdgeId::new(13);

        let atom = Atom::new(
            atom_id,
            AtomKind::Transform,
            AtomMeta::new("roundtrip", LayerTag::Core),
        );
        let block = Block {
            id: block_id,
            schema: BlockSchema::new(vec![], vec![]),
            atoms: vec![atom_id],
            meta: Meta::default(),
        };
        let edge = HyperEdge::new(
            edge_id,
            vec![PortId::new(1)],
            vec![PortId::new(2)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        );

        // The ids were captured above, so the values can be moved in directly.
        store.put_atom(atom).expect("put atom");
        store.put_block(block).expect("put block");
        store.put_edge(edge).expect("put edge");
        store.flush().expect("flush");

        let stored_atom = store.get_atom(atom_id).expect("get atom");
        assert_eq!(stored_atom.id, atom_id);

        let stored_block = store.get_block(block_id).expect("get block");
        assert_eq!(stored_block.id, block_id);

        let stored_edge = store.get_edge(edge_id).expect("get edge");
        assert_eq!(stored_edge.id, edge_id);
    }

    /// An atom whose id does not fit in a `u32` must be rejected rather than
    /// stored under a truncated coordinate.
    #[test]
    fn put_atom_rejects_u64_id_overflow_for_1d_space() {
        let (_dir, mut store) = open_tmp_store();

        let oversized = Atom::new(
            AtomId::new(u64::MAX),
            AtomKind::Transform,
            AtomMeta::new("overflow", LayerTag::Core),
        );

        let err = store.put_atom(oversized).expect_err("overflow must fail");
        let message = err.to_string();
        assert!(message.contains("exceeds 1D coordinate limit"));
    }
}