use crate::compression::detect_bz2;
use crate::error::DmapError;
use crate::io;
use crate::types::{
    parse_scalar, parse_vector, parse_vector_header, read_data, DmapField, DmapType, DmapVec,
    Fields,
};
use bzip2::read::BzDecoder;
use indexmap::IndexMap;
use itertools::izip;
use rayon::iter::Either;
use rayon::prelude::*;
use std::fmt::Debug;
use std::fs::File;
use std::io::{Cursor, Read};
use std::path::Path;

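/// Trait for a single record of a DMAP file format.
///
/// A minimal usage sketch, assuming a concrete record type generated by the
/// `create_record_type!` macro below (the name `RawacfRecord` is illustrative):
///
/// ```ignore
/// let records = RawacfRecord::read_file("data.rawacf")?;
/// let first_record = RawacfRecord::sniff_file("data.rawacf")?;
/// RawacfRecord::write_to_file(&records, "copy.rawacf")?;
/// ```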
pub trait Record<'a>:
    Debug + Send + Sync + TryFrom<IndexMap<String, DmapField>, Error = DmapError>
{
    /// Creates a new record from `fields`, verifying them against the format's definition.
    fn new(fields: &mut IndexMap<String, DmapField>) -> Result<Self, DmapError>
    where
        Self: Sized;

    /// Consumes the record, returning the underlying map of fields.
    fn inner(self) -> IndexMap<String, DmapField>;

    /// Gets the field `key` of the record, if it exists.
    fn get(&self, key: &str) -> Option<&DmapField>;

    /// Returns the names of all fields in the record.
    fn keys(&self) -> Vec<&String>;

    /// Returns whether `name` is a metadata field (as opposed to a data field) of the format.
    fn is_metadata_field(name: &str) -> bool;

    /// Reads the first record from `dmap_data`, which may be bz2-compressed.
    fn read_first_record(mut dmap_data: impl Read) -> Result<Self, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let mut stream: Box<dyn Read>;
        let (is_bz2, chunk) = detect_bz2(&mut dmap_data)?;
        if is_bz2 {
            stream = Box::new(BzDecoder::new(chunk));
        } else {
            stream = Box::new(chunk);
        }

        // The first 8 bytes of a record hold the DMAP code and the record size.
        let mut buffer = [0; 8];
        stream
            .read_exact(&mut buffer)
            .map_err(|_| DmapError::CorruptStream("Unable to read size of first record"))?;

        let rec_size = i32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize;
        if rec_size == 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Record 0 starting at byte 0 has non-positive size {} <= 0",
                rec_size
            )));
        }

        let mut rec = vec![0; rec_size];
        rec[0..8].clone_from_slice(&buffer[..]);
        stream.read_exact(&mut rec[8..])?;
        let first_rec = Self::parse_record(&mut Cursor::new(rec))?;

        Ok(first_rec)
    }

    /// Reads all records from `dmap_data`, which may be bz2-compressed.
    fn read_records(mut dmap_data: impl Read) -> Result<Vec<Self>, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let mut buffer: Vec<u8> = vec![];
        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
        if is_bz2 {
            let mut stream = BzDecoder::new(chunk);
            stream.read_to_end(&mut buffer)?;
        } else {
            chunk.read_to_end(&mut buffer)?;
        }

        // Split the buffer into one slice per record, using the size field of each record.
        let mut slices: Vec<_> = vec![];
        let mut rec_start: usize = 0;
        let mut rec_size: usize;
        let mut rec_end: usize;

        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
                as usize;
            rec_end = rec_start + rec_size;
            if rec_end > buffer.len() {
                return Err(DmapError::InvalidRecord(format!(
                    "Record {} starting at byte {} has size greater than remaining length of buffer ({} > {})",
                    slices.len(),
                    rec_start,
                    rec_size,
                    buffer.len() - rec_start
                )));
            } else if rec_size == 0 {
                return Err(DmapError::InvalidRecord(format!(
                    "Record {} starting at byte {} has non-positive size {} <= 0",
                    slices.len(),
                    rec_start,
                    rec_size
                )));
            }
            slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
            rec_start = rec_end;
        }
        if rec_start != buffer.len() {
            return Err(DmapError::InvalidRecord(format!(
                "Record {} starting at byte {} incomplete; has size of {} bytes",
                slices.len() + 1,
                rec_start,
                buffer.len() - rec_start
            )));
        }

        // Parse the records in parallel.
        let mut dmap_results: Vec<Result<Self, DmapError>> = vec![];
        dmap_results.par_extend(
            slices
                .par_iter_mut()
                .map(|cursor| Self::parse_record(cursor)),
        );

        let mut dmap_records: Vec<Self> = vec![];
        let mut bad_recs: Vec<usize> = vec![];
        let mut dmap_errors: Vec<DmapError> = vec![];
        for (i, rec) in dmap_results.into_iter().enumerate() {
            match rec {
                Ok(x) => dmap_records.push(x),
                Err(e) => {
                    dmap_errors.push(e);
                    bad_recs.push(i);
                }
            }
        }
        if !dmap_errors.is_empty() {
            return Err(DmapError::BadRecords(bad_recs, dmap_errors[0].to_string()));
        }
        Ok(dmap_records)
    }

    /// Reads the metadata fields of all records in `dmap_data`, which may be bz2-compressed.
    fn read_metadata(
        mut dmap_data: impl Read,
    ) -> Result<Vec<IndexMap<String, DmapField>>, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let mut buffer: Vec<u8> = vec![];
        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
        if is_bz2 {
            let mut stream = BzDecoder::new(chunk);
            stream.read_to_end(&mut buffer)?;
        } else {
            chunk.read_to_end(&mut buffer)?;
        }

        // Split the buffer into one slice per record, using the size field of each record.
        let mut slices: Vec<_> = vec![];
        let mut rec_start: usize = 0;
        let mut rec_size: usize;
        let mut rec_end: usize;

        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
                as usize;
            rec_end = rec_start + rec_size;
            if rec_end > buffer.len() {
                return Err(DmapError::InvalidRecord(format!(
                    "Record {} starting at byte {} has size greater than remaining length of buffer ({} > {})",
                    slices.len(),
                    rec_start,
                    rec_size,
                    buffer.len() - rec_start
                )));
            } else if rec_size == 0 {
                return Err(DmapError::InvalidRecord(format!(
                    "Record {} starting at byte {} has non-positive size {} <= 0",
                    slices.len(),
                    rec_start,
                    rec_size
                )));
            }
            slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
            rec_start = rec_end;
        }
        if rec_start != buffer.len() {
            return Err(DmapError::InvalidRecord(format!(
                "Record {} starting at byte {} incomplete; has size of {} bytes",
                slices.len() + 1,
                rec_start,
                buffer.len() - rec_start
            )));
        }

        // Parse the metadata fields of each record in parallel.
        let mut dmap_results: Vec<Result<IndexMap<String, DmapField>, DmapError>> = vec![];
        dmap_results.par_extend(
            slices
                .par_iter_mut()
                .map(|cursor| Self::parse_metadata(cursor)),
        );

        let mut dmap_records: Vec<IndexMap<String, DmapField>> = vec![];
        let mut bad_recs: Vec<usize> = vec![];
        let mut dmap_errors: Vec<DmapError> = vec![];
        for (i, rec) in dmap_results.into_iter().enumerate() {
            match rec {
                Ok(x) => dmap_records.push(x),
                Err(e) => {
                    dmap_errors.push(e);
                    bad_recs.push(i);
                }
            }
        }
        if !dmap_errors.is_empty() {
            return Err(DmapError::BadRecords(bad_recs, dmap_errors[0].to_string()));
        }

        Ok(dmap_records)
    }

    /// Reads records from `dmap_data` until the first corrupted record, returning the
    /// successfully-read records and the byte offset of the corruption, if any was found.
    fn read_records_lax(mut dmap_data: impl Read) -> Result<(Vec<Self>, Option<usize>), DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let mut buffer: Vec<u8> = vec![];
        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
        if is_bz2 {
            let mut stream = BzDecoder::new(chunk);
            stream.read_to_end(&mut buffer)?;
        } else {
            chunk.read_to_end(&mut buffer)?;
        }

        let mut dmap_records: Vec<Self> = vec![];
        let mut bad_byte: Option<usize> = None;

        let mut slices: Vec<_> = vec![];
        let mut rec_start: usize = 0;
        let mut rec_size: usize;
        let mut rec_end: usize;

        // Split the buffer into one slice per record, stopping at the first sign of corruption.
        let mut rec_starts = vec![];
        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
                as usize;
            rec_end = rec_start + rec_size;
            if rec_end > buffer.len() || rec_size == 0 {
                bad_byte = Some(rec_start);
                break;
            } else {
                rec_starts.push(rec_start);
                slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
                rec_start = rec_end;
            }
        }
        if rec_start != buffer.len() {
            bad_byte = Some(rec_start);
        }

        // Parse the records in parallel, then keep only those before the first failure.
        let mut dmap_results: Vec<Result<Self, DmapError>> = vec![];
        dmap_results.par_extend(
            slices
                .par_iter_mut()
                .map(|cursor| Self::parse_record(cursor)),
        );

        for (i, rec) in dmap_results.into_iter().enumerate() {
            if let Ok(x) = rec {
                dmap_records.push(x);
            } else {
                bad_byte = Some(rec_starts[i]);
                break;
            }
        }
        Ok((dmap_records, bad_byte))
    }

    /// Reads all records from the file at `infile`.
    fn read_file<P: AsRef<Path>>(infile: P) -> Result<Vec<Self>, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let file = File::open(infile)?;
        Self::read_records(file)
    }

    /// Reads records from the file at `infile` until the first corrupted record, returning
    /// the successfully-read records and the byte offset of the corruption, if any.
    fn read_file_lax<P: AsRef<Path>>(infile: P) -> Result<(Vec<Self>, Option<usize>), DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let file = File::open(infile)?;
        Self::read_records_lax(file)
    }

    /// Reads only the first record from the file at `infile`.
    fn sniff_file<P: AsRef<Path>>(infile: P) -> Result<Self, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let file = File::open(infile)?;
        Self::read_first_record(file)
    }

    /// Reads the metadata fields of all records in the file at `infile`.
    fn read_file_metadata<P: AsRef<Path>>(
        infile: P,
    ) -> Result<Vec<IndexMap<String, DmapField>>, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let file = File::open(infile)?;
        Self::read_metadata(file)
    }

    /// Parses only the metadata fields of a single record from `cursor`, skipping over the
    /// raw data of any non-metadata vector fields.
    fn parse_metadata(
        cursor: &mut Cursor<Vec<u8>>,
    ) -> Result<IndexMap<String, DmapField>, DmapError>
    where
        Self: Sized,
    {
        let bytes_already_read = cursor.position();
        let _code = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret code at byte {}: {e}",
                bytes_already_read
            ))
        })?;
        let size = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret size at byte {}: {e}",
                bytes_already_read + i32::size() as u64
            ))
        })?;

        if size as u64 > cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
        {
            return Err(DmapError::InvalidRecord(format!(
                "Record size {size} at byte {} bigger than remaining buffer {}",
                cursor.position() - i32::size() as u64,
                cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
            )));
        } else if size <= 0 {
            return Err(DmapError::InvalidRecord(format!("Record size {size} <= 0")));
        }

        let num_scalars = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret number of scalars at byte {}: {e}",
                cursor.position() - i32::size() as u64
            ))
        })?;
        let num_vectors = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret number of vectors at byte {}: {e}",
                cursor.position() - i32::size() as u64
            ))
        })?;
        if num_scalars <= 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Number of scalars {num_scalars} at byte {} <= 0",
                cursor.position() - 2 * i32::size() as u64
            )));
        } else if num_vectors <= 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Number of vectors {num_vectors} at byte {} <= 0",
                cursor.position() - i32::size() as u64
            )));
        } else if num_scalars + num_vectors > size {
            return Err(DmapError::InvalidRecord(format!(
                "Number of scalars {num_scalars} plus vectors {num_vectors} greater than size '{size}'")));
        }

        let mut fields: IndexMap<String, DmapField> = IndexMap::new();
        for _ in 0..num_scalars {
            let (name, val) = parse_scalar(cursor)?;
            fields.insert(name, val);
        }
        for _ in 0..num_vectors {
            let here = cursor.position();
            let (name, dtype, _dims, num_elements) = parse_vector_header(cursor, size)?;
            if Self::is_metadata_field(&name) {
                // Metadata vector: rewind and parse the full vector.
                cursor.set_position(here);
                let (_, val) = parse_vector(cursor, size)?;
                fields.insert(name.to_string(), val);
            } else {
                // Data vector: skip over its raw data without parsing it.
                let vec_data_size = dtype.size() as u64 * num_elements as u64;
                let here = cursor.position();
                cursor.set_position(here + vec_data_size);
            }
        }

        if cursor.position() - bytes_already_read != size as u64 {
            return Err(DmapError::InvalidRecord(format!(
                "Bytes read {} does not match the record's size field {}",
                cursor.position() - bytes_already_read,
                size
            )));
        }

        Ok(fields)
    }

    /// Parses a single record from `cursor` into a checked record of this type.
    fn parse_record(cursor: &mut Cursor<Vec<u8>>) -> Result<Self, DmapError>
    where
        Self: Sized,
    {
        let bytes_already_read = cursor.position();
        let _code = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret code at byte {}: {e}",
                bytes_already_read
            ))
        })?;
        let size = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret size at byte {}: {e}",
                bytes_already_read + i32::size() as u64
            ))
        })?;

        if size as u64 > cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
        {
            return Err(DmapError::InvalidRecord(format!(
                "Record size {size} at byte {} bigger than remaining buffer {}",
                cursor.position() - i32::size() as u64,
                cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
            )));
        } else if size <= 0 {
            return Err(DmapError::InvalidRecord(format!("Record size {size} <= 0")));
        }

        let num_scalars = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret number of scalars at byte {}: {e}",
                cursor.position() - i32::size() as u64
            ))
        })?;
        let num_vectors = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret number of vectors at byte {}: {e}",
                cursor.position() - i32::size() as u64
            ))
        })?;
        if num_scalars <= 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Number of scalars {num_scalars} at byte {} <= 0",
                cursor.position() - 2 * i32::size() as u64
            )));
        } else if num_vectors <= 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Number of vectors {num_vectors} at byte {} <= 0",
                cursor.position() - i32::size() as u64
            )));
        } else if num_scalars + num_vectors > size {
            return Err(DmapError::InvalidRecord(format!(
                "Number of scalars {num_scalars} plus vectors {num_vectors} greater than size '{size}'")));
        }

        let mut fields: IndexMap<String, DmapField> = IndexMap::new();
        for _ in 0..num_scalars {
            let (name, val) = parse_scalar(cursor)?;
            fields.insert(name, val);
        }
        for _ in 0..num_vectors {
            let (name, val) = parse_vector(cursor, size)?;
            fields.insert(name, val);
        }

        if cursor.position() - bytes_already_read != size as u64 {
            return Err(DmapError::InvalidRecord(format!(
                "Bytes read {} does not match the record's size field {}",
                cursor.position() - bytes_already_read,
                size
            )));
        }

        Self::new(&mut fields)
    }

    /// Verifies that `field_dict` contains all required fields of the format, that no
    /// unsupported fields are present, and that every field has the expected type.
    fn check_fields(
        field_dict: &mut IndexMap<String, DmapField>,
        fields_for_type: &Fields,
    ) -> Result<(), DmapError> {
        let unsupported_keys: Vec<&String> = field_dict
            .keys()
            .filter(|&k| !fields_for_type.all_fields.contains(&&**k))
            .collect();
        if !unsupported_keys.is_empty() {
            Err(DmapError::InvalidRecord(format!(
                "Unsupported fields {:?}, fields supported are {:?}",
                unsupported_keys, fields_for_type.all_fields
            )))?
        }

        for (field, expected_type) in fields_for_type.scalars_required.iter() {
            match field_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
                Some(DmapField::Scalar(x)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} has incorrect type {}, expected {}",
                    field,
                    x.get_type(),
                    expected_type
                )))?,
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field:?} ({:?}) missing: fields {:?}",
                    &field.to_string(),
                    field_dict.keys()
                )))?,
            }
        }
        for (field, expected_type) in fields_for_type.scalars_optional.iter() {
            match field_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
                Some(DmapField::Scalar(x)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} has incorrect type {}, expected {}",
                    field,
                    x.get_type(),
                    expected_type
                )))?,
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => {}
            }
        }
        for (field, expected_type) in fields_for_type.vectors_required.iter() {
            match field_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {:?}, expected {expected_type:?}",
                        x.get_type()
                    )))?
                }
                Some(&DmapField::Vector(_)) => {}
                None => Err(DmapError::InvalidRecord(format!("Field {field} missing")))?,
            }
        }
        for (field, expected_type) in fields_for_type.vectors_optional.iter() {
            match field_dict.get(&field.to_string()) {
                Some(&DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {}, expected {expected_type}",
                        x.get_type()
                    )))?
                }
                _ => {}
            }
        }
        // Check that grouped vector fields all have consistent dimensions.
        for vec_group in fields_for_type.vector_dim_groups.iter() {
            let vecs: Vec<(&str, &DmapVec)> = vec_group
                .iter()
                .filter_map(|&name| match field_dict.get(&name.to_string()) {
                    Some(DmapField::Vector(ref x)) => Some((name, x)),
                    Some(_) => None,
                    None => None,
                })
                .collect();
            if vecs.len() > 1 {
                let mut vec_iter = vecs.iter();
                let first = vec_iter.next().expect("Iterator broken");
                if !vec_iter.all(|(_, v)| v.shape() == first.1.shape()) {
                    let error_vec: Vec<(&str, &[usize])> =
                        vecs.iter().map(|(k, v)| (*k, v.shape())).collect();
                    Err(DmapError::InvalidRecord(format!(
                        "Vector fields have inconsistent dimensions: {:?}",
                        error_vec
                    )))?
                }
            }
        }
        Ok(())
    }

    /// Like `check_fields`, but casts scalar fields to their expected types where possible
    /// before constructing the record.
    fn coerce(
        fields_dict: &mut IndexMap<String, DmapField>,
        fields_for_type: &Fields,
    ) -> Result<Self, DmapError> {
        let unsupported_keys: Vec<&String> = fields_dict
            .keys()
            .filter(|&k| !fields_for_type.all_fields.contains(&&**k))
            .collect();
        if !unsupported_keys.is_empty() {
            Err(DmapError::InvalidRecord(format!(
                "Unsupported fields {:?}, fields supported are {:?}",
                unsupported_keys, fields_for_type.all_fields
            )))?
        }

        for (field, expected_type) in fields_for_type.scalars_required.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() != expected_type => {
                    fields_dict.insert(
                        field.to_string(),
                        DmapField::Scalar(x.cast_as(expected_type)?),
                    );
                }
                Some(DmapField::Scalar(_)) => {}
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field:?} ({:?}) missing: fields {:?}",
                    &field.to_string(),
                    fields_dict.keys()
                )))?,
            }
        }
        for (field, expected_type) in fields_for_type.scalars_optional.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
                Some(DmapField::Scalar(x)) => {
                    fields_dict.insert(
                        field.to_string(),
                        DmapField::Scalar(x.cast_as(expected_type)?),
                    );
                }
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => {}
            }
        }
        for (field, expected_type) in fields_for_type.vectors_required.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {:?}, expected {expected_type:?}",
                        x.get_type()
                    )))?
                }
                Some(DmapField::Vector(_)) => {}
                None => Err(DmapError::InvalidRecord(format!("Field {field} missing")))?,
            }
        }
        for (field, expected_type) in fields_for_type.vectors_optional.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(&DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {}, expected {expected_type}",
                        x.get_type()
                    )))?
                }
                _ => {}
            }
        }

        Self::new(fields_dict)
    }

    /// Converts the record to its raw DMAP byte representation.
    fn to_bytes(&self) -> Result<Vec<u8>, DmapError>;

    /// Converts the fields in `data` to raw bytes, returning the number of scalar fields,
    /// the number of vector fields, and the bytes.
    fn data_to_bytes(
        data: &IndexMap<String, DmapField>,
        fields_for_type: &Fields,
    ) -> Result<(i32, i32, Vec<u8>), DmapError> {
        let mut data_bytes: Vec<u8> = vec![];
        let mut num_scalars: i32 = 0;
        let mut num_vectors: i32 = 0;

        for (field, _) in fields_for_type.scalars_required.iter() {
            match data.get(&field.to_string()) {
                Some(x @ DmapField::Scalar(_)) => {
                    data_bytes.extend(field.as_bytes());
                    data_bytes.extend([0]); // null-terminate the field name
                    data_bytes.append(&mut x.as_bytes());
                    num_scalars += 1;
                }
                Some(_) => Err(DmapError::InvalidScalar(format!(
                    "Field {field} is a vector, expected scalar"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.scalars_optional.iter() {
            if let Some(x) = data.get(&field.to_string()) {
                match x {
                    DmapField::Scalar(_) => {
                        data_bytes.extend(field.as_bytes());
                        data_bytes.extend([0]); // null-terminate the field name
                        data_bytes.append(&mut x.as_bytes());
                        num_scalars += 1;
                    }
                    DmapField::Vector(_) => Err(DmapError::InvalidScalar(format!(
                        "Field {field} is a vector, expected scalar"
                    )))?,
                }
            }
        }
        for (field, _) in fields_for_type.vectors_required.iter() {
            match data.get(&field.to_string()) {
                Some(x @ DmapField::Vector(_)) => {
                    data_bytes.extend(field.as_bytes());
                    data_bytes.extend([0]); // null-terminate the field name
                    data_bytes.append(&mut x.as_bytes());
                    num_vectors += 1;
                }
                Some(_) => Err(DmapError::InvalidVector(format!(
                    "Field {field} is a scalar, expected vector"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.vectors_optional.iter() {
            if let Some(x) = data.get(&field.to_string()) {
                match x {
                    DmapField::Vector(_) => {
                        data_bytes.extend(field.as_bytes());
                        data_bytes.extend([0]); // null-terminate the field name
                        data_bytes.append(&mut x.as_bytes());
                        num_vectors += 1;
                    }
                    DmapField::Scalar(_) => Err(DmapError::InvalidVector(format!(
                        "Field {field} is a scalar, expected vector"
                    )))?,
                }
            }
        }

        Ok((num_scalars, num_vectors, data_bytes))
    }
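
    // Sketch of the resulting byte layout (header written by `to_bytes` in the macro below,
    // field bytes produced by `data_to_bytes` above):
    //
    //   [ code: i32 = 65537 ][ size: i32 ][ num_scalars: i32 ][ num_vectors: i32 ]
    //   then, for each field: its name, a 0 terminator byte, then `DmapField::as_bytes()`
    //
    // `size` counts the entire record, which is why the header adds 16 bytes (4 x i32)
    // to the length of the field bytes.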

    /// Converts the record to bytes, returning a list of (field name, byte offset, bytes)
    /// for the header and each field, in the order they would be written to file.
    fn inspect_bytes(
        &self,
        fields_for_type: &Fields,
    ) -> Result<Vec<(String, usize, Vec<u8>)>, DmapError> {
        let mut data_bytes: Vec<Vec<u8>> = vec![];
        let mut indices: Vec<usize> = vec![16]; // field bytes start after the 16-byte header
        let mut fields: Vec<String> = vec![];

        let (mut num_scalars, mut num_vectors) = (0, 0);

        for (field, _) in fields_for_type.scalars_required.iter() {
            fields.push(field.to_string());
            match self.get(field) {
                Some(x @ DmapField::Scalar(_)) => {
                    let mut bytes = vec![];
                    bytes.extend(field.as_bytes());
                    bytes.extend([0]); // null-terminate the field name
                    bytes.append(&mut x.as_bytes());
                    indices.push(indices[indices.len() - 1] + bytes.len());
                    data_bytes.push(bytes);
                    num_scalars += 1;
                }
                Some(_) => Err(DmapError::InvalidScalar(format!(
                    "Field {field} is a vector, expected scalar"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.scalars_optional.iter() {
            fields.push(field.to_string());
            if let Some(x) = self.get(field) {
                match x {
                    DmapField::Scalar(_) => {
                        let mut bytes = vec![];
                        bytes.extend(field.as_bytes());
                        bytes.extend([0]); // null-terminate the field name
                        bytes.append(&mut x.as_bytes());
                        indices.push(indices[indices.len() - 1] + bytes.len());
                        data_bytes.push(bytes);
                        num_scalars += 1;
                    }
                    DmapField::Vector(_) => Err(DmapError::InvalidScalar(format!(
                        "Field {field} is a vector, expected scalar"
                    )))?,
                }
            }
        }
        for (field, _) in fields_for_type.vectors_required.iter() {
            fields.push(field.to_string());
            match self.get(field) {
                Some(x @ DmapField::Vector(_)) => {
                    let mut bytes = vec![];
                    bytes.extend(field.as_bytes());
                    bytes.extend([0]); // null-terminate the field name
                    bytes.append(&mut x.as_bytes());
                    indices.push(indices[indices.len() - 1] + bytes.len());
                    data_bytes.push(bytes);
                    num_vectors += 1;
                }
                Some(_) => Err(DmapError::InvalidVector(format!(
                    "Field {field} is a scalar, expected vector"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.vectors_optional.iter() {
            fields.push(field.to_string());
            if let Some(x) = self.get(field) {
                match x {
                    DmapField::Vector(_) => {
                        let mut bytes = vec![];
                        bytes.extend(field.as_bytes());
                        bytes.extend([0]); // null-terminate the field name
                        bytes.append(&mut x.as_bytes());
                        indices.push(indices[indices.len() - 1] + bytes.len());
                        data_bytes.push(bytes);
                        num_vectors += 1;
                    }
                    DmapField::Scalar(_) => Err(DmapError::InvalidVector(format!(
                        "Field {field} is a scalar, expected vector"
                    )))?,
                }
            }
        }

        let num_bytes: usize = data_bytes.iter().map(|x| x.len()).sum();
        let mut bytes: Vec<u8> = vec![];
        bytes.extend((65537_i32).as_bytes()); // DMAP code
        bytes.extend((num_bytes as i32 + 16).as_bytes()); // total size, including this 16-byte header
        bytes.extend(num_scalars.as_bytes());
        bytes.extend(num_vectors.as_bytes());

        let mut field_info: Vec<(String, usize, Vec<u8>)> = vec![("header".to_string(), 0, bytes)];
        for (f, (s, b)) in izip!(
            fields.into_iter(),
            izip!(indices[..indices.len() - 1].iter(), data_bytes.into_iter())
        ) {
            field_info.push((f, *s, b));
        }

        Ok(field_info)
    }

    /// Converts `recs` to raw DMAP bytes in parallel, suitable for writing to file.
    fn into_bytes(recs: &Vec<Self>) -> Result<Vec<u8>, DmapError> {
        let mut bytes: Vec<u8> = vec![];
        let (errors, rec_bytes): (Vec<_>, Vec<_>) =
            recs.par_iter()
                .enumerate()
                .partition_map(|(i, rec)| match rec.to_bytes() {
                    Err(e) => Either::Left((i, e)),
                    Ok(y) => Either::Right(y),
                });
        if !errors.is_empty() {
            Err(DmapError::InvalidRecord(format!(
                "Corrupted records: {errors:?}"
            )))?
        }
        bytes.par_extend(rec_bytes.into_par_iter().flatten());
        Ok(bytes)
    }

    /// Converts raw field maps into records of this type, then into raw DMAP bytes, in parallel.
    fn try_into_bytes(recs: Vec<IndexMap<String, DmapField>>) -> Result<Vec<u8>, DmapError> {
        let mut bytes: Vec<u8> = vec![];
        let (errors, rec_bytes): (Vec<_>, Vec<_>) =
            recs.into_par_iter()
                .enumerate()
                .partition_map(|(i, rec)| match Self::try_from(rec) {
                    Err(e) => Either::Left((i, e)),
                    Ok(x) => match x.to_bytes() {
                        Err(e) => Either::Left((i, e)),
                        Ok(y) => Either::Right(y),
                    },
                });
        if !errors.is_empty() {
            Err(DmapError::BadRecords(
                errors.iter().map(|(i, _)| *i).collect(),
                errors[0].1.to_string(),
            ))?
        }
        bytes.par_extend(rec_bytes.into_par_iter().flatten());
        Ok(bytes)
    }

    /// Writes `recs` to the file at `outfile`.
    fn write_to_file<P: AsRef<Path>>(recs: &Vec<Self>, outfile: P) -> Result<(), DmapError> {
        let bytes: Vec<u8> = Self::into_bytes(recs)?;
        io::bytes_to_file(bytes, outfile)?;
        Ok(())
    }
}

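// A minimal sketch (not part of the original API) illustrating the record framing that
// `read_records` and the generated `to_bytes` rely on: bytes 0..4 of a record hold the
// DMAP code (65537 when written by this crate) and bytes 4..8 hold the total record size
// as a little-endian i32, including the 16-byte header itself. The module and test names
// are illustrative.
#[cfg(test)]
mod framing_sketch {
    #[test]
    fn header_size_field_is_little_endian_i32_at_bytes_4_to_8() {
        // Hand-built 16-byte header: code = 65537, size = 16 (header only, no fields),
        // num_scalars = 0, num_vectors = 0.
        let mut header: Vec<u8> = vec![];
        header.extend(65537_i32.to_le_bytes());
        header.extend(16_i32.to_le_bytes());
        header.extend(0_i32.to_le_bytes());
        header.extend(0_i32.to_le_bytes());

        let size = i32::from_le_bytes(header[4..8].try_into().unwrap()) as usize;
        assert_eq!(size, header.len());
    }
}
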
/// Generates a struct named `<Format>Record` that implements the `Record` trait,
/// validating its fields against the given `Fields` definition.
macro_rules! create_record_type {
    ($format:ident, $fields:ident) => {
        paste::paste! {
            use crate::types::{DmapType, DmapField};
            use crate::error::DmapError;
            use indexmap::IndexMap;
            use crate::record::Record;

            #[doc = "Struct containing the checked fields of a single `" $format:upper "` record." ]
            #[derive(Debug, PartialEq, Clone)]
            pub struct [< $format:camel Record >] {
                pub data: IndexMap<String, DmapField>,
            }

            impl Record<'_> for [< $format:camel Record>] {
                fn inner(self) -> IndexMap<String, DmapField> {
                    self.data
                }
                fn get(&self, key: &str) -> Option<&DmapField> {
                    self.data.get(key)
                }
                fn keys(&self) -> Vec<&String> {
                    self.data.keys().collect()
                }
                fn new(fields: &mut IndexMap<String, DmapField>) -> Result<[< $format:camel Record>], DmapError> {
                    match Self::check_fields(fields, &$fields) {
                        Ok(_) => {}
                        Err(e) => Err(e)?,
                    }

                    Ok([< $format:camel Record >] {
                        data: fields.to_owned(),
                    })
                }
                fn to_bytes(&self) -> Result<Vec<u8>, DmapError> {
                    let (num_scalars, num_vectors, mut data_bytes) =
                        Self::data_to_bytes(&self.data, &$fields)?;

                    let mut bytes: Vec<u8> = vec![];
                    bytes.extend((65537_i32).as_bytes()); // DMAP code
                    bytes.extend((data_bytes.len() as i32 + 16).as_bytes()); // total size, including this 16-byte header
                    bytes.extend(num_scalars.as_bytes());
                    bytes.extend(num_vectors.as_bytes());
                    bytes.append(&mut data_bytes);
                    Ok(bytes)
                }
                fn is_metadata_field(name: &str) -> bool {
                    !$fields.data_fields.iter().any(|e| e == &name)
                }
            }

            impl TryFrom<&mut IndexMap<String, DmapField>> for [< $format:camel Record >] {
                type Error = DmapError;

                fn try_from(value: &mut IndexMap<String, DmapField>) -> Result<Self, Self::Error> {
                    Self::coerce(value, &$fields)
                }
            }

            impl TryFrom<IndexMap<String, DmapField>> for [< $format:camel Record >] {
                type Error = DmapError;

                fn try_from(mut value: IndexMap<String, DmapField>) -> Result<Self, Self::Error> {
                    Self::coerce(&mut value, &$fields)
                }
            }

            #[cfg(test)]
            mod tests {
                use super::*;
                use std::path::PathBuf;

                #[test]
                fn test_missing_optional_fields() -> Result<(), DmapError> {
                    let filename: PathBuf = PathBuf::from(format!("tests/test_files/test.{}", stringify!($format)));
                    let data = [< $format:camel Record >]::sniff_file(&filename).expect("Unable to sniff file");
                    let recs = data.inner();

                    for field in $fields.scalars_optional.iter().chain($fields.vectors_optional.iter()) {
                        let mut cloned_rec = recs.clone();
                        let _ = cloned_rec.shift_remove(field.0);
                        let _ = [< $format:camel Record >]::try_from(&mut cloned_rec)?;
                    }
                    Ok(())
                }

                #[test]
                fn test_missing_required_fields() -> Result<(), DmapError> {
                    let filename: PathBuf = PathBuf::from(format!("tests/test_files/test.{}", stringify!($format)));
                    let data = [< $format:camel Record >]::sniff_file(&filename).expect("Unable to sniff file");
                    let recs = data.inner();

                    for field in $fields.scalars_required.iter().chain($fields.vectors_required.iter()) {
                        let mut cloned_rec = recs.clone();
                        let _ = cloned_rec.shift_remove(field.0);
                        let res = [< $format:camel Record >]::try_from(&mut cloned_rec);
                        assert!(res.is_err());
                    }
                    Ok(())
                }
            }
        }
    }
}

pub(crate) use create_record_type;
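
// Example (illustrative, not taken from this file): a format module elsewhere in the crate
// would define a `Fields` description and invoke the macro, after which the generated type
// can be used through the `Record` trait. The names `rawacf` and `RAWACF_FIELDS` below are
// placeholders for whatever the format module actually defines.
//
//     create_record_type!(rawacf, RAWACF_FIELDS);
//
//     let records = RawacfRecord::read_file("path/to/file.rawacf")?;
//     RawacfRecord::write_to_file(&records, "path/to/copy.rawacf")?;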