Skip to main content

frp_persistence/
store.rs

1use std::collections::HashMap;
2use std::path::Path;
3
4use infinite_db::InfiniteDb;
5use infinite_db::infinitedb_core::address::DimensionVector;
6
7use frp_loom::error::StoreError;
8use frp_loom::query::{Query, QueryResult};
9use frp_loom::store::{AtomStore, BlockStore, EdgeStore};
10use frp_domain::atom::Atom;
11use frp_domain::block::Block;
12use frp_domain::edge::HyperEdge;
13use frp_plexus::{AtomId, BlockId, EdgeId};
14
15use crate::error::PersistenceError;
16use crate::spaces::{register_spaces, SPACE_ATOMS, SPACE_BLOCKS, SPACE_EDGES};
17
18// ---------------------------------------------------------------------------
19// InfiniteDbStore
20// ---------------------------------------------------------------------------
21
22/// A durable store for frp graph entities backed by [`InfiniteDb`].
23///
24/// Uses a **write-through cache**: all reads are served from in-memory
25/// `HashMap`s (required because the store traits return borrowed `&Self::*`
26/// references). Writes go to both the in-memory cache and the database WAL.
27///
28/// Call [`flush`](Self::flush) before process exit to seal buffered WAL
29/// records into on-disk blocks.
30pub struct InfiniteDbStore {
31    db:     InfiniteDb,
32    atoms:  HashMap<AtomId,  Atom>,
33    blocks: HashMap<BlockId, Block>,
34    edges:  HashMap<EdgeId,  HyperEdge>,
35}
36
37impl InfiniteDbStore {
38    /// Open (or create) the database at `dir`, register all frp spaces,
39    /// and warm the in-memory caches by replaying every live record.
40    pub fn open(dir: impl AsRef<Path>) -> Result<Self, StoreError> {
41        let mut db = InfiniteDb::open(dir)
42            .map_err(|e| StoreError::Io(format!("failed to open infinite-db: {e}")))?;
43
44        register_spaces(&mut db).map_err(StoreError::from)?;
45
46        let mut store = Self {
47            db,
48            atoms:  HashMap::new(),
49            blocks: HashMap::new(),
50            edges:  HashMap::new(),
51        };
52
53        store.warm_cache()?;
54        Ok(store)
55    }
56
57    /// Seal all buffered WAL records into on-disk blocks.
58    ///
59    /// Call this before process exit to ensure durability.
60    pub fn flush(&mut self) -> Result<(), StoreError> {
61        for space in [SPACE_ATOMS, SPACE_BLOCKS, SPACE_EDGES] {
62            self.db
63                .flush(space)
64                .map_err(|e| StoreError::Io(format!("flush failed: {e}")))?;
65        }
66        Ok(())
67    }
68
69    // -----------------------------------------------------------------------
70    // Private helpers
71    // -----------------------------------------------------------------------
72
73    /// Deserialise all live records from `infinite-db` into the caches.
74    fn warm_cache(&mut self) -> Result<(), StoreError> {
75        // Atoms
76        let records = self.db
77            .query(SPACE_ATOMS, None)
78            .map_err(|e| StoreError::Io(format!("cache warm (atoms): {e}")))?;
79        for rec in records {
80            let atom: Atom = serde_json::from_slice(&rec.data)
81                .map_err(|e| StoreError::Io(format!("deserialize atom: {e}")))?;
82            self.atoms.insert(atom.id, atom);
83        }
84
85        // Blocks
86        let records = self.db
87            .query(SPACE_BLOCKS, None)
88            .map_err(|e| StoreError::Io(format!("cache warm (blocks): {e}")))?;
89        for rec in records {
90            let block: Block = serde_json::from_slice(&rec.data)
91                .map_err(|e| StoreError::Io(format!("deserialize block: {e}")))?;
92            self.blocks.insert(block.id, block);
93        }
94
95        // Edges
96        let records = self.db
97            .query(SPACE_EDGES, None)
98            .map_err(|e| StoreError::Io(format!("cache warm (edges): {e}")))?;
99        for rec in records {
100            let edge: HyperEdge = serde_json::from_slice(&rec.data)
101                .map_err(|e| StoreError::Io(format!("deserialize edge: {e}")))?;
102            self.edges.insert(edge.id, edge);
103        }
104
105        Ok(())
106    }
107
108    #[inline]
109    fn id_point(kind: &str, raw: u64) -> Result<DimensionVector, StoreError> {
110        let id = u32::try_from(raw).map_err(|_| {
111            StoreError::Io(format!(
112                "{kind} id {raw} exceeds 1D coordinate limit (u32); refusing lossy u64->u32 mapping"
113            ))
114        })?;
115        Ok(DimensionVector::new(vec![id]))
116    }
117}
118
119// ---------------------------------------------------------------------------
120// AtomStore
121// ---------------------------------------------------------------------------
122
123impl AtomStore for InfiniteDbStore {
124    type Atom = Atom;
125
126    fn get_atom(&self, id: AtomId) -> Result<&Atom, StoreError> {
127        self.atoms.get(&id).ok_or_else(|| StoreError::not_found(id.value()))
128    }
129
130    fn put_atom(&mut self, atom: Atom) -> Result<(), StoreError> {
131        let bytes = serde_json::to_vec(&atom)
132            .map_err(|e| StoreError::Io(PersistenceError::Serialize(e.to_string()).to_string()))?;
133        let point = Self::id_point("atom", atom.id.value())?;
134        self.db
135            .insert(SPACE_ATOMS, point, bytes)
136            .map_err(|e| StoreError::Io(format!("db insert atom: {e}")))?;
137        self.atoms.insert(atom.id, atom);
138        Ok(())
139    }
140
141    fn delete_atom(&mut self, id: AtomId) -> Result<(), StoreError> {
142        if !self.atoms.contains_key(&id) {
143            return Err(StoreError::not_found(id.value()));
144        }
145        let point = Self::id_point("atom", id.value())?;
146        self.db
147            .delete(SPACE_ATOMS, point)
148            .map_err(|e| StoreError::Io(format!("db delete atom: {e}")))?;
149        self.atoms.remove(&id);
150        Ok(())
151    }
152
153    fn query_atoms(&self, query: &Query) -> Result<QueryResult<&Atom>, StoreError> {
154        let filtered: Vec<&Atom> = self
155            .atoms
156            .values()
157            .filter(|a| {
158                if let Some(k) = &query.kind_filter {
159                    if a.kind.to_string() != *k {
160                        return false;
161                    }
162                }
163                for tag in &query.tag_filter {
164                    if !a.meta.tags.contains(tag) {
165                        return false;
166                    }
167                }
168                true
169            })
170            .collect();
171
172        let total = filtered.len();
173        let items = filtered
174            .into_iter()
175            .skip(query.offset)
176            .take(query.limit.unwrap_or(usize::MAX))
177            .collect();
178
179        Ok(QueryResult::new(items, total, query.offset))
180    }
181}
182
183// ---------------------------------------------------------------------------
184// BlockStore
185// ---------------------------------------------------------------------------
186
187impl BlockStore for InfiniteDbStore {
188    type Block = Block;
189
190    fn get_block(&self, id: BlockId) -> Result<&Block, StoreError> {
191        self.blocks.get(&id).ok_or_else(|| StoreError::not_found(id.value()))
192    }
193
194    fn put_block(&mut self, block: Block) -> Result<(), StoreError> {
195        let bytes = serde_json::to_vec(&block)
196            .map_err(|e| StoreError::Io(PersistenceError::Serialize(e.to_string()).to_string()))?;
197        let point = Self::id_point("block", block.id.value())?;
198        self.db
199            .insert(SPACE_BLOCKS, point, bytes)
200            .map_err(|e| StoreError::Io(format!("db insert block: {e}")))?;
201        self.blocks.insert(block.id, block);
202        Ok(())
203    }
204
205    fn delete_block(&mut self, id: BlockId) -> Result<(), StoreError> {
206        if !self.blocks.contains_key(&id) {
207            return Err(StoreError::not_found(id.value()));
208        }
209        let point = Self::id_point("block", id.value())?;
210        self.db
211            .delete(SPACE_BLOCKS, point)
212            .map_err(|e| StoreError::Io(format!("db delete block: {e}")))?;
213        self.blocks.remove(&id);
214        Ok(())
215    }
216
217    fn query_blocks(&self, query: &Query) -> Result<QueryResult<&Block>, StoreError> {
218        let filtered: Vec<&Block> = self
219            .blocks
220            .values()
221            .filter(|b| {
222                // Block has no `kind` string — only tag filtering applies.
223                for tag in &query.tag_filter {
224                    if !b.meta.labels.contains_key(tag.as_str()) {
225                        return false;
226                    }
227                }
228                true
229            })
230            .collect();
231
232        let total = filtered.len();
233        let items = filtered
234            .into_iter()
235            .skip(query.offset)
236            .take(query.limit.unwrap_or(usize::MAX))
237            .collect();
238
239        Ok(QueryResult::new(items, total, query.offset))
240    }
241}
242
243// ---------------------------------------------------------------------------
244// EdgeStore
245// ---------------------------------------------------------------------------
246
247impl EdgeStore for InfiniteDbStore {
248    type Edge = HyperEdge;
249
250    fn get_edge(&self, id: EdgeId) -> Result<&HyperEdge, StoreError> {
251        self.edges.get(&id).ok_or_else(|| StoreError::not_found(id.value()))
252    }
253
254    fn put_edge(&mut self, edge: HyperEdge) -> Result<(), StoreError> {
255        let bytes = serde_json::to_vec(&edge)
256            .map_err(|e| StoreError::Io(PersistenceError::Serialize(e.to_string()).to_string()))?;
257        let point = Self::id_point("edge", edge.id.value())?;
258        self.db
259            .insert(SPACE_EDGES, point, bytes)
260            .map_err(|e| StoreError::Io(format!("db insert edge: {e}")))?;
261        self.edges.insert(edge.id, edge);
262        Ok(())
263    }
264
265    fn delete_edge(&mut self, id: EdgeId) -> Result<(), StoreError> {
266        if !self.edges.contains_key(&id) {
267            return Err(StoreError::not_found(id.value()));
268        }
269        let point = Self::id_point("edge", id.value())?;
270        self.db
271            .delete(SPACE_EDGES, point)
272            .map_err(|e| StoreError::Io(format!("db delete edge: {e}")))?;
273        self.edges.remove(&id);
274        Ok(())
275    }
276
277    fn query_edges(&self, query: &Query) -> Result<QueryResult<&HyperEdge>, StoreError> {
278        // Edges have no kind/tag metadata to filter on — return all with pagination.
279        let all: Vec<&HyperEdge> = self.edges.values().collect();
280        let total = all.len();
281        let items = all
282            .into_iter()
283            .skip(query.offset)
284            .take(query.limit.unwrap_or(usize::MAX))
285            .collect();
286        Ok(QueryResult::new(items, total, query.offset))
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use tempfile::TempDir;
293
294    use frp_domain::{
295        Atom, AtomKind, AtomMeta, Block, BlockSchema, EdgeSchedule, EdgeTransform, HyperEdge, Meta,
296    };
297    use frp_loom::store::{AtomStore, BlockStore, EdgeStore};
298    use frp_plexus::{AtomId, BlockId, EdgeId, LayerTag, PortId};
299
300    use super::InfiniteDbStore;
301
302    fn open_tmp_store() -> (TempDir, InfiniteDbStore) {
303        let dir = TempDir::new().expect("temp dir");
304        let store = InfiniteDbStore::open(dir.path()).expect("open store");
305        (dir, store)
306    }
307
308    #[test]
309    fn round_trip_atom_block_edge_through_store() {
310        let (_dir, mut store) = open_tmp_store();
311
312        let atom = Atom::new(
313            AtomId::new(7),
314            AtomKind::Transform,
315            AtomMeta::new("roundtrip", LayerTag::Core),
316        );
317        let block = Block {
318            id: BlockId::new(11),
319            schema: BlockSchema::new(vec![], vec![]),
320            atoms: vec![atom.id],
321            meta: Meta::default(),
322        };
323        let edge = HyperEdge::new(
324            EdgeId::new(13),
325            vec![PortId::new(1)],
326            vec![PortId::new(2)],
327            EdgeTransform::PassThrough,
328            EdgeSchedule::OnChange,
329        );
330
331        store.put_atom(atom.clone()).expect("put atom");
332        store.put_block(block.clone()).expect("put block");
333        store.put_edge(edge.clone()).expect("put edge");
334        store.flush().expect("flush");
335
336        assert_eq!(store.get_atom(atom.id).expect("get atom").id, atom.id);
337        assert_eq!(store.get_block(block.id).expect("get block").id, block.id);
338        assert_eq!(store.get_edge(edge.id).expect("get edge").id, edge.id);
339    }
340
341    #[test]
342    fn put_atom_rejects_u64_id_overflow_for_1d_space() {
343        let (_dir, mut store) = open_tmp_store();
344        let atom = Atom::new(
345            AtomId::new(u64::MAX),
346            AtomKind::Transform,
347            AtomMeta::new("overflow", LayerTag::Core),
348        );
349
350        let err = store.put_atom(atom).expect_err("overflow must fail");
351        assert!(err.to_string().contains("exceeds 1D coordinate limit"));
352    }
353}