dmap/
record.rs

1//! Defines the [`Record`] trait, which contains the shared behaviour that all DMAP records must have.
2
3use crate::compression::detect_bz2;
4use crate::error::DmapError;
5use crate::io;
6use crate::types::{
7    parse_scalar, parse_vector, parse_vector_header, read_data, DmapField, DmapType, DmapVec,
8    Fields,
9};
10use bzip2::read::BzDecoder;
11use indexmap::IndexMap;
12use itertools::izip;
13use rayon::iter::Either;
14use rayon::prelude::*;
15use std::fmt::Debug;
16use std::fs::File;
17use std::io::{Cursor, Read};
18use std::path::Path;
19
20/// DMAP record template.
21///
22/// This trait defines functionality for parsing bytes into records, converting records to bytes,
23/// and reading from / writing to files.
24pub trait Record<'a>:
25    Debug + Send + Sync + TryFrom<IndexMap<String, DmapField>, Error = DmapError>
26{
27    /// Creates a new object from the parsed scalars and vectors.
28    fn new(fields: &mut IndexMap<String, DmapField>) -> Result<Self, DmapError>
29    where
30        Self: Sized;
31
32    /// Gets the underlying data of `self`.
33    fn inner(self) -> IndexMap<String, DmapField>;
34
35    /// Returns the field with name `key`, if it exists in the record.
36    fn get(&self, key: &str) -> Option<&DmapField>;
37
38    /// Returns the names of all fields stored in the record.
39    fn keys(&self) -> Vec<&String>;
40
41    /// Returns whether `name` is a metadata field of the record.
42    fn is_metadata_field(name: &str) -> bool;
43
44    /// Reads from `dmap_data` and parses into `Vec<Self>`.
45    ///
46    /// Returns `DmapError` if `dmap_data` cannot be read or contains invalid data.
47    fn read_first_record(mut dmap_data: impl Read) -> Result<Self, DmapError>
48    where
49        Self: Sized,
50        Self: Send,
51    {
52        let mut stream: Box<dyn Read>;
53        let (is_bz2, chunk) = detect_bz2(&mut dmap_data)?;
54        if is_bz2 {
55            stream = Box::new(BzDecoder::new(chunk));
56        } else {
57            stream = Box::new(chunk);
58        }
59
60        let mut buffer = [0; 8]; // record size should be an i32 of the data
61        stream
62            .read_exact(&mut buffer)
63            .map_err(|_| DmapError::CorruptStream("Unable to read size of first record"))?;
64
65        let rec_size = i32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize; // advance 4 bytes, skipping the "code" field
66        if rec_size == 0 {
67            return Err(DmapError::InvalidRecord(format!(
68                "Record 0 starting at byte 0 has non-positive size {} <= 0",
69                rec_size
70            )));
71        }
72
73        let mut rec = vec![0; rec_size];
74        rec[0..8].clone_from_slice(&buffer[..]);
75        stream.read_exact(&mut rec[8..])?;
76        let first_rec = Self::parse_record(&mut Cursor::new(rec))?;
77
78        Ok(first_rec)
79    }
80
81    /// Reads from `dmap_data` and parses into `Vec<Self>`.
82    ///
83    /// Returns `DmapError` if `dmap_data` cannot be read or contains invalid data.
84    fn read_records(mut dmap_data: impl Read) -> Result<Vec<Self>, DmapError>
85    where
86        Self: Sized,
87        Self: Send,
88    {
89        let mut buffer: Vec<u8> = vec![];
90        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
91        if is_bz2 {
92            let mut stream = BzDecoder::new(chunk);
93            stream.read_to_end(&mut buffer)?;
94        } else {
95            chunk.read_to_end(&mut buffer)?;
96        }
97
98        let mut slices: Vec<_> = vec![];
99        let mut rec_start: usize = 0;
100        let mut rec_size: usize;
101        let mut rec_end: usize;
102
103        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
104            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
105                as usize; // advance 4 bytes, skipping the "code" field
106            rec_end = rec_start + rec_size; // error-checking the size is conducted in Self::parse_record()
107            if rec_end > buffer.len() {
108                return Err(DmapError::InvalidRecord(format!("Record {} starting at byte {} has size greater than remaining length of buffer ({} > {})", slices.len(), rec_start, rec_size, buffer.len() - rec_start)));
109            } else if rec_size == 0 {
110                return Err(DmapError::InvalidRecord(format!(
111                    "Record {} starting at byte {} has non-positive size {} <= 0",
112                    slices.len(),
113                    rec_start,
114                    rec_size
115                )));
116            }
117            slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
118            rec_start = rec_end;
119        }
120        if rec_start != buffer.len() {
121            return Err(DmapError::InvalidRecord(format!(
122                "Record {} starting at byte {} incomplete; has size of {} bytes",
123                slices.len() + 1,
124                rec_start,
125                buffer.len() - rec_start
126            )));
127        }
128        let mut dmap_results: Vec<Result<Self, DmapError>> = vec![];
129        dmap_results.par_extend(
130            slices
131                .par_iter_mut()
132                .map(|cursor| Self::parse_record(cursor)),
133        );
134
135        let mut dmap_records: Vec<Self> = vec![];
136        let mut bad_recs: Vec<usize> = vec![];
137        let mut dmap_errors: Vec<DmapError> = vec![];
138        for (i, rec) in dmap_results.into_iter().enumerate() {
139            match rec {
140                Ok(x) => dmap_records.push(x),
141                Err(e) => {
142                    dmap_errors.push(e);
143                    bad_recs.push(i);
144                }
145            }
146        }
147        if !dmap_errors.is_empty() {
148            return Err(DmapError::BadRecords(bad_recs, dmap_errors[0].to_string()));
149        }
150        Ok(dmap_records)
151    }
152
153    /// Reads metadata of records from `dmap_data` and parses into `Vec<Self>`.
154    ///
155    /// Returns `DmapError` if `dmap_data` cannot be read or contains invalid data.
156    fn read_metadata(
157        mut dmap_data: impl Read,
158    ) -> Result<Vec<IndexMap<String, DmapField>>, DmapError>
159    where
160        Self: Sized,
161        Self: Send,
162    {
163        let mut buffer: Vec<u8> = vec![];
164        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
165        if is_bz2 {
166            let mut stream = BzDecoder::new(chunk);
167            stream.read_to_end(&mut buffer)?;
168        } else {
169            chunk.read_to_end(&mut buffer)?;
170        }
171
172        let mut slices: Vec<_> = vec![];
173        let mut rec_start: usize = 0;
174        let mut rec_size: usize;
175        let mut rec_end: usize;
176
177        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
178            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
179                as usize; // advance 4 bytes, skipping the "code" field
180            rec_end = rec_start + rec_size; // error-checking the size is conducted in Self::parse_record()
181            if rec_end > buffer.len() {
182                return Err(DmapError::InvalidRecord(format!("Record {} starting at byte {} has size greater than remaining length of buffer ({} > {})", slices.len(), rec_start, rec_size, buffer.len() - rec_start)));
183            } else if rec_size == 0 {
184                return Err(DmapError::InvalidRecord(format!(
185                    "Record {} starting at byte {} has non-positive size {} <= 0",
186                    slices.len(),
187                    rec_start,
188                    rec_size
189                )));
190            }
191            slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
192            rec_start = rec_end;
193        }
194        if rec_start != buffer.len() {
195            return Err(DmapError::InvalidRecord(format!(
196                "Record {} starting at byte {} incomplete; has size of {} bytes",
197                slices.len() + 1,
198                rec_start,
199                buffer.len() - rec_start
200            )));
201        }
202        let mut dmap_results: Vec<Result<IndexMap<String, DmapField>, DmapError>> = vec![];
203        dmap_results.par_extend(
204            slices
205                .par_iter_mut()
206                .map(|cursor| Self::parse_metadata(cursor)),
207        );
208
209        let mut dmap_records: Vec<IndexMap<String, DmapField>> = vec![];
210        let mut bad_recs: Vec<usize> = vec![];
211        let mut dmap_errors: Vec<DmapError> = vec![];
212        for (i, rec) in dmap_results.into_iter().enumerate() {
213            match rec {
214                Ok(x) => dmap_records.push(x),
215                Err(e) => {
216                    dmap_errors.push(e);
217                    bad_recs.push(i);
218                }
219            }
220        }
221        if !dmap_errors.is_empty() {
222            return Err(DmapError::BadRecords(bad_recs, dmap_errors[0].to_string()));
223        }
224
225        Ok(dmap_records)
226    }
227
228    /// Reads from `dmap_data` and parses into `Vec<Self>`.
229    ///
230    /// Returns a 2-tuple, where the first entry is the good records from the front of the buffer,
231    /// and the second entry is the byte where the first corrupted record starts, if applicable.
232    fn read_records_lax(mut dmap_data: impl Read) -> Result<(Vec<Self>, Option<usize>), DmapError>
233    where
234        Self: Sized,
235        Self: Send,
236    {
237        let mut buffer: Vec<u8> = vec![];
238        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
239        if is_bz2 {
240            let mut stream = BzDecoder::new(chunk);
241            stream.read_to_end(&mut buffer)?;
242        } else {
243            chunk.read_to_end(&mut buffer)?;
244        }
245
246        let mut dmap_records: Vec<Self> = vec![];
247        let mut bad_byte: Option<usize> = None;
248
249        let mut slices: Vec<_> = vec![];
250        let mut rec_start: usize = 0;
251        let mut rec_size: usize;
252        let mut rec_end: usize;
253
254        let mut rec_starts = vec![];
255        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
256            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
257                as usize; // advance 4 bytes, skipping the "code" field
258            rec_end = rec_start + rec_size; // error-checking the size is conducted in Self::parse_record()
259            if rec_end > buffer.len() || rec_size == 0 {
260                bad_byte = Some(rec_start);
261                break;
262                // rec_start = buffer.len(); // break from loop
263            } else {
264                rec_starts.push(rec_start);
265                slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
266                rec_start = rec_end;
267            }
268        }
269        if rec_start != buffer.len() {
270            bad_byte = Some(rec_start);
271        }
272        let mut dmap_results: Vec<Result<Self, DmapError>> = vec![];
273        dmap_results.par_extend(
274            slices
275                .par_iter_mut()
276                .map(|cursor| Self::parse_record(cursor)),
277        );
278
279        for (i, rec) in dmap_results.into_iter().enumerate() {
280            if let Ok(x) = rec {
281                dmap_records.push(x);
282            } else {
283                bad_byte = Some(rec_starts[i]);
284                break;
285            }
286        }
287        Ok((dmap_records, bad_byte))
288    }
289
290    /// Read a DMAP file of type `Self`
291    fn read_file<P: AsRef<Path>>(infile: P) -> Result<Vec<Self>, DmapError>
292    where
293        Self: Sized,
294        Self: Send,
295    {
296        let file = File::open(infile)?;
297        Self::read_records(file)
298    }
299
300    /// Read a DMAP file of type `Self`.
301    ///
302    /// If the file is corrupted, it will return the leading uncorrupted records as well as the
303    /// position corresponding to the start of the first corrupted record.
304    fn read_file_lax<P: AsRef<Path>>(infile: P) -> Result<(Vec<Self>, Option<usize>), DmapError>
305    where
306        Self: Sized,
307        Self: Send,
308    {
309        let file = File::open(infile)?;
310        Self::read_records_lax(file)
311    }
312
313    /// Reads the first record of a DMAP file of type `Self`.
314    fn sniff_file<P: AsRef<Path>>(infile: P) -> Result<Self, DmapError>
315    where
316        Self: Sized,
317        Self: Send,
318    {
319        let file = File::open(infile)?;
320        Self::read_first_record(file)
321    }
322
323    /// Read the metadata from a DMAP file of type `Self`
324    fn read_file_metadata<P: AsRef<Path>>(
325        infile: P,
326    ) -> Result<Vec<IndexMap<String, DmapField>>, DmapError>
327    where
328        Self: Sized,
329        Self: Send,
330    {
331        let file = File::open(infile)?;
332        Self::read_metadata(file)
333    }
334
335    /// Reads a record from `cursor`, only keeping the metadata fields.
336    fn parse_metadata(
337        cursor: &mut Cursor<Vec<u8>>,
338    ) -> Result<IndexMap<String, DmapField>, DmapError>
339    where
340        Self: Sized,
341    {
342        let bytes_already_read = cursor.position();
343        let _code = read_data::<i32>(cursor).map_err(|e| {
344            DmapError::InvalidRecord(format!(
345                "Cannot interpret code at byte {}: {e}",
346                bytes_already_read
347            ))
348        })?;
349        let size = read_data::<i32>(cursor).map_err(|e| {
350            DmapError::InvalidRecord(format!(
351                "Cannot interpret size at byte {}: {e}",
352                bytes_already_read + i32::size() as u64
353            ))
354        })?;
355
356        // adding 8 bytes because code and size are part of the record.
357        if size as u64 > cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
358        {
359            return Err(DmapError::InvalidRecord(format!(
360                "Record size {size} at byte {} bigger than remaining buffer {}",
361                cursor.position() - i32::size() as u64,
362                cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
363            )));
364        } else if size <= 0 {
365            return Err(DmapError::InvalidRecord(format!("Record size {size} <= 0")));
366        }
367
368        let num_scalars = read_data::<i32>(cursor).map_err(|e| {
369            DmapError::InvalidRecord(format!(
370                "Cannot interpret number of scalars at byte {}: {e}",
371                cursor.position() - i32::size() as u64
372            ))
373        })?;
374        let num_vectors = read_data::<i32>(cursor).map_err(|e| {
375            DmapError::InvalidRecord(format!(
376                "Cannot interpret number of vectors at byte {}: {e}",
377                cursor.position() - i32::size() as u64
378            ))
379        })?;
380        if num_scalars <= 0 {
381            return Err(DmapError::InvalidRecord(format!(
382                "Number of scalars {num_scalars} at byte {} <= 0",
383                cursor.position() - 2 * i32::size() as u64
384            )));
385        } else if num_vectors <= 0 {
386            return Err(DmapError::InvalidRecord(format!(
387                "Number of vectors {num_vectors} at byte {} <= 0",
388                cursor.position() - i32::size() as u64
389            )));
390        } else if num_scalars + num_vectors > size {
391            return Err(DmapError::InvalidRecord(format!(
392                "Number of scalars {num_scalars} plus vectors {num_vectors} greater than size '{size}'")));
393        }
394
395        let mut fields: IndexMap<String, DmapField> = IndexMap::new();
396        for _ in 0..num_scalars {
397            let (name, val) = parse_scalar(cursor)?;
398            fields.insert(name, val);
399        }
400        for _ in 0..num_vectors {
401            let here = cursor.position();
402            let (name, dtype, _dims, num_elements) = parse_vector_header(cursor, size)?;
403            if Self::is_metadata_field(&name) {
404                cursor.set_position(here);
405                let (_, val) = parse_vector(cursor, size)?;
406                fields.insert(name.to_string(), val);
407            } else {
408                let vec_data_size = dtype.size() as u64 * num_elements as u64;
409                let here = cursor.position();
410                cursor.set_position(here + vec_data_size);
411            }
412        }
413
414        if cursor.position() - bytes_already_read != size as u64 {
415            return Err(DmapError::InvalidRecord(format!(
416                "Bytes read {} does not match the records size field {}",
417                cursor.position() - bytes_already_read,
418                size
419            )));
420        }
421
422        Ok(fields)
423    }
424
425    /// Reads a record from `cursor`.
426    fn parse_record(cursor: &mut Cursor<Vec<u8>>) -> Result<Self, DmapError>
427    where
428        Self: Sized,
429    {
430        let bytes_already_read = cursor.position();
431        let _code = read_data::<i32>(cursor).map_err(|e| {
432            DmapError::InvalidRecord(format!(
433                "Cannot interpret code at byte {}: {e}",
434                bytes_already_read
435            ))
436        })?;
437        let size = read_data::<i32>(cursor).map_err(|e| {
438            DmapError::InvalidRecord(format!(
439                "Cannot interpret size at byte {}: {e}",
440                bytes_already_read + i32::size() as u64
441            ))
442        })?;
443
444        // adding 8 bytes because code and size are part of the record.
445        if size as u64 > cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
446        {
447            return Err(DmapError::InvalidRecord(format!(
448                "Record size {size} at byte {} bigger than remaining buffer {}",
449                cursor.position() - i32::size() as u64,
450                cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
451            )));
452        } else if size <= 0 {
453            return Err(DmapError::InvalidRecord(format!("Record size {size} <= 0")));
454        }
455
456        let num_scalars = read_data::<i32>(cursor).map_err(|e| {
457            DmapError::InvalidRecord(format!(
458                "Cannot interpret number of scalars at byte {}: {e}",
459                cursor.position() - i32::size() as u64
460            ))
461        })?;
462        let num_vectors = read_data::<i32>(cursor).map_err(|e| {
463            DmapError::InvalidRecord(format!(
464                "Cannot interpret number of vectors at byte {}: {e}",
465                cursor.position() - i32::size() as u64
466            ))
467        })?;
468        if num_scalars <= 0 {
469            return Err(DmapError::InvalidRecord(format!(
470                "Number of scalars {num_scalars} at byte {} <= 0",
471                cursor.position() - 2 * i32::size() as u64
472            )));
473        } else if num_vectors <= 0 {
474            return Err(DmapError::InvalidRecord(format!(
475                "Number of vectors {num_vectors} at byte {} <= 0",
476                cursor.position() - i32::size() as u64
477            )));
478        } else if num_scalars + num_vectors > size {
479            return Err(DmapError::InvalidRecord(format!(
480                "Number of scalars {num_scalars} plus vectors {num_vectors} greater than size '{size}'")));
481        }
482
483        let mut fields: IndexMap<String, DmapField> = IndexMap::new();
484        for _ in 0..num_scalars {
485            let (name, val) = parse_scalar(cursor)?;
486            fields.insert(name, val);
487        }
488        for _ in 0..num_vectors {
489            let (name, val) = parse_vector(cursor, size)?;
490            fields.insert(name, val);
491        }
492
493        if cursor.position() - bytes_already_read != size as u64 {
494            return Err(DmapError::InvalidRecord(format!(
495                "Bytes read {} does not match the records size field {}",
496                cursor.position() - bytes_already_read,
497                size
498            )));
499        }
500
501        Self::new(&mut fields)
502    }
503
504    /// Checks the validity of an `IndexMap` as a representation of a DMAP record.
505    ///
506    /// Validity checks include ensuring that no unfamiliar entries exist, that all required
507    /// scalar and vector fields exist, that all scalar and vector fields are of the expected
508    /// type, and that vector fields which are expected to have the same dimensions do indeed
509    /// have the same dimensions.
510    fn check_fields(
511        field_dict: &mut IndexMap<String, DmapField>,
512        fields_for_type: &Fields,
513    ) -> Result<(), DmapError> {
514        let unsupported_keys: Vec<&String> = field_dict
515            .keys()
516            .filter(|&k| !fields_for_type.all_fields.contains(&&**k))
517            .collect();
518        if !unsupported_keys.is_empty() {
519            Err(DmapError::InvalidRecord(format!(
520                "Unsupported fields {:?}, fields supported are {:?}",
521                unsupported_keys, fields_for_type.all_fields
522            )))?
523        }
524
525        for (field, expected_type) in fields_for_type.scalars_required.iter() {
526            match field_dict.get(&field.to_string()) {
527                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
528                Some(DmapField::Scalar(x)) => Err(DmapError::InvalidRecord(format!(
529                    "Field {} has incorrect type {}, expected {}",
530                    field,
531                    x.get_type(),
532                    expected_type
533                )))?,
534                Some(_) => Err(DmapError::InvalidRecord(format!(
535                    "Field {} is a vector, expected scalar",
536                    field
537                )))?,
538                None => Err(DmapError::InvalidRecord(format!(
539                    "Field {field:?} ({:?}) missing: fields {:?}",
540                    &field.to_string(),
541                    field_dict.keys()
542                )))?,
543            }
544        }
545        for (field, expected_type) in fields_for_type.scalars_optional.iter() {
546            match field_dict.get(&field.to_string()) {
547                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
548                Some(DmapField::Scalar(x)) => Err(DmapError::InvalidRecord(format!(
549                    "Field {} has incorrect type {}, expected {}",
550                    field,
551                    x.get_type(),
552                    expected_type
553                )))?,
554                Some(_) => Err(DmapError::InvalidRecord(format!(
555                    "Field {} is a vector, expected scalar",
556                    field
557                )))?,
558                None => {}
559            }
560        }
561        for (field, expected_type) in fields_for_type.vectors_required.iter() {
562            match field_dict.get(&field.to_string()) {
563                Some(DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
564                    "Field {} is a scalar, expected vector",
565                    field
566                )))?,
567                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
568                    Err(DmapError::InvalidRecord(format!(
569                        "Field {field} has incorrect type {:?}, expected {expected_type:?}",
570                        x.get_type()
571                    )))?
572                }
573                Some(&DmapField::Vector(_)) => {}
574                None => Err(DmapError::InvalidRecord(format!("Field {field} missing")))?,
575            }
576        }
577        for (field, expected_type) in fields_for_type.vectors_optional.iter() {
578            match field_dict.get(&field.to_string()) {
579                Some(&DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
580                    "Field {} is a scalar, expected vector",
581                    field
582                )))?,
583                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
584                    Err(DmapError::InvalidRecord(format!(
585                        "Field {field} has incorrect type {}, expected {expected_type}",
586                        x.get_type()
587                    )))?
588                }
589                _ => {}
590            }
591        }
592        // This block checks that grouped vector fields have the same dimensionality
593        for vec_group in fields_for_type.vector_dim_groups.iter() {
594            let vecs: Vec<(&str, &DmapVec)> = vec_group
595                .iter()
596                .filter_map(|&name| match field_dict.get(&name.to_string()) {
597                    Some(DmapField::Vector(ref x)) => Some((name, x)),
598                    Some(_) => None,
599                    None => None,
600                })
601                .collect();
602            if vecs.len() > 1 {
603                let mut vec_iter = vecs.iter();
604                let first = vec_iter.next().expect("Iterator broken");
605                if !vec_iter.all(|(_, v)| v.shape() == first.1.shape()) {
606                    let error_vec: Vec<(&str, &[usize])> =
607                        vecs.iter().map(|(k, v)| (*k, v.shape())).collect();
608                    Err(DmapError::InvalidRecord(format!(
609                        "Vector fields have inconsistent dimensions: {:?}",
610                        error_vec
611                    )))?
612                }
613            }
614        }
615        Ok(())
616    }
617
618    /// Attempts to massage the entries of an `IndexMap` into the proper types for a DMAP record.
619    fn coerce(
620        fields_dict: &mut IndexMap<String, DmapField>,
621        fields_for_type: &Fields,
622    ) -> Result<Self, DmapError> {
623        let unsupported_keys: Vec<&String> = fields_dict
624            .keys()
625            .filter(|&k| !fields_for_type.all_fields.contains(&&**k))
626            .collect();
627        if !unsupported_keys.is_empty() {
628            Err(DmapError::InvalidRecord(format!(
629                "Unsupported fields {:?}, fields supported are {:?}",
630                unsupported_keys, fields_for_type.all_fields
631            )))?
632        }
633
634        for (field, expected_type) in fields_for_type.scalars_required.iter() {
635            match fields_dict.get(&field.to_string()) {
636                Some(DmapField::Scalar(x)) if &x.get_type() != expected_type => {
637                    fields_dict.insert(
638                        field.to_string(),
639                        DmapField::Scalar(x.cast_as(expected_type)?),
640                    );
641                }
642                Some(DmapField::Scalar(_)) => {}
643                Some(_) => Err(DmapError::InvalidRecord(format!(
644                    "Field {} is a vector, expected scalar",
645                    field
646                )))?,
647                None => Err(DmapError::InvalidRecord(format!(
648                    "Field {field:?} ({:?}) missing: fields {:?}",
649                    &field.to_string(),
650                    fields_dict.keys()
651                )))?,
652            }
653        }
654        for (field, expected_type) in fields_for_type.scalars_optional.iter() {
655            match fields_dict.get(&field.to_string()) {
656                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
657                Some(DmapField::Scalar(x)) => {
658                    fields_dict.insert(
659                        field.to_string(),
660                        DmapField::Scalar(x.cast_as(expected_type)?),
661                    );
662                }
663                Some(_) => Err(DmapError::InvalidRecord(format!(
664                    "Field {} is a vector, expected scalar",
665                    field
666                )))?,
667                None => {}
668            }
669        }
670        for (field, expected_type) in fields_for_type.vectors_required.iter() {
671            match fields_dict.get(&field.to_string()) {
672                Some(DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
673                    "Field {} is a scalar, expected vector",
674                    field
675                )))?,
676                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
677                    Err(DmapError::InvalidRecord(format!(
678                        "Field {field} has incorrect type {:?}, expected {expected_type:?}",
679                        x.get_type()
680                    )))?
681                }
682                Some(DmapField::Vector(_)) => {}
683                None => Err(DmapError::InvalidRecord(format!("Field {field} missing")))?,
684            }
685        }
686        for (field, expected_type) in fields_for_type.vectors_optional.iter() {
687            match fields_dict.get(&field.to_string()) {
688                Some(&DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
689                    "Field {} is a scalar, expected vector",
690                    field
691                )))?,
692                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
693                    Err(DmapError::InvalidRecord(format!(
694                        "Field {field} has incorrect type {}, expected {expected_type}",
695                        x.get_type()
696                    )))?
697                }
698                _ => {}
699            }
700        }
701
702        Self::new(fields_dict)
703    }
704
705    /// Attempts to copy `self` to a raw byte representation.
706    fn to_bytes(&self) -> Result<Vec<u8>, DmapError>;
707
708    /// Converts the entries of an `IndexMap` into a raw byte representation, including metadata
709    /// about the entries `(DMAP key, name\[, dimensions\])`.
710    ///
711    /// If all is good, returns a tuple containing:
712    /// * the number of scalar fields
713    /// * the number of vector fields
714    /// * the raw bytes
715    fn data_to_bytes(
716        data: &IndexMap<String, DmapField>,
717        fields_for_type: &Fields,
718    ) -> Result<(i32, i32, Vec<u8>), DmapError> {
719        let mut data_bytes: Vec<u8> = vec![];
720        let mut num_scalars: i32 = 0;
721        let mut num_vectors: i32 = 0;
722
723        // let scalar_fields = data.keys().filter(|k| )
724        for (field, _) in fields_for_type.scalars_required.iter() {
725            match data.get(&field.to_string()) {
726                Some(x @ DmapField::Scalar(_)) => {
727                    data_bytes.extend(field.as_bytes());
728                    data_bytes.extend([0]); // null-terminate string
729                    data_bytes.append(&mut x.as_bytes());
730                    num_scalars += 1;
731                }
732                Some(_) => Err(DmapError::InvalidScalar(format!(
733                    "Field {field} is a vector, expected scalar"
734                )))?,
735                None => Err(DmapError::InvalidRecord(format!(
736                    "Field {field} missing from record"
737                )))?,
738            }
739        }
740        for (field, _) in fields_for_type.scalars_optional.iter() {
741            if let Some(x) = data.get(&field.to_string()) {
742                match x {
743                    DmapField::Scalar(_) => {
744                        data_bytes.extend(field.as_bytes());
745                        data_bytes.extend([0]); // null-terminate string
746                        data_bytes.append(&mut x.as_bytes());
747                        num_scalars += 1;
748                    }
749                    DmapField::Vector(_) => Err(DmapError::InvalidScalar(format!(
750                        "Field {field} is a vector, expected scalar"
751                    )))?,
752                }
753            }
754        }
755        for (field, _) in fields_for_type.vectors_required.iter() {
756            match data.get(&field.to_string()) {
757                Some(x @ DmapField::Vector(_)) => {
758                    data_bytes.extend(field.as_bytes());
759                    data_bytes.extend([0]); // null-terminate string
760                    data_bytes.append(&mut x.as_bytes());
761                    num_vectors += 1;
762                }
763                Some(_) => Err(DmapError::InvalidVector(format!(
764                    "Field {field} is a scalar, expected vector"
765                )))?,
766                None => Err(DmapError::InvalidRecord(format!(
767                    "Field {field} missing from record"
768                )))?,
769            }
770        }
771        for (field, _) in fields_for_type.vectors_optional.iter() {
772            if let Some(x) = data.get(&field.to_string()) {
773                match x {
774                    DmapField::Vector(_) => {
775                        data_bytes.extend(field.as_bytes());
776                        data_bytes.extend([0]); // null-terminate string
777                        data_bytes.append(&mut x.as_bytes());
778                        num_vectors += 1;
779                    }
780                    DmapField::Scalar(_) => Err(DmapError::InvalidVector(format!(
781                        "Field {field} is a scalar, expected vector"
782                    )))?,
783                }
784            }
785        }
786
787        Ok((num_scalars, num_vectors, data_bytes))
788    }
789
790    /// Converts the entries of a `Record` into a raw byte representation, for debugging the conversion.
791    ///
792    /// If all is good, returns a vector containing tuples of:
793    /// * `String`: the name of the field (`"header"` denoting the record header)
794    /// * `usize`: where the serialized bytes of the field start in the record byte representation
795    /// * `Vec<u8>` the byte representation of the field.
796    fn inspect_bytes(
797        &self,
798        fields_for_type: &Fields,
799    ) -> Result<Vec<(String, usize, Vec<u8>)>, DmapError> {
800        let mut data_bytes: Vec<Vec<u8>> = vec![];
801        let mut indices: Vec<usize> = vec![16]; // start at 16 to account for header
802        let mut fields: Vec<String> = vec![];
803
804        let (mut num_scalars, mut num_vectors) = (0, 0);
805
806        for (field, _) in fields_for_type.scalars_required.iter() {
807            fields.push(field.to_string());
808            match self.get(field) {
809                Some(x @ DmapField::Scalar(_)) => {
810                    let mut bytes = vec![];
811                    bytes.extend(field.as_bytes());
812                    bytes.extend([0]); // null-terminate string
813                    bytes.append(&mut x.as_bytes());
814                    indices.push(indices[indices.len() - 1] + bytes.len());
815                    data_bytes.push(bytes);
816                    num_scalars += 1;
817                }
818                Some(_) => Err(DmapError::InvalidScalar(format!(
819                    "Field {field} is a vector, expected scalar"
820                )))?,
821                None => Err(DmapError::InvalidRecord(format!(
822                    "Field {field} missing from record"
823                )))?,
824            }
825        }
826        for (field, _) in fields_for_type.scalars_optional.iter() {
827            fields.push(field.to_string());
828            if let Some(x) = self.get(field) {
829                match x {
830                    DmapField::Scalar(_) => {
831                        let mut bytes = vec![];
832                        bytes.extend(field.as_bytes());
833                        bytes.extend([0]); // null-terminate string
834                        bytes.append(&mut x.as_bytes());
835                        indices.push(indices[indices.len() - 1] + bytes.len());
836                        data_bytes.push(bytes);
837                        num_scalars += 1;
838                    }
839                    DmapField::Vector(_) => Err(DmapError::InvalidScalar(format!(
840                        "Field {field} is a vector, expected scalar"
841                    )))?,
842                }
843            }
844        }
845        for (field, _) in fields_for_type.vectors_required.iter() {
846            fields.push(field.to_string());
847            match self.get(field) {
848                Some(x @ DmapField::Vector(_)) => {
849                    let mut bytes = vec![];
850                    bytes.extend(field.as_bytes());
851                    bytes.extend([0]); // null-terminate string
852                    bytes.append(&mut x.as_bytes());
853                    indices.push(indices[indices.len() - 1] + bytes.len());
854                    data_bytes.push(bytes);
855                    num_vectors += 1;
856                }
857                Some(_) => Err(DmapError::InvalidVector(format!(
858                    "Field {field} is a scalar, expected vector"
859                )))?,
860                None => Err(DmapError::InvalidRecord(format!(
861                    "Field {field} missing from record"
862                )))?,
863            }
864        }
865        for (field, _) in fields_for_type.vectors_optional.iter() {
866            fields.push(field.to_string());
867            if let Some(x) = self.get(field) {
868                match x {
869                    DmapField::Vector(_) => {
870                        let mut bytes = vec![];
871                        bytes.extend(field.as_bytes());
872                        bytes.extend([0]); // null-terminate string
873                        bytes.append(&mut x.as_bytes());
874                        indices.push(indices[indices.len() - 1] + data_bytes.len());
875                        data_bytes.push(bytes);
876                        num_vectors += 1;
877                    }
878                    DmapField::Scalar(_) => Err(DmapError::InvalidVector(format!(
879                        "Field {field} is a scalar, expected vector"
880                    )))?,
881                }
882            }
883        }
884
885        // Now build up the header
886        let num_bytes: usize = data_bytes.iter().map(|x| x.len()).sum();
887        let mut bytes: Vec<u8> = vec![];
888        bytes.extend((65537_i32).as_bytes()); // No idea why this is what it is, copied from backscatter
889        bytes.extend((num_bytes as i32 + 16).as_bytes()); // +16 for code, length, num_scalars, num_vectors
890        bytes.extend(num_scalars.as_bytes());
891        bytes.extend(num_vectors.as_bytes());
892
893        // Accumulate all the results into one big `Vec`
894        let mut field_info: Vec<(String, usize, Vec<u8>)> = vec![("header".to_string(), 0, bytes)];
895        for (f, (s, b)) in izip!(
896            fields.into_iter(),
897            izip!(indices[..indices.len() - 1].iter(), data_bytes.into_iter())
898        ) {
899            field_info.push((f, *s, b));
900        }
901
902        Ok(field_info)
903    }
904
905    /// Creates the byte represenation of a collection of [`Record`]s.
906    ///
907    /// Ordering of the members is preserved.
908    fn into_bytes(recs: &Vec<Self>) -> Result<Vec<u8>, DmapError> {
909        let mut bytes: Vec<u8> = vec![];
910        let (errors, rec_bytes): (Vec<_>, Vec<_>) =
911            recs.par_iter()
912                .enumerate()
913                .partition_map(|(i, rec)| match rec.to_bytes() {
914                    Err(e) => Either::Left((i, e)),
915                    Ok(y) => Either::Right(y),
916                });
917        if !errors.is_empty() {
918            Err(DmapError::InvalidRecord(format!(
919                "Corrupted records: {errors:?}"
920            )))?
921        }
922        bytes.par_extend(rec_bytes.into_par_iter().flatten());
923        Ok(bytes)
924    }
925
926    /// Attempts to convert `recs` to `Self` then convert to bytes.
927    fn try_into_bytes(recs: Vec<IndexMap<String, DmapField>>) -> Result<Vec<u8>, DmapError> {
928        let mut bytes: Vec<u8> = vec![];
929        let (errors, rec_bytes): (Vec<_>, Vec<_>) =
930            recs.into_par_iter()
931                .enumerate()
932                .partition_map(|(i, rec)| match Self::try_from(rec) {
933                    Err(e) => Either::Left((i, e)),
934                    Ok(x) => match x.to_bytes() {
935                        Err(e) => Either::Left((i, e)),
936                        Ok(y) => Either::Right(y),
937                    },
938                });
939        if !errors.is_empty() {
940            Err(DmapError::BadRecords(
941                errors.iter().map(|(i, _)| *i).collect(),
942                errors[0].1.to_string(),
943            ))?
944        }
945        bytes.par_extend(rec_bytes.into_par_iter().flatten());
946        Ok(bytes)
947    }
948
949    /// Writes a collection of `Record`s to `outfile`.
950    ///
951    /// Prefer using the specific functions, e.g. `write_dmap`, `write_rawacf`, etc. for their
952    /// specific field checks.
953    fn write_to_file<P: AsRef<Path>>(recs: &Vec<Self>, outfile: P) -> Result<(), DmapError> {
954        let bytes: Vec<u8> = Self::into_bytes(recs)?;
955        io::bytes_to_file(bytes, outfile)?;
956        Ok(())
957    }
958}
959
960macro_rules! create_record_type {
961    ($format:ident, $fields:ident) => {
962        paste::paste! {
963            use crate::types::{DmapType, DmapField};
964            use crate::error::DmapError;
965            use indexmap::IndexMap;
966            use crate::record::Record;
967
968            #[doc = "Struct containing the checked fields of a single `" $format:upper "` record." ]
969            #[derive(Debug, PartialEq, Clone)]
970            pub struct [< $format:camel Record >] {
971                pub data: IndexMap<String, DmapField>,
972            }
973
974            impl Record<'_> for [< $format:camel Record>] {
975                fn inner(self) -> IndexMap<String, DmapField> {
976                    self.data
977                }
978                fn get(&self, key: &str) -> Option<&DmapField> {
979                    self.data.get(key)
980                }
981                fn keys(&self) -> Vec<&String> {
982                    self.data.keys().collect()
983                }
984                fn new(fields: &mut IndexMap<String, DmapField>) -> Result<[< $format:camel Record>], DmapError> {
985                    match Self::check_fields(fields, &$fields) {
986                        Ok(_) => {}
987                        Err(e) => Err(e)?,
988                    }
989
990                    Ok([< $format:camel Record >] {
991                        data: fields.to_owned(),
992                    })
993                }
994                fn to_bytes(&self) -> Result<Vec<u8>, DmapError> {
995                    let (num_scalars, num_vectors, mut data_bytes) =
996                        Self::data_to_bytes(&self.data, &$fields)?;
997
998                    let mut bytes: Vec<u8> = vec![];
999                    bytes.extend((65537_i32).as_bytes()); // No idea why this is what it is, copied from backscatter
1000                    bytes.extend((data_bytes.len() as i32 + 16).as_bytes()); // +16 for code, length, num_scalars, num_vectors
1001                    bytes.extend(num_scalars.as_bytes());
1002                    bytes.extend(num_vectors.as_bytes());
1003                    bytes.append(&mut data_bytes); // consumes data_bytes
1004                    Ok(bytes)
1005                }
1006                fn is_metadata_field(name: &str) -> bool {
1007                    !$fields.data_fields.iter().any(|e| e == &name)
1008                }
1009            }
1010
1011            impl TryFrom<&mut IndexMap<String, DmapField>> for [< $format:camel Record >] {
1012                type Error = DmapError;
1013
1014                fn try_from(value: &mut IndexMap<String, DmapField>) -> Result<Self, Self::Error> {
1015                    Self::coerce(value, &$fields)
1016                }
1017            }
1018
1019            impl TryFrom<IndexMap<String, DmapField>> for [< $format:camel Record >] {
1020                type Error = DmapError;
1021
1022                fn try_from(mut value: IndexMap<String, DmapField>) -> Result<Self, Self::Error> {
1023                    Self::coerce(&mut value, &$fields)
1024                }
1025            }
1026
1027            #[cfg(test)]
1028            mod tests {
1029                use super::*;
1030                use std::path::PathBuf;
1031
1032                /// Creates a test to ensure that the record is still able to be read, even when missing
1033                /// some of the optional fields.
1034                #[test]
1035                fn test_missing_optional_fields() -> Result<(), DmapError> {
1036                    let filename: PathBuf = PathBuf::from(format!("tests/test_files/test.{}", stringify!($format)));
1037                    let data = [< $format:camel Record >]::sniff_file(&filename).expect("Unable to sniff file");
1038                    let recs = data.inner();
1039
1040                    for field in $fields.scalars_optional.iter().chain($fields.vectors_optional.iter()) {
1041                        let mut cloned_rec = recs.clone();
1042                        let _ = cloned_rec.shift_remove(field.0);
1043                        let _ = [< $format:camel Record >]::try_from(&mut cloned_rec)?;
1044                    }
1045                    Ok(())
1046                }
1047
1048                /// Creates a test to ensure that the record is not able to be read when missing
1049                /// some of the required fields.
1050                #[test]
1051                fn test_missing_required_fields() -> Result<(), DmapError> {
1052                    let filename: PathBuf = PathBuf::from(format!("tests/test_files/test.{}", stringify!($format)));
1053                    let data = [< $format:camel Record >]::sniff_file(&filename).expect("Unable to sniff file");
1054                    let recs = data.inner();
1055
1056                    for field in $fields.scalars_required.iter().chain($fields.vectors_required.iter()) {
1057                        let mut cloned_rec = recs.clone();
1058                        let _ = cloned_rec.shift_remove(field.0);
1059                        let res = [< $format:camel Record >]::try_from(&mut cloned_rec);
1060                        assert!(res.is_err());
1061                    }
1062                    Ok(())
1063                }
1064            }
1065        }
1066    }
1067}
1068
1069pub(crate) use create_record_type;