1use crate::error::{Error, Result};
8use crate::io::Cursor;
9
/// Chunk index description carried by a version 4/5 chunked layout message.
///
/// The variant is selected by the index-type byte of the message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChunkIndexing {
    /// Index type 1. `filtered_size` and `filters` hold the values stored in
    /// the message when it carries them and are both `0` otherwise.
    SingleChunk { filtered_size: u64, filters: u32 },
    /// Index type 2. No index-specific bytes are read for this variant.
    Implicit,
    /// Index type 3. `page_bits` is read from the message; `chunk_size_len`
    /// is the byte width chosen by the parser (`length_size` for version 4,
    /// `offset_size` for version 5).
    FixedArray { page_bits: u8, chunk_size_len: u8 },
    /// Index type 4. The first four fields are read from the message in this
    /// order (a fifth byte that follows them is read and discarded);
    /// `chunk_size_len` is chosen the same way as for `FixedArray`.
    ExtensibleArray {
        max_bits: u8,
        index_bits: u8,
        min_pointers: u8,
        min_elements: u8,
        chunk_size_len: u8,
    },
    /// Index type 5. Its six index-specific bytes are skipped, not decoded.
    BTreeV2,
}

/// Decoded storage layout of a dataset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataLayout {
    /// Raw data stored inline; `data` is a copy of the bytes in the message.
    Compact { data: Vec<u8> },
    /// Data stored in a single region described by `address` and `size`.
    Contiguous { address: u64, size: u64 },
    /// Data stored in chunks.
    Chunked {
        /// Address read from the layout message.
        address: u64,
        /// Dimension values read from the message. For versions 1–3 the
        /// trailing value is split off into `element_size`.
        dims: Vec<u32>,
        /// Last dimension value for versions 1–3; always `0` for versions 4/5.
        element_size: u32,
        /// `Some` for versions 4/5, `None` for versions 1–3.
        chunk_indexing: Option<ChunkIndexing>,
    },
}

/// A parsed data layout message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataLayoutMessage {
    /// The decoded layout.
    pub layout: DataLayout,
}
56
57pub fn parse(
59 cursor: &mut Cursor<'_>,
60 offset_size: u8,
61 length_size: u8,
62 msg_size: usize,
63) -> Result<DataLayoutMessage> {
64 let start = cursor.position();
65 let version = cursor.read_u8()?;
66
67 let layout = match version {
68 1 | 2 => parse_v1_v2(cursor, offset_size, length_size, version)?,
69 3 => parse_v3(cursor, offset_size, length_size)?,
70 4 | 5 => parse_v4_v5(cursor, offset_size, length_size, version)?,
71 v => return Err(Error::UnsupportedLayoutVersion(v)),
72 };
73
74 let consumed = (cursor.position() - start) as usize;
75 if consumed < msg_size {
76 cursor.skip(msg_size - consumed)?;
77 }
78
79 Ok(DataLayoutMessage { layout })
80}
81
82fn parse_v1_v2(
87 cursor: &mut Cursor<'_>,
88 offset_size: u8,
89 _length_size: u8,
90 version: u8,
91) -> Result<DataLayout> {
92 let dimensionality = cursor.read_u8()?;
93 let layout_class = cursor.read_u8()?;
94 let _reserved = cursor.read_bytes(if version == 1 { 5 } else { 3 })?;
95
96 let data_address = if layout_class != 0 {
99 cursor.read_offset(offset_size)?
100 } else {
101 cursor.read_offset(offset_size)?
103 };
104
105 let mut dim_values = Vec::with_capacity(dimensionality as usize);
109 for _ in 0..dimensionality {
110 dim_values.push(cursor.read_u32_le()?);
111 }
112
113 match layout_class {
114 0 => {
115 let compact_size = cursor.read_u32_le()? as usize;
117 let data = cursor.read_bytes(compact_size)?.to_vec();
118 Ok(DataLayout::Compact { data })
119 }
120 1 => {
121 let size = if dim_values.is_empty() {
126 0
127 } else {
128 dim_values.iter().map(|d| *d as u64).product()
129 };
130 Ok(DataLayout::Contiguous {
131 address: data_address,
132 size,
133 })
134 }
135 2 => {
136 let (element_size, chunk_dims) = if dim_values.is_empty() {
138 (0u32, vec![])
139 } else {
140 let es = *dim_values.last().unwrap();
141 let cd: Vec<u32> = dim_values[..dim_values.len() - 1].to_vec();
142 (es, cd)
143 };
144 Ok(DataLayout::Chunked {
145 address: data_address,
146 dims: chunk_dims,
147 element_size,
148 chunk_indexing: None,
149 })
150 }
151 c => Err(Error::UnsupportedLayoutClass(c)),
152 }
153}
154
155fn parse_v3(cursor: &mut Cursor<'_>, offset_size: u8, length_size: u8) -> Result<DataLayout> {
160 let layout_class = cursor.read_u8()?;
161
162 match layout_class {
163 0 => {
164 let size = cursor.read_u16_le()? as usize;
166 let data = cursor.read_bytes(size)?.to_vec();
167 Ok(DataLayout::Compact { data })
168 }
169 1 => {
170 let address = cursor.read_offset(offset_size)?;
172 let size = cursor.read_length(length_size)?;
173 Ok(DataLayout::Contiguous { address, size })
174 }
175 2 => {
176 let dimensionality = cursor.read_u8()?;
178 let address = cursor.read_offset(offset_size)?;
179
180 let n = dimensionality as usize;
182 let mut raw_dims = Vec::with_capacity(n);
183 for _ in 0..n {
184 raw_dims.push(cursor.read_u32_le()?);
185 }
186
187 let (element_size, chunk_dims) = if raw_dims.is_empty() {
188 (0, vec![])
189 } else {
190 let es = *raw_dims.last().unwrap();
191 let cd = raw_dims[..raw_dims.len() - 1].to_vec();
192 (es, cd)
193 };
194
195 Ok(DataLayout::Chunked {
196 address,
197 dims: chunk_dims,
198 element_size,
199 chunk_indexing: None,
200 })
201 }
202 c => Err(Error::UnsupportedLayoutClass(c)),
203 }
204}
205
206fn parse_v4_v5(
216 cursor: &mut Cursor<'_>,
217 offset_size: u8,
218 length_size: u8,
219 version: u8,
220) -> Result<DataLayout> {
221 let layout_class = cursor.read_u8()?;
222
223 match layout_class {
224 0 => {
225 let size = cursor.read_u16_le()? as usize;
227 let data = cursor.read_bytes(size)?.to_vec();
228 Ok(DataLayout::Compact { data })
229 }
230 1 => {
231 let address = cursor.read_offset(offset_size)?;
233 let size = cursor.read_u64_le()?;
234 Ok(DataLayout::Contiguous { address, size })
235 }
236 2 => {
237 let start = cursor.clone();
238 let direct = parse_v4_v5_chunked(cursor, offset_size, length_size, version, false);
239 match direct {
240 Ok(layout) => Ok(layout),
241 Err(err) if version == 4 && should_retry_v4_chunked_parse(&err) => {
242 *cursor = start;
243 parse_v4_v5_chunked(cursor, offset_size, length_size, version, true)
244 }
245 Err(err) => Err(err),
246 }
247 }
248 c => Err(Error::UnsupportedLayoutClass(c)),
249 }
250}
251
252fn parse_v4_v5_chunked(
253 cursor: &mut Cursor<'_>,
254 offset_size: u8,
255 length_size: u8,
256 version: u8,
257 legacy_dim_size_encoding: bool,
258) -> Result<DataLayout> {
259 let flags = cursor.read_u8()?;
260 let ndims_raw = cursor.read_u8()? as usize;
261 let dim_size_enc = cursor.read_u8()?;
262 let dim_bytes = if legacy_dim_size_encoding {
263 dim_size_enc as usize + 1
264 } else {
265 dim_size_enc as usize
266 };
267
268 let mut dims = Vec::with_capacity(ndims_raw);
269 for _ in 0..ndims_raw {
270 dims.push(cursor.read_uvar(dim_bytes)? as u32);
271 }
272
273 let index_type = cursor.read_u8()?;
274 let chunk_size_len = if version >= 5 {
275 offset_size
276 } else {
277 length_size
278 };
279 let chunk_indexing = parse_chunk_indexing_v4_v5(cursor, flags, index_type, chunk_size_len)?;
280 let address = cursor.read_offset(offset_size)?;
281
282 Ok(DataLayout::Chunked {
283 address,
284 dims,
285 element_size: 0,
286 chunk_indexing: Some(chunk_indexing),
287 })
288}
289
290fn should_retry_v4_chunked_parse(err: &Error) -> bool {
291 match err {
292 Error::UnexpectedEof { .. } | Error::UnsupportedChunkIndexType(_) => true,
293 Error::InvalidData(msg) => msg.starts_with("unsupported variable integer size:"),
294 _ => false,
295 }
296}
297
298fn parse_chunk_indexing_v4_v5(
301 cursor: &mut Cursor<'_>,
302 flags: u8,
303 index_type: u8,
304 chunk_size_len: u8,
305) -> Result<ChunkIndexing> {
306 match index_type {
307 1 => {
308 let idx_flags = if (flags & 0x01) != 0 {
310 let filtered_size = cursor.read_u64_le()?;
311 let filter_mask = cursor.read_u32_le()?;
312 Some((filtered_size, filter_mask))
313 } else {
314 None
315 };
316 let (fs, fm) = idx_flags.unwrap_or((0, 0));
317 Ok(ChunkIndexing::SingleChunk {
318 filtered_size: fs,
319 filters: fm,
320 })
321 }
322 2 => Ok(ChunkIndexing::Implicit),
323 3 => {
324 let page_bits = cursor.read_u8()?;
325 Ok(ChunkIndexing::FixedArray {
326 page_bits,
327 chunk_size_len,
328 })
329 }
330 4 => {
331 let max_bits = cursor.read_u8()?;
332 let index_bits = cursor.read_u8()?;
333 let min_pointers = cursor.read_u8()?;
334 let min_elements = cursor.read_u8()?;
335 let _max_dblk_page_bits = cursor.read_u8()?;
336 Ok(ChunkIndexing::ExtensibleArray {
337 max_bits,
338 index_bits,
339 min_pointers,
340 min_elements,
341 chunk_size_len,
342 })
343 }
344 5 => {
345 cursor.skip(6)?;
346 Ok(ChunkIndexing::BTreeV2)
347 }
348 t => Err(Error::UnsupportedChunkIndexType(t)),
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Version 3, contiguous: address and size are both 8 bytes wide here.
    #[test]
    fn test_parse_v3_contiguous() {
        let mut data = vec![
            0x03, // message version 3
            0x01, // layout class 1: contiguous
        ];
        // address (offset_size = 8)
        data.extend_from_slice(&0x1000u64.to_le_bytes());
        // size (length_size = 8)
        data.extend_from_slice(&4096u64.to_le_bytes());

        let mut cursor = Cursor::new(&data);
        let msg = parse(&mut cursor, 8, 8, data.len()).unwrap();
        match &msg.layout {
            DataLayout::Contiguous { address, size } => {
                assert_eq!(*address, 0x1000);
                assert_eq!(*size, 4096);
            }
            other => panic!("expected Contiguous, got {:?}", other),
        }
    }

    /// Version 3, compact: a 16-bit byte count followed by the inline data.
    #[test]
    fn test_parse_v3_compact() {
        let mut data = vec![
            0x03, // message version 3
            0x00, // layout class 0: compact
        ];
        // number of inline data bytes
        data.extend_from_slice(&4u16.to_le_bytes());
        // inline data
        data.extend_from_slice(&[0x01, 0x02, 0x03, 0x04]);

        let mut cursor = Cursor::new(&data);
        let msg = parse(&mut cursor, 8, 8, data.len()).unwrap();
        match &msg.layout {
            DataLayout::Compact { data } => {
                assert_eq!(data, &[0x01, 0x02, 0x03, 0x04]);
            }
            other => panic!("expected Compact, got {:?}", other),
        }
    }

    /// Version 3, chunked: the last of the three 32-bit values is split off
    /// as the element size.
    #[test]
    fn test_parse_v3_chunked() {
        let mut data = vec![
            0x03, // message version 3
            0x02, // layout class 2: chunked
            0x03, // dimensionality: three 32-bit values follow the address
        ];
        // address (offset_size = 8)
        data.extend_from_slice(&0x2000u64.to_le_bytes());
        // first chunk dimension
        data.extend_from_slice(&256u32.to_le_bytes());
        // second chunk dimension
        data.extend_from_slice(&128u32.to_le_bytes());
        // trailing value: element size
        data.extend_from_slice(&4u32.to_le_bytes());

        let mut cursor = Cursor::new(&data);
        let msg = parse(&mut cursor, 8, 8, data.len()).unwrap();
        match &msg.layout {
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => {
                assert_eq!(*address, 0x2000);
                assert_eq!(dims, &[256, 128]);
                assert_eq!(*element_size, 4);
                assert!(chunk_indexing.is_none());
            }
            other => panic!("expected Chunked, got {:?}", other),
        }
    }

    /// Version 4, chunked, where the encoded dimension size (4) is the actual
    /// byte width of each dimension, so the first decode attempt succeeds.
    #[test]
    fn test_parse_v4_chunked_direct_dim_size_encoding() {
        let mut data = vec![
            0x04, // message version 4
            0x02, // layout class 2: chunked
            0x00, // flags
            0x02, // number of dimensions
            0x04, // encoded dimension size: 4 bytes per dimension
        ];
        data.extend_from_slice(&3u32.to_le_bytes());
        data.extend_from_slice(&5u32.to_le_bytes());
        // index type 3 (fixed array)
        data.push(0x03);
        // page bits
        data.push(0x00);
        // address (offset_size = 8)
        data.extend_from_slice(&0x1122_3344_5566_7788u64.to_le_bytes());

        let mut cursor = Cursor::new(&data);
        let msg = parse(&mut cursor, 8, 8, data.len()).unwrap();
        match &msg.layout {
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => {
                assert_eq!(*address, 0x1122_3344_5566_7788);
                assert_eq!(dims, &[3, 5]);
                assert_eq!(*element_size, 0);
                match chunk_indexing {
                    Some(ChunkIndexing::FixedArray {
                        page_bits,
                        chunk_size_len,
                    }) => {
                        assert_eq!(*page_bits, 0);
                        // version 4 passes length_size (8) through
                        assert_eq!(*chunk_size_len, 8);
                    }
                    other => panic!("expected FixedArray indexing, got {:?}", other),
                }
            }
            other => panic!("expected Chunked, got {:?}", other),
        }
    }

    /// Version 4, chunked, where the encoded dimension size (3) is one less
    /// than the actual 4-byte width. Reading 3-byte dimensions leaves a zero
    /// byte in the index-type position, which is rejected as an unsupported
    /// index type and triggers the retry with the legacy (+1) encoding.
    #[test]
    fn test_parse_v4_chunked_legacy_dim_size_encoding() {
        let mut data = vec![
            0x04, // message version 4
            0x02, // layout class 2: chunked
            0x00, // flags
            0x02, // number of dimensions
            0x03, // encoded dimension size: 3, i.e. 4 bytes under the legacy rule
        ];
        data.extend_from_slice(&3u32.to_le_bytes());
        data.extend_from_slice(&5u32.to_le_bytes());
        // index type 3 (fixed array)
        data.push(0x03);
        // page bits
        data.push(0x00);
        // address (offset_size = 8)
        data.extend_from_slice(&0x8877_6655_4433_2211u64.to_le_bytes());

        let mut cursor = Cursor::new(&data);
        let msg = parse(&mut cursor, 8, 8, data.len()).unwrap();
        match &msg.layout {
            DataLayout::Chunked {
                address,
                dims,
                element_size,
                chunk_indexing,
            } => {
                assert_eq!(*address, 0x8877_6655_4433_2211);
                assert_eq!(dims, &[3, 5]);
                assert_eq!(*element_size, 0);
                match chunk_indexing {
                    Some(ChunkIndexing::FixedArray {
                        page_bits,
                        chunk_size_len,
                    }) => {
                        assert_eq!(*page_bits, 0);
                        // version 4 passes length_size (8) through
                        assert_eq!(*chunk_size_len, 8);
                    }
                    other => panic!("expected FixedArray indexing, got {:?}", other),
                }
            }
            other => panic!("expected Chunked, got {:?}", other),
        }
    }
}