Skip to main content

reddb_file/
graph_store.rs

//! Persisted graph-store envelope.
//!
//! The graph engine owns pages, registry semantics, and index rebuilds. This
//! module owns the durable envelope around graph pages.
5
/// Four-byte signature (`RBGR`) that opens every encoded graph-store frame.
pub const GRAPH_STORE_MAGIC: [u8; 4] = [b'R', b'B', b'G', b'R'];
/// Frame version that carries no registry section.
pub const GRAPH_STORE_VERSION_V1: u32 = 1;
/// Frame version that carries a length-prefixed registry section.
pub const GRAPH_STORE_VERSION_V2: u32 = 2;
/// Size in bytes of the fixed header: magic, `u32` version, `u64` node count
/// and `u64` edge count.
pub const GRAPH_STORE_HEADER_LEN: usize =
    GRAPH_STORE_MAGIC.len() + std::mem::size_of::<u32>() + 2 * std::mem::size_of::<u64>();
/// Width in bytes of the `u32` registry length prefix.
pub const GRAPH_STORE_REGISTRY_LEN_BYTES: usize = std::mem::size_of::<u32>();
/// Width in bytes of each `u32` page-count field.
pub const GRAPH_STORE_PAGE_COUNT_BYTES: usize = std::mem::size_of::<u32>();
12
/// In-memory form of one persisted graph-store frame.
///
/// The codec in this module treats page contents and registry bytes as opaque
/// byte strings; it only frames them.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GraphStoreFrame {
    /// Envelope version; the codec accepts [`GRAPH_STORE_VERSION_V1`] and
    /// [`GRAPH_STORE_VERSION_V2`].
    pub version: u32,
    /// Node count recorded in the header. The codec stores and returns it
    /// verbatim without cross-checking it against the pages.
    pub node_count: u64,
    /// Edge count recorded in the header, handled like `node_count`.
    pub edge_count: u64,
    /// Opaque registry payload: must be `Some` for v2 frames and `None` for v1.
    pub registry_bytes: Option<Vec<u8>>,
    /// Node pages, written back to back with no per-page length prefix.
    pub node_pages: Vec<Vec<u8>>,
    /// Edge pages, written back to back with no per-page length prefix.
    pub edge_pages: Vec<Vec<u8>>,
}
22
/// Failures reported while encoding or decoding a graph-store frame.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GraphStoreFrameError {
    /// The frame declares a version this codec does not know; carries that version.
    UnsupportedVersion(u32),
    /// A v2 frame was supplied without registry bytes.
    RegistryRequired,
    /// A v1 frame was supplied with registry bytes.
    RegistryForbidden,
    /// A page list has `len` entries, more than the `max` a count field can hold.
    TooManyPages { len: usize, max: usize },
    /// The byte stream is structurally invalid; `offset` is where the problem
    /// was detected and `reason` is a short static description.
    Malformed { offset: usize, reason: &'static str },
}

impl std::fmt::Display for GraphStoreFrameError {
    /// Renders the human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed messages need no formatting machinery.
            Self::RegistryRequired => f.write_str("graph store v2 frame requires registry bytes"),
            Self::RegistryForbidden => {
                f.write_str("graph store v1 frame cannot carry registry bytes")
            }
            Self::UnsupportedVersion(found) => {
                write!(f, "unsupported graph store frame version {found}")
            }
            Self::TooManyPages { len, max } => {
                write!(f, "too many graph store pages: {len} (max {max})")
            }
            Self::Malformed { offset, reason } => {
                write!(f, "malformed graph store frame at offset {offset}: {reason}")
            }
        }
    }
}

impl std::error::Error for GraphStoreFrameError {}
56
57pub fn encode_graph_store_frame(frame: &GraphStoreFrame) -> Result<Vec<u8>, GraphStoreFrameError> {
58    match frame.version {
59        GRAPH_STORE_VERSION_V1 if frame.registry_bytes.is_some() => {
60            return Err(GraphStoreFrameError::RegistryForbidden);
61        }
62        GRAPH_STORE_VERSION_V1 => {}
63        GRAPH_STORE_VERSION_V2 if frame.registry_bytes.is_none() => {
64            return Err(GraphStoreFrameError::RegistryRequired);
65        }
66        GRAPH_STORE_VERSION_V2 => {}
67        version => return Err(GraphStoreFrameError::UnsupportedVersion(version)),
68    }
69
70    let node_page_count =
71        u32::try_from(frame.node_pages.len()).map_err(|_| GraphStoreFrameError::TooManyPages {
72            len: frame.node_pages.len(),
73            max: u32::MAX as usize,
74        })?;
75    let edge_page_count =
76        u32::try_from(frame.edge_pages.len()).map_err(|_| GraphStoreFrameError::TooManyPages {
77            len: frame.edge_pages.len(),
78            max: u32::MAX as usize,
79        })?;
80
81    let registry_len = frame.registry_bytes.as_ref().map(Vec::len).unwrap_or(0);
82    let page_bytes: usize = frame
83        .node_pages
84        .iter()
85        .chain(frame.edge_pages.iter())
86        .map(Vec::len)
87        .sum();
88    let mut buf = Vec::with_capacity(
89        GRAPH_STORE_HEADER_LEN
90            + if frame.version == GRAPH_STORE_VERSION_V2 {
91                GRAPH_STORE_REGISTRY_LEN_BYTES + registry_len
92            } else {
93                0
94            }
95            + GRAPH_STORE_PAGE_COUNT_BYTES * 2
96            + page_bytes,
97    );
98
99    buf.extend_from_slice(&GRAPH_STORE_MAGIC);
100    buf.extend_from_slice(&frame.version.to_le_bytes());
101    buf.extend_from_slice(&frame.node_count.to_le_bytes());
102    buf.extend_from_slice(&frame.edge_count.to_le_bytes());
103
104    if frame.version == GRAPH_STORE_VERSION_V2 {
105        let registry = frame
106            .registry_bytes
107            .as_ref()
108            .expect("v2 registry checked above");
109        let registry_len =
110            u32::try_from(registry.len()).map_err(|_| GraphStoreFrameError::Malformed {
111                offset: GRAPH_STORE_HEADER_LEN,
112                reason: "registry too large",
113            })?;
114        buf.extend_from_slice(&registry_len.to_le_bytes());
115        buf.extend_from_slice(registry);
116    }
117
118    buf.extend_from_slice(&node_page_count.to_le_bytes());
119    for page in &frame.node_pages {
120        buf.extend_from_slice(page);
121    }
122    buf.extend_from_slice(&edge_page_count.to_le_bytes());
123    for page in &frame.edge_pages {
124        buf.extend_from_slice(page);
125    }
126
127    Ok(buf)
128}
129
130pub fn decode_graph_store_frame(
131    data: &[u8],
132    page_size: usize,
133) -> Result<GraphStoreFrame, GraphStoreFrameError> {
134    if data.len() < GRAPH_STORE_HEADER_LEN {
135        return Err(GraphStoreFrameError::Malformed {
136            offset: 0,
137            reason: "header truncated",
138        });
139    }
140    if data[0..4] != GRAPH_STORE_MAGIC {
141        return Err(GraphStoreFrameError::Malformed {
142            offset: 0,
143            reason: "invalid magic",
144        });
145    }
146
147    let version = read_u32(data, 4, "version")?;
148    let node_count = read_u64(data, 8, "node count")?;
149    let edge_count = read_u64(data, 16, "edge count")?;
150    let mut offset = GRAPH_STORE_HEADER_LEN;
151
152    let registry_bytes = match version {
153        GRAPH_STORE_VERSION_V1 => None,
154        GRAPH_STORE_VERSION_V2 => {
155            let len = read_u32(data, offset, "registry length")? as usize;
156            offset += GRAPH_STORE_REGISTRY_LEN_BYTES;
157            if data.len() < offset + len {
158                return Err(GraphStoreFrameError::Malformed {
159                    offset,
160                    reason: "registry bytes truncated",
161                });
162            }
163            let bytes = data[offset..offset + len].to_vec();
164            offset += len;
165            Some(bytes)
166        }
167        version => return Err(GraphStoreFrameError::UnsupportedVersion(version)),
168    };
169
170    let node_page_count = read_u32(data, offset, "node page count")? as usize;
171    offset += GRAPH_STORE_PAGE_COUNT_BYTES;
172    let (node_pages, next_offset) = read_pages(data, offset, node_page_count, page_size, "node")?;
173    offset = next_offset;
174
175    let edge_page_count = read_u32(data, offset, "edge page count")? as usize;
176    offset += GRAPH_STORE_PAGE_COUNT_BYTES;
177    let (edge_pages, _next_offset) = read_pages(data, offset, edge_page_count, page_size, "edge")?;
178
179    Ok(GraphStoreFrame {
180        version,
181        node_count,
182        edge_count,
183        registry_bytes,
184        node_pages,
185        edge_pages,
186    })
187}
188
189fn read_pages(
190    data: &[u8],
191    mut offset: usize,
192    count: usize,
193    page_size: usize,
194    label: &'static str,
195) -> Result<(Vec<Vec<u8>>, usize), GraphStoreFrameError> {
196    let mut pages = Vec::with_capacity(count);
197    for _ in 0..count {
198        if data.len() < offset + page_size {
199            return Err(GraphStoreFrameError::Malformed {
200                offset,
201                reason: if label == "node" {
202                    "node pages truncated"
203                } else {
204                    "edge pages truncated"
205                },
206            });
207        }
208        pages.push(data[offset..offset + page_size].to_vec());
209        offset += page_size;
210    }
211    Ok((pages, offset))
212}
213
214fn read_u32(data: &[u8], offset: usize, reason: &'static str) -> Result<u32, GraphStoreFrameError> {
215    if data.len() < offset + 4 {
216        return Err(GraphStoreFrameError::Malformed { offset, reason });
217    }
218    Ok(u32::from_le_bytes(
219        data[offset..offset + 4]
220            .try_into()
221            .expect("u32 length checked"),
222    ))
223}
224
225fn read_u64(data: &[u8], offset: usize, reason: &'static str) -> Result<u64, GraphStoreFrameError> {
226    if data.len() < offset + 8 {
227        return Err(GraphStoreFrameError::Malformed { offset, reason });
228    }
229    Ok(u64::from_le_bytes(
230        data[offset..offset + 8]
231            .try_into()
232            .expect("u64 length checked"),
233    ))
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Page length shared by every fixture in this module.
    const PAGE_LEN: usize = 8;

    /// Builds one page filled with `fill`.
    fn page(fill: u8) -> Vec<u8> {
        vec![fill; PAGE_LEN]
    }

    /// A v2 frame survives an encode/decode round trip unchanged.
    #[test]
    fn graph_store_v2_frame_round_trips() {
        let frame = GraphStoreFrame {
            version: GRAPH_STORE_VERSION_V2,
            node_count: 2,
            edge_count: 1,
            registry_bytes: Some(vec![1, 2, 3]),
            node_pages: vec![page(10), page(11)],
            edge_pages: vec![page(20)],
        };

        let bytes = encode_graph_store_frame(&frame).unwrap();
        let decoded = decode_graph_store_frame(&bytes, PAGE_LEN).unwrap();

        assert_eq!(decoded, frame);
    }

    /// A v1 frame decodes with no registry and with its pages intact.
    #[test]
    fn graph_store_v1_frame_has_no_registry() {
        let frame = GraphStoreFrame {
            version: GRAPH_STORE_VERSION_V1,
            node_count: 1,
            edge_count: 1,
            registry_bytes: None,
            node_pages: vec![page(10)],
            edge_pages: vec![page(20)],
        };

        let bytes = encode_graph_store_frame(&frame).unwrap();
        let decoded = decode_graph_store_frame(&bytes, PAGE_LEN).unwrap();

        assert_eq!(decoded.registry_bytes, None);
        assert_eq!(decoded.node_pages.len(), 1);
        assert_eq!(decoded.edge_pages.len(), 1);
    }

    /// Inputs cut short in the header or in the payload are reported as malformed.
    #[test]
    fn graph_store_frame_rejects_truncated_input() {
        // Far shorter than the fixed header.
        let too_short = decode_graph_store_frame(&[1, 2, 3], PAGE_LEN);
        assert!(matches!(
            too_short,
            Err(GraphStoreFrameError::Malformed { .. })
        ));

        // A valid frame with its final byte removed.
        let frame = GraphStoreFrame {
            version: GRAPH_STORE_VERSION_V2,
            node_count: 1,
            edge_count: 0,
            registry_bytes: Some(Vec::new()),
            node_pages: vec![page(10)],
            edge_pages: Vec::new(),
        };
        let bytes = encode_graph_store_frame(&frame).unwrap();
        let clipped = &bytes[..bytes.len() - 1];
        assert!(matches!(
            decode_graph_store_frame(clipped, PAGE_LEN),
            Err(GraphStoreFrameError::Malformed { .. })
        ));
    }
}