1use crate::{attachment::ChunkCursor, hash::digest32, EmbeddedError, EmbeddedResult};
2use alloc::collections::BTreeMap;
3use alloc::vec::Vec;
4
/// Four-byte magic written at the start of every snapshot and checked on restore.
const STORE_MAGIC: &[u8; 4] = b"RES1";
/// Snapshot schema whose body carries only the replay-floor table.
const SCHEMA_V1: u8 = 1;
/// Snapshot schema whose body appends the chunk-cursor table after the replay floors.
const SCHEMA_V2: u8 = 2;
8
/// Persistence interface for replay floors (keyed by a 32-byte identity) and
/// chunk-transfer cursors (keyed by a `u32` transfer id).
///
/// Every method is fallible so that implementations can report their own
/// storage errors through [`EmbeddedResult`].
pub trait EmbeddedStore {
    /// Returns the replay floor stored for `identity`.
    ///
    /// The trait itself does not fix the value for an unknown identity; the
    /// `JournaledEmbeddedStore` implementation in this file returns `0`.
    fn load_replay_floor(&self, identity: &[u8; 32]) -> EmbeddedResult<u64>;
    /// Stores `floor` as the replay floor for `identity`.
    fn save_replay_floor(&mut self, identity: &[u8; 32], floor: u64) -> EmbeddedResult<()>;
    /// Returns the cursor stored for `transfer_id`, or `None` when there is none.
    fn load_chunk_cursor(&self, transfer_id: u32) -> EmbeddedResult<Option<ChunkCursor>>;
    /// Stores `cursor`; the signature carries no separate key, so the transfer
    /// it belongs to is identified by the cursor itself.
    fn save_chunk_cursor(&mut self, cursor: &ChunkCursor) -> EmbeddedResult<()>;
    /// Removes the cursor stored for `transfer_id`.
    fn clear_chunk_cursor(&mut self, transfer_id: u32) -> EmbeddedResult<()>;
}
16
/// In-memory store that can be serialized to, and restored from, a
/// checksummed snapshot byte string.
#[derive(Debug, Clone)]
pub struct JournaledEmbeddedStore {
    /// Replay floor per 32-byte identity.
    replay_floors: BTreeMap<[u8; 32], u64>,
    /// Chunk cursor per transfer id.
    cursors: BTreeMap<u32, ChunkCursor>,
    /// Schema version written as the first body byte of a snapshot.
    schema_version: u8,
}
23
24impl JournaledEmbeddedStore {
25 pub fn new() -> Self {
26 Self { replay_floors: BTreeMap::new(), cursors: BTreeMap::new(), schema_version: SCHEMA_V2 }
27 }
28
29 pub fn schema_version(&self) -> u8 {
30 self.schema_version
31 }
32
33 pub fn snapshot_bytes(&self) -> EmbeddedResult<Vec<u8>> {
34 let mut body = Vec::new();
35 body.push(self.schema_version);
36
37 let replay_len =
38 u16::try_from(self.replay_floors.len()).map_err(|_| EmbeddedError::InvalidState)?;
39 body.extend_from_slice(&replay_len.to_le_bytes());
40 for (identity, floor) in &self.replay_floors {
41 body.extend_from_slice(identity);
42 body.extend_from_slice(&floor.to_le_bytes());
43 }
44
45 if self.schema_version >= SCHEMA_V2 {
46 let cursors_len =
47 u16::try_from(self.cursors.len()).map_err(|_| EmbeddedError::InvalidState)?;
48 body.extend_from_slice(&cursors_len.to_le_bytes());
49 for (transfer_id, cursor) in &self.cursors {
50 body.extend_from_slice(&transfer_id.to_le_bytes());
51 body.extend_from_slice(&cursor.total_size.to_le_bytes());
52 body.extend_from_slice(&cursor.next_offset.to_le_bytes());
53 body.extend_from_slice(&cursor.expected_sequence.to_le_bytes());
54 body.extend_from_slice(&cursor.chunk_size.to_le_bytes());
55 }
56 }
57
58 let checksum = digest32(&body);
59
60 let mut out = Vec::with_capacity(4 + 4 + body.len() + 32);
61 out.extend_from_slice(STORE_MAGIC);
62 let body_len_u32 = u32::try_from(body.len()).map_err(|_| EmbeddedError::InvalidState)?;
63 out.extend_from_slice(&body_len_u32.to_le_bytes());
64 out.extend_from_slice(&body);
65 out.extend_from_slice(&checksum);
66 Ok(out)
67 }
68
69 pub fn from_snapshot_bytes(bytes: &[u8]) -> EmbeddedResult<Self> {
70 if bytes.len() < 8 + 32 {
71 return Err(EmbeddedError::StorageCorruption);
72 }
73 if &bytes[0..4] != STORE_MAGIC {
74 return Err(EmbeddedError::StorageCorruption);
75 }
76 let body_len = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
77 let body_len = usize::try_from(body_len).map_err(|_| EmbeddedError::StorageCorruption)?;
78 if bytes.len() != 8 + body_len + 32 {
79 return Err(EmbeddedError::StorageCorruption);
80 }
81
82 let body = &bytes[8..8 + body_len];
83 let mut expected = [0_u8; 32];
84 expected.copy_from_slice(&bytes[8 + body_len..]);
85 if digest32(body) != expected {
86 return Err(EmbeddedError::StorageCorruption);
87 }
88
89 Self::decode_body(body)
90 }
91
92 fn decode_body(body: &[u8]) -> EmbeddedResult<Self> {
93 if body.is_empty() {
94 return Err(EmbeddedError::StorageCorruption);
95 }
96
97 let mut idx = 0usize;
98 let schema = body[idx];
99 idx += 1;
100
101 if schema != SCHEMA_V1 && schema != SCHEMA_V2 {
102 return Err(EmbeddedError::Unsupported);
103 }
104
105 let replay_count = read_u16(body, &mut idx)? as usize;
106 let mut replay_floors = BTreeMap::new();
107 for _ in 0..replay_count {
108 let identity = read_identity(body, &mut idx)?;
109 let floor = read_u64(body, &mut idx)?;
110 replay_floors.insert(identity, floor);
111 }
112
113 let mut cursors = BTreeMap::new();
114 if schema >= SCHEMA_V2 {
115 let cursor_count = read_u16(body, &mut idx)? as usize;
116 for _ in 0..cursor_count {
117 let transfer_id = read_u32(body, &mut idx)?;
118 let total_size = read_u32(body, &mut idx)?;
119 let next_offset = read_u32(body, &mut idx)?;
120 let expected_sequence = read_u16(body, &mut idx)?;
121 let chunk_size = read_u16(body, &mut idx)?;
122 let cursor = ChunkCursor {
123 transfer_id,
124 total_size,
125 next_offset,
126 expected_sequence,
127 chunk_size,
128 };
129 cursor.validate()?;
130 cursors.insert(transfer_id, cursor);
131 }
132 }
133
134 if idx != body.len() {
135 return Err(EmbeddedError::StorageCorruption);
136 }
137
138 Ok(Self { replay_floors, cursors, schema_version: SCHEMA_V2 })
139 }
140}
141
142impl Default for JournaledEmbeddedStore {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148impl EmbeddedStore for JournaledEmbeddedStore {
149 fn load_replay_floor(&self, identity: &[u8; 32]) -> EmbeddedResult<u64> {
150 Ok(*self.replay_floors.get(identity).unwrap_or(&0))
151 }
152
153 fn save_replay_floor(&mut self, identity: &[u8; 32], floor: u64) -> EmbeddedResult<()> {
154 self.replay_floors.insert(*identity, floor);
155 Ok(())
156 }
157
158 fn load_chunk_cursor(&self, transfer_id: u32) -> EmbeddedResult<Option<ChunkCursor>> {
159 Ok(self.cursors.get(&transfer_id).cloned())
160 }
161
162 fn save_chunk_cursor(&mut self, cursor: &ChunkCursor) -> EmbeddedResult<()> {
163 cursor.validate()?;
164 self.cursors.insert(cursor.transfer_id, cursor.clone());
165 Ok(())
166 }
167
168 fn clear_chunk_cursor(&mut self, transfer_id: u32) -> EmbeddedResult<()> {
169 self.cursors.remove(&transfer_id);
170 Ok(())
171 }
172}
173
174fn read_u16(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<u16> {
175 if *idx + 2 > bytes.len() {
176 return Err(EmbeddedError::StorageCorruption);
177 }
178 let out = u16::from_le_bytes([bytes[*idx], bytes[*idx + 1]]);
179 *idx += 2;
180 Ok(out)
181}
182
183fn read_u32(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<u32> {
184 if *idx + 4 > bytes.len() {
185 return Err(EmbeddedError::StorageCorruption);
186 }
187 let out = u32::from_le_bytes([bytes[*idx], bytes[*idx + 1], bytes[*idx + 2], bytes[*idx + 3]]);
188 *idx += 4;
189 Ok(out)
190}
191
192fn read_u64(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<u64> {
193 if *idx + 8 > bytes.len() {
194 return Err(EmbeddedError::StorageCorruption);
195 }
196 let out = u64::from_le_bytes([
197 bytes[*idx],
198 bytes[*idx + 1],
199 bytes[*idx + 2],
200 bytes[*idx + 3],
201 bytes[*idx + 4],
202 bytes[*idx + 5],
203 bytes[*idx + 6],
204 bytes[*idx + 7],
205 ]);
206 *idx += 8;
207 Ok(out)
208}
209
210fn read_identity(bytes: &[u8], idx: &mut usize) -> EmbeddedResult<[u8; 32]> {
211 if *idx + 32 > bytes.len() {
212 return Err(EmbeddedError::StorageCorruption);
213 }
214 let mut out = [0_u8; 32];
215 out.copy_from_slice(&bytes[*idx..*idx + 32]);
216 *idx += 32;
217 Ok(out)
218}
219
#[cfg(test)]
mod tests {
    use super::{EmbeddedStore, JournaledEmbeddedStore, SCHEMA_V1};
    use crate::{attachment::ChunkCursor, EmbeddedError};

    /// Wraps a raw body in the snapshot framing: magic, length, body, checksum.
    fn frame(body: &[u8]) -> Vec<u8> {
        let checksum = crate::hash::digest32(body);
        let mut framed = Vec::new();
        framed.extend_from_slice(b"RES1");
        framed.extend_from_slice(&(u32::try_from(body.len()).expect("len")).to_le_bytes());
        framed.extend_from_slice(body);
        framed.extend_from_slice(&checksum);
        framed
    }

    /// A save -> snapshot -> restore cycle keeps both tables intact.
    #[test]
    fn snapshot_roundtrip_preserves_state() {
        let identity = [7_u8; 32];
        let saved_cursor = ChunkCursor {
            transfer_id: 99,
            total_size: 1024,
            next_offset: 256,
            expected_sequence: 4,
            chunk_size: 64,
        };

        let mut store = JournaledEmbeddedStore::new();
        store.save_replay_floor(&identity, 42).expect("save replay");
        store.save_chunk_cursor(&saved_cursor).expect("save cursor");

        let bytes = store.snapshot_bytes().expect("snapshot");
        let restored = JournaledEmbeddedStore::from_snapshot_bytes(&bytes).expect("restore");

        let floor = restored.load_replay_floor(&identity).expect("load replay");
        assert_eq!(floor, 42);
        let cursor = restored.load_chunk_cursor(99).expect("load cursor").expect("cursor exists");
        assert_eq!(cursor.next_offset, 256);
        assert_eq!(cursor.expected_sequence, 4);
    }

    /// Flipping bits inside the body must trip the checksum.
    #[test]
    fn detects_corrupted_snapshot() {
        let mut store = JournaledEmbeddedStore::new();
        store.save_replay_floor(&[1_u8; 32], 10).expect("save");

        let mut bytes = store.snapshot_bytes().expect("snapshot");
        // Offset 12 lies inside the body (it starts at offset 8), within the stored identity.
        bytes[12] ^= 0x55;

        let outcome = JournaledEmbeddedStore::from_snapshot_bytes(&bytes);
        let err = outcome.expect_err("corruption detected");
        assert_eq!(err, EmbeddedError::StorageCorruption);
    }

    /// A hand-built V1 body (no cursor table) loads and reports the V2 runtime schema.
    #[test]
    fn migrates_v1_snapshot_without_cursors() {
        let identity = [2_u8; 32];

        let mut v1_body = Vec::new();
        v1_body.push(SCHEMA_V1);
        v1_body.extend_from_slice(&1_u16.to_le_bytes());
        v1_body.extend_from_slice(&identity);
        v1_body.extend_from_slice(&123_u64.to_le_bytes());

        let bytes = frame(&v1_body);

        let restored = JournaledEmbeddedStore::from_snapshot_bytes(&bytes).expect("restore");
        assert_eq!(restored.schema_version(), 2, "v1 migrates to v2 runtime schema");
        assert_eq!(restored.load_replay_floor(&identity).expect("replay"), 123);
        assert!(restored.load_chunk_cursor(1).expect("cursor read").is_none());
    }

    /// A cursor cleared before the snapshot must not reappear after restore.
    #[test]
    fn clear_cursor_persists_after_restore() {
        let transient = ChunkCursor {
            transfer_id: 5,
            total_size: 100,
            next_offset: 60,
            expected_sequence: 3,
            chunk_size: 20,
        };

        let mut store = JournaledEmbeddedStore::new();
        store.save_chunk_cursor(&transient).expect("save cursor");
        store.clear_chunk_cursor(5).expect("clear cursor");

        let bytes = store.snapshot_bytes().expect("snapshot");
        let restored = JournaledEmbeddedStore::from_snapshot_bytes(&bytes).expect("restore");
        assert!(restored.load_chunk_cursor(5).expect("load cursor").is_none());
    }
}