Skip to main content

rns_embedded_core/
store.rs

1use crate::{attachment::ChunkCursor, hash::digest32, EmbeddedError, EmbeddedResult};
2use alloc::collections::BTreeMap;
3use alloc::vec::Vec;
4
5const STORE_MAGIC: &[u8; 4] = b"RES1";
6const SCHEMA_V1: u8 = 1;
7const SCHEMA_V2: u8 = 2;
8
9pub trait EmbeddedStore {
10    fn load_replay_floor(&self, identity: &[u8; 32]) -> EmbeddedResult<u64>;
11    fn save_replay_floor(&mut self, identity: &[u8; 32], floor: u64) -> EmbeddedResult<()>;
12    fn load_chunk_cursor(&self, transfer_id: u32) -> EmbeddedResult<Option<ChunkCursor>>;
13    fn save_chunk_cursor(&mut self, cursor: &ChunkCursor) -> EmbeddedResult<()>;
14    fn clear_chunk_cursor(&mut self, transfer_id: u32) -> EmbeddedResult<()>;
15}
16
17#[derive(Debug, Clone)]
18pub struct JournaledEmbeddedStore {
19    replay_floors: BTreeMap<[u8; 32], u64>,
20    cursors: BTreeMap<u32, ChunkCursor>,
21    schema_version: u8,
22}
23
24impl JournaledEmbeddedStore {
25    pub fn new() -> Self {
26        Self { replay_floors: BTreeMap::new(), cursors: BTreeMap::new(), schema_version: SCHEMA_V2 }
27    }
28
29    pub fn schema_version(&self) -> u8 {
30        self.schema_version
31    }
32
33    pub fn snapshot_bytes(&self) -> EmbeddedResult<Vec<u8>> {
34        let mut body = Vec::new();
35        body.push(self.schema_version);
36
37        let replay_len =
38            u16::try_from(self.replay_floors.len()).map_err(|_| EmbeddedError::InvalidState)?;
39        body.extend_from_slice(&replay_len.to_le_bytes());
40        for (identity, floor) in &self.replay_floors {
41            body.extend_from_slice(identity);
42            body.extend_from_slice(&floor.to_le_bytes());
43        }
44
45        if self.schema_version >= SCHEMA_V2 {
46            let cursors_len =
47                u16::try_from(self.cursors.len()).map_err(|_| EmbeddedError::InvalidState)?;
48            body.extend_from_slice(&cursors_len.to_le_bytes());
49            for (transfer_id, cursor) in &self.cursors {
50                body.extend_from_slice(&transfer_id.to_le_bytes());
51                body.extend_from_slice(&cursor.total_size.to_le_bytes());
52                body.extend_from_slice(&cursor.next_offset.to_le_bytes());
53                body.extend_from_slice(&cursor.expected_sequence.to_le_bytes());
54                body.extend_from_slice(&cursor.chunk_size.to_le_bytes());
55            }
56        }
57
58        let checksum = digest32(&body);
59
60        let mut out = Vec::with_capacity(4 + 4 + body.len() + 32);
61        out.extend_from_slice(STORE_MAGIC);
62        let body_len_u32 = u32::try_from(body.len()).map_err(|_| EmbeddedError::InvalidState)?;
63        out.extend_from_slice(&body_len_u32.to_le_bytes());
64        out.extend_from_slice(&body);
65        out.extend_from_slice(&checksum);
66        Ok(out)
67    }
68
69    pub fn from_snapshot_bytes(bytes: &[u8]) -> EmbeddedResult<Self> {
70        if bytes.len() < 8 + 32 {
71            return Err(EmbeddedError::StorageCorruption);
72        }
73        if &bytes[0..4] != STORE_MAGIC {
74            return Err(EmbeddedError::StorageCorruption);
75        }
76        let body_len = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
77        let body_len = usize::try_from(body_len).map_err(|_| EmbeddedError::StorageCorruption)?;
78        if bytes.len() != 8 + body_len + 32 {
79            return Err(EmbeddedError::StorageCorruption);
80        }
81
82        let body = &bytes[8..8 + body_len];
83        let mut expected = [0_u8; 32];
84        expected.copy_from_slice(&bytes[8 + body_len..]);
85        if digest32(body) != expected {
86            return Err(EmbeddedError::StorageCorruption);
87        }
88
89        Self::decode_body(body)
90    }
91
92    fn decode_body(body: &[u8]) -> EmbeddedResult<Self> {
93        if body.is_empty() {
94            return Err(EmbeddedError::StorageCorruption);
95        }
96
97        let mut idx = 0usize;
98        let schema = body[idx];
99        idx += 1;
100
101        if schema != SCHEMA_V1 && schema != SCHEMA_V2 {
102            return Err(EmbeddedError::Unsupported);
103        }
104
105        let replay_count = read_u16(body, &mut idx)? as usize;
106        let mut replay_floors = BTreeMap::new();
107        for _ in 0..replay_count {
108            let identity = read_identity(body, &mut idx)?;
109            let floor = read_u64(body, &mut idx)?;
110            replay_floors.insert(identity, floor);
111        }
112
113        let mut cursors = BTreeMap::new();
114        if schema >= SCHEMA_V2 {
115            let cursor_count = read_u16(body, &mut idx)? as usize;
116            for _ in 0..cursor_count {
117                let transfer_id = read_u32(body, &mut idx)?;
118                let total_size = read_u32(body, &mut idx)?;
119                let next_offset = read_u32(body, &mut idx)?;
120                let expected_sequence = read_u16(body, &mut idx)?;
121                let chunk_size = read_u16(body, &mut idx)?;
122                let cursor = ChunkCursor {
123                    transfer_id,
124                    total_size,
125                    next_offset,
126                    expected_sequence,
127                    chunk_size,
128                };
129                cursor.validate()?;
130                cursors.insert(transfer_id, cursor);
131            }
132        }
133
134        if idx != body.len() {
135            return Err(EmbeddedError::StorageCorruption);
136        }
137
138        Ok(Self { replay_floors, cursors, schema_version: SCHEMA_V2 })
139    }
140}
141
142impl Default for JournaledEmbeddedStore {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl EmbeddedStore for JournaledEmbeddedStore {
149    fn load_replay_floor(&self, identity: &[u8; 32]) -> EmbeddedResult<u64> {
150        Ok(*self.replay_floors.get(identity).unwrap_or(&0))
151    }
152
153    fn save_replay_floor(&mut self, identity: &[u8; 32], floor: u64) -> EmbeddedResult<()> {
154        self.replay_floors.insert(*identity, floor);
155        Ok(())
156    }
157
158    fn load_chunk_cursor(&self, transfer_id: u32) -> EmbeddedResult<Option<ChunkCursor>> {
159        Ok(self.cursors.get(&transfer_id).cloned())
160    }
161
162    fn save_chunk_cursor(&mut self, cursor: &ChunkCursor) -> EmbeddedResult<()> {
163        cursor.validate()?;
164        self.cursors.insert(cursor.transfer_id, cursor.clone());
165        Ok(())
166    }
167
168    fn clear_chunk_cursor(&mut self, transfer_id: u32) -> EmbeddedResult<()> {
169        self.cursors.remove(&transfer_id);
170        Ok(())
171    }
172}
173
174fn read_u16(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<u16> {
175    if *idx + 2 > bytes.len() {
176        return Err(EmbeddedError::StorageCorruption);
177    }
178    let out = u16::from_le_bytes([bytes[*idx], bytes[*idx + 1]]);
179    *idx += 2;
180    Ok(out)
181}
182
183fn read_u32(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<u32> {
184    if *idx + 4 > bytes.len() {
185        return Err(EmbeddedError::StorageCorruption);
186    }
187    let out = u32::from_le_bytes([bytes[*idx], bytes[*idx + 1], bytes[*idx + 2], bytes[*idx + 3]]);
188    *idx += 4;
189    Ok(out)
190}
191
192fn read_u64(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<u64> {
193    if *idx + 8 > bytes.len() {
194        return Err(EmbeddedError::StorageCorruption);
195    }
196    let out = u64::from_le_bytes([
197        bytes[*idx],
198        bytes[*idx + 1],
199        bytes[*idx + 2],
200        bytes[*idx + 3],
201        bytes[*idx + 4],
202        bytes[*idx + 5],
203        bytes[*idx + 6],
204        bytes[*idx + 7],
205    ]);
206    *idx += 8;
207    Ok(out)
208}
209
210fn read_identity(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<[u8; 32]> {
211    if *idx + 32 > bytes.len() {
212        return Err(EmbeddedError::StorageCorruption);
213    }
214    let mut out = [0_u8; 32];
215    out.copy_from_slice(&bytes[*idx..*idx + 32]);
216    *idx += 32;
217    Ok(out)
218}
219
220#[cfg(test)]
221mod tests {
222    use super::{EmbeddedStore, JournaledEmbeddedStore, SCHEMA_V1};
223    use crate::{attachment::ChunkCursor, EmbeddedError};
224
225    #[test]
226    fn snapshot_roundtrip_preserves_state() {
227        let mut store = JournaledEmbeddedStore::new();
228        let identity = [7_u8; 32];
229        store.save_replay_floor(&identity, 42).expect("save replay");
230        store
231            .save_chunk_cursor(&ChunkCursor {
232                transfer_id: 99,
233                total_size: 1024,
234                next_offset: 256,
235                expected_sequence: 4,
236                chunk_size: 64,
237            })
238            .expect("save cursor");
239
240        let snapshot = store.snapshot_bytes().expect("snapshot");
241        let restored = JournaledEmbeddedStore::from_snapshot_bytes(&snapshot).expect("restore");
242
243        assert_eq!(restored.load_replay_floor(&identity).expect("load replay"), 42);
244        let cursor = restored.load_chunk_cursor(99).expect("load cursor").expect("cursor exists");
245        assert_eq!(cursor.next_offset, 256);
246        assert_eq!(cursor.expected_sequence, 4);
247    }
248
249    #[test]
250    fn detects_corrupted_snapshot() {
251        let mut store = JournaledEmbeddedStore::new();
252        store.save_replay_floor(&[1_u8; 32], 10).expect("save");
253        let mut snapshot = store.snapshot_bytes().expect("snapshot");
254        snapshot[12] ^= 0x55;
255
256        let err = JournaledEmbeddedStore::from_snapshot_bytes(&snapshot)
257            .expect_err("corruption detected");
258        assert_eq!(err, EmbeddedError::StorageCorruption);
259    }
260
261    #[test]
262    fn migrates_v1_snapshot_without_cursors() {
263        let identity = [2_u8; 32];
264
265        // Build a valid v1 body: schema + replay_count + identity + floor.
266        let mut body = Vec::new();
267        body.push(SCHEMA_V1);
268        body.extend_from_slice(&1_u16.to_le_bytes());
269        body.extend_from_slice(&identity);
270        body.extend_from_slice(&123_u64.to_le_bytes());
271
272        let checksum = crate::hash::digest32(&body);
273        let mut snapshot = Vec::new();
274        snapshot.extend_from_slice(b"RES1");
275        snapshot.extend_from_slice(&(u32::try_from(body.len()).expect("len")).to_le_bytes());
276        snapshot.extend_from_slice(&body);
277        snapshot.extend_from_slice(&checksum);
278
279        let restored = JournaledEmbeddedStore::from_snapshot_bytes(&snapshot).expect("restore");
280        assert_eq!(restored.schema_version(), 2, "v1 migrates to v2 runtime schema");
281        assert_eq!(restored.load_replay_floor(&identity).expect("replay"), 123);
282        assert!(restored.load_chunk_cursor(1).expect("cursor read").is_none());
283    }
284
285    #[test]
286    fn clear_cursor_persists_after_restore() {
287        let mut store = JournaledEmbeddedStore::new();
288        store
289            .save_chunk_cursor(&ChunkCursor {
290                transfer_id: 5,
291                total_size: 100,
292                next_offset: 60,
293                expected_sequence: 3,
294                chunk_size: 20,
295            })
296            .expect("save cursor");
297        store.clear_chunk_cursor(5).expect("clear cursor");
298
299        let snapshot = store.snapshot_bytes().expect("snapshot");
300        let restored = JournaledEmbeddedStore::from_snapshot_bytes(&snapshot).expect("restore");
301        assert!(restored.load_chunk_cursor(5).expect("load cursor").is_none());
302    }
303}