use crate::compression::detect_bz2;
use crate::error::DmapError;
use crate::io;
use crate::types::{parse_scalar, parse_vector, read_data, DmapField, DmapType, DmapVec, Fields};
use bzip2::read::BzDecoder;
use indexmap::IndexMap;
use itertools::izip;
use rayon::iter::Either;
use rayon::prelude::*;
use std::fmt::Debug;
use std::fs::File;
use std::io::{Cursor, Read};
use std::path::Path;

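/// Trait describing a single DMAP record: an ordered collection of named scalar and vector
/// fields that can be parsed from, and serialized to, the DMAP byte format.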
pub trait Record<'a>:
    Debug + Send + Sync + TryFrom<IndexMap<String, DmapField>, Error = DmapError>
{
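    /// Creates a new record from `fields`, validating them against the fields expected for the
    /// record type.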
    fn new(fields: &mut IndexMap<String, DmapField>) -> Result<Self, DmapError>
    where
        Self: Sized;

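    /// Consumes the record, returning the underlying map of fields.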
    fn inner(self) -> IndexMap<String, DmapField>;

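    /// Returns the field named `key`, if present.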
    fn get(&self, key: &str) -> Option<&DmapField>;

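    /// Returns the names of all fields in the record.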
    fn keys(&self) -> Vec<&String>;

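    /// Reads the first record from `dmap_data`, which may be either raw DMAP or
    /// bzip2-compressed DMAP.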
    fn read_first_record(mut dmap_data: impl Read) -> Result<Self, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let mut stream: Box<dyn Read>;
        let (is_bz2, chunk) = detect_bz2(&mut dmap_data)?;
        if is_bz2 {
            stream = Box::new(BzDecoder::new(chunk));
        } else {
            stream = Box::new(chunk);
        }

        // The first 8 bytes of a record hold the DMAP code and the record size
        let mut buffer = [0; 8];
        stream
            .read_exact(&mut buffer)
            .map_err(|_| DmapError::CorruptStream("Unable to read size of first record"))?;

        let rec_size = i32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize;
        if rec_size == 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Record 0 starting at byte 0 has non-positive size {} <= 0",
                rec_size
            )));
        }

        // Read the remainder of the record, then parse it in full
        let mut rec = vec![0; rec_size];
        rec[0..8].clone_from_slice(&buffer[..]);
        stream.read_exact(&mut rec[8..])?;
        let first_rec = Self::parse_record(&mut Cursor::new(rec))?;

        Ok(first_rec)
    }

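    /// Reads every record from `dmap_data` (raw or bzip2-compressed), failing if any record
    /// is malformed.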
    fn read_records(mut dmap_data: impl Read) -> Result<Vec<Self>, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let mut buffer: Vec<u8> = vec![];
        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
        if is_bz2 {
            let mut stream = BzDecoder::new(chunk);
            stream.read_to_end(&mut buffer)?;
        } else {
            chunk.read_to_end(&mut buffer)?;
        }

        // Split the buffer into one slice per record, using the size field of each record header
        let mut slices: Vec<_> = vec![];
        let mut rec_start: usize = 0;
        let mut rec_size: usize;
        let mut rec_end: usize;

        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
                as usize;
            rec_end = rec_start + rec_size;
            if rec_end > buffer.len() {
                return Err(DmapError::InvalidRecord(format!(
                    "Record {} starting at byte {} has size greater than remaining length of buffer ({} > {})",
                    slices.len(),
                    rec_start,
                    rec_size,
                    buffer.len() - rec_start
                )));
            } else if rec_size == 0 {
                return Err(DmapError::InvalidRecord(format!(
                    "Record {} starting at byte {} has non-positive size {} <= 0",
                    slices.len(),
                    rec_start,
                    rec_size
                )));
            }
            slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
            rec_start = rec_end;
        }
        if rec_start != buffer.len() {
            return Err(DmapError::InvalidRecord(format!(
                "Record {} starting at byte {} incomplete; has size of {} bytes",
                slices.len() + 1,
                rec_start,
                buffer.len() - rec_start
            )));
        }

        // Parse the records in parallel
        let mut dmap_results: Vec<Result<Self, DmapError>> = vec![];
        dmap_results.par_extend(
            slices
                .par_iter_mut()
                .map(|cursor| Self::parse_record(cursor)),
        );

        let mut dmap_records: Vec<Self> = vec![];
        let mut bad_recs: Vec<usize> = vec![];
        let mut dmap_errors: Vec<DmapError> = vec![];
        for (i, rec) in dmap_results.into_iter().enumerate() {
            match rec {
                Ok(x) => dmap_records.push(x),
                Err(e) => {
                    dmap_errors.push(e);
                    bad_recs.push(i);
                }
            }
        }
        if !dmap_errors.is_empty() {
            return Err(DmapError::BadRecords(bad_recs, dmap_errors[0].to_string()));
        }
        Ok(dmap_records)
    }

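    /// Reads records from `dmap_data` (raw or bzip2-compressed) until a malformed record is
    /// encountered, returning the good records along with the byte offset of the first
    /// corruption, if any.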
    fn read_records_lax(mut dmap_data: impl Read) -> Result<(Vec<Self>, Option<usize>), DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let mut buffer: Vec<u8> = vec![];
        let (is_bz2, mut chunk) = detect_bz2(&mut dmap_data)?;
        if is_bz2 {
            let mut stream = BzDecoder::new(chunk);
            stream.read_to_end(&mut buffer)?;
        } else {
            chunk.read_to_end(&mut buffer)?;
        }

        let mut dmap_records: Vec<Self> = vec![];
        let mut bad_byte: Option<usize> = None;

        let mut slices: Vec<_> = vec![];
        let mut rec_start: usize = 0;
        let mut rec_size: usize;
        let mut rec_end: usize;

        // Split the buffer into one slice per record, stopping at the first sign of corruption
        let mut rec_starts = vec![];
        while ((rec_start + 2 * i32::size()) as u64) < buffer.len() as u64 {
            rec_size = i32::from_le_bytes(buffer[rec_start + 4..rec_start + 8].try_into().unwrap())
                as usize;
            rec_end = rec_start + rec_size;
            if rec_end > buffer.len() || rec_size == 0 {
                bad_byte = Some(rec_start);
                break;
            } else {
                rec_starts.push(rec_start);
                slices.push(Cursor::new(buffer[rec_start..rec_end].to_vec()));
                rec_start = rec_end;
            }
        }
        if rec_start != buffer.len() {
            bad_byte = Some(rec_start);
        }

        // Parse the records in parallel
        let mut dmap_results: Vec<Result<Self, DmapError>> = vec![];
        dmap_results.par_extend(
            slices
                .par_iter_mut()
                .map(|cursor| Self::parse_record(cursor)),
        );

        // Keep records up to (but not including) the first one that fails to parse
        for (i, rec) in dmap_results.into_iter().enumerate() {
            if let Ok(x) = rec {
                dmap_records.push(x);
            } else {
                bad_byte = Some(rec_starts[i]);
                break;
            }
        }
        Ok((dmap_records, bad_byte))
    }

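    /// Reads every record from the DMAP file at `infile`.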
    fn read_file<P: AsRef<Path>>(infile: P) -> Result<Vec<Self>, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let file = File::open(infile)?;
        Self::read_records(file)
    }

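    /// Reads records from the DMAP file at `infile` until a malformed record is encountered,
    /// returning the good records and the byte offset of the first corruption, if any.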
    fn read_file_lax<P: AsRef<Path>>(infile: P) -> Result<(Vec<Self>, Option<usize>), DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let file = File::open(infile)?;
        Self::read_records_lax(file)
    }

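    /// Reads only the first record of the DMAP file at `infile`.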
    fn sniff_file<P: AsRef<Path>>(infile: P) -> Result<Self, DmapError>
    where
        Self: Sized,
        Self: Send,
    {
        let file = File::open(infile)?;
        Self::read_first_record(file)
    }

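    /// Parses a single record from `cursor`, verifying the header fields and the number of
    /// bytes consumed.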
    fn parse_record(cursor: &mut Cursor<Vec<u8>>) -> Result<Self, DmapError>
    where
        Self: Sized,
    {
        let bytes_already_read = cursor.position();
        let _code = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret code at byte {}: {e}",
                bytes_already_read
            ))
        })?;
        let size = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret size at byte {}: {e}",
                bytes_already_read + i32::size() as u64
            ))
        })?;

        if size as u64 > cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
        {
            return Err(DmapError::InvalidRecord(format!(
                "Record size {size} at byte {} bigger than remaining buffer {}",
                cursor.position() - i32::size() as u64,
                cursor.get_ref().len() as u64 - cursor.position() + 2 * i32::size() as u64
            )));
        } else if size <= 0 {
            return Err(DmapError::InvalidRecord(format!("Record size {size} <= 0")));
        }

        let num_scalars = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret number of scalars at byte {}: {e}",
                cursor.position() - i32::size() as u64
            ))
        })?;
        let num_vectors = read_data::<i32>(cursor).map_err(|e| {
            DmapError::InvalidRecord(format!(
                "Cannot interpret number of vectors at byte {}: {e}",
                cursor.position() - i32::size() as u64
            ))
        })?;
        if num_scalars <= 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Number of scalars {num_scalars} at byte {} <= 0",
                cursor.position() - 2 * i32::size() as u64
            )));
        } else if num_vectors <= 0 {
            return Err(DmapError::InvalidRecord(format!(
                "Number of vectors {num_vectors} at byte {} <= 0",
                cursor.position() - i32::size() as u64
            )));
        } else if num_scalars + num_vectors > size {
            return Err(DmapError::InvalidRecord(format!(
                "Number of scalars {num_scalars} plus vectors {num_vectors} greater than size '{size}'")));
        }

        // Parse the scalar fields, then the vector fields
        let mut fields: IndexMap<String, DmapField> = IndexMap::new();
        for _ in 0..num_scalars {
            let (name, val) = parse_scalar(cursor)?;
            fields.insert(name, val);
        }
        for _ in 0..num_vectors {
            let (name, val) = parse_vector(cursor, size)?;
            fields.insert(name, val);
        }

        if cursor.position() - bytes_already_read != size as u64 {
            return Err(DmapError::InvalidRecord(format!(
                "Bytes read {} does not match the record's size field {}",
                cursor.position() - bytes_already_read,
                size
            )));
        }

        Self::new(&mut fields)
    }

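    /// Checks that `field_dict` contains all required fields of `fields_for_type`, that no
    /// unsupported fields are present, that each field has the expected type, and that grouped
    /// vector fields have consistent dimensions.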
    fn check_fields(
        field_dict: &mut IndexMap<String, DmapField>,
        fields_for_type: &Fields,
    ) -> Result<(), DmapError> {
        // Reject any fields that are not defined for this record type
        let unsupported_keys: Vec<&String> = field_dict
            .keys()
            .filter(|&k| !fields_for_type.all_fields.contains(&&**k))
            .collect();
        if !unsupported_keys.is_empty() {
            Err(DmapError::InvalidRecord(format!(
                "Unsupported fields {:?}, fields supported are {:?}",
                unsupported_keys, fields_for_type.all_fields
            )))?
        }

        for (field, expected_type) in fields_for_type.scalars_required.iter() {
            match field_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
                Some(DmapField::Scalar(x)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} has incorrect type {}, expected {}",
                    field,
                    x.get_type(),
                    expected_type
                )))?,
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field:?} ({:?}) missing: fields {:?}",
                    &field.to_string(),
                    field_dict.keys()
                )))?,
            }
        }
        for (field, expected_type) in fields_for_type.scalars_optional.iter() {
            match field_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
                Some(DmapField::Scalar(x)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} has incorrect type {}, expected {}",
                    field,
                    x.get_type(),
                    expected_type
                )))?,
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => {}
            }
        }
        for (field, expected_type) in fields_for_type.vectors_required.iter() {
            match field_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {:?}, expected {expected_type:?}",
                        x.get_type()
                    )))?
                }
                Some(&DmapField::Vector(_)) => {}
                None => Err(DmapError::InvalidRecord(format!("Field {field} missing")))?,
            }
        }
        for (field, expected_type) in fields_for_type.vectors_optional.iter() {
            match field_dict.get(&field.to_string()) {
                Some(&DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {}, expected {expected_type}",
                        x.get_type()
                    )))?
                }
                _ => {}
            }
        }
        // Vectors within a dimension group must all have the same shape
        for vec_group in fields_for_type.vector_dim_groups.iter() {
            let vecs: Vec<(&str, &DmapVec)> = vec_group
                .iter()
                .filter_map(|&name| match field_dict.get(&name.to_string()) {
                    Some(DmapField::Vector(ref x)) => Some((name, x)),
                    Some(_) => None,
                    None => None,
                })
                .collect();
            if vecs.len() > 1 {
                let mut vec_iter = vecs.iter();
                let first = vec_iter.next().expect("Iterator broken");
                if !vec_iter.all(|(_, v)| v.shape() == first.1.shape()) {
                    let error_vec: Vec<(&str, &[usize])> =
                        vecs.iter().map(|(k, v)| (*k, v.shape())).collect();
                    Err(DmapError::InvalidRecord(format!(
                        "Vector fields have inconsistent dimensions: {:?}",
                        error_vec
                    )))?
                }
            }
        }
        Ok(())
    }

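    /// Validates `fields_dict` like [`Record::check_fields`], but casts scalar fields to their
    /// expected types where possible, then constructs the record.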
    fn coerce(
        fields_dict: &mut IndexMap<String, DmapField>,
        fields_for_type: &Fields,
    ) -> Result<Self, DmapError> {
        let unsupported_keys: Vec<&String> = fields_dict
            .keys()
            .filter(|&k| !fields_for_type.all_fields.contains(&&**k))
            .collect();
        if !unsupported_keys.is_empty() {
            Err(DmapError::InvalidRecord(format!(
                "Unsupported fields {:?}, fields supported are {:?}",
                unsupported_keys, fields_for_type.all_fields
            )))?
        }

        for (field, expected_type) in fields_for_type.scalars_required.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() != expected_type => {
                    fields_dict.insert(
                        field.to_string(),
                        DmapField::Scalar(x.cast_as(expected_type)?),
                    );
                }
                Some(DmapField::Scalar(_)) => {}
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field:?} ({:?}) missing: fields {:?}",
                    &field.to_string(),
                    fields_dict.keys()
                )))?,
            }
        }
        for (field, expected_type) in fields_for_type.scalars_optional.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(x)) if &x.get_type() == expected_type => {}
                Some(DmapField::Scalar(x)) => {
                    fields_dict.insert(
                        field.to_string(),
                        DmapField::Scalar(x.cast_as(expected_type)?),
                    );
                }
                Some(_) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a vector, expected scalar",
                    field
                )))?,
                None => {}
            }
        }
        for (field, expected_type) in fields_for_type.vectors_required.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {:?}, expected {expected_type:?}",
                        x.get_type()
                    )))?
                }
                Some(DmapField::Vector(_)) => {}
                None => Err(DmapError::InvalidRecord(format!("Field {field} missing")))?,
            }
        }
        for (field, expected_type) in fields_for_type.vectors_optional.iter() {
            match fields_dict.get(&field.to_string()) {
                Some(&DmapField::Scalar(_)) => Err(DmapError::InvalidRecord(format!(
                    "Field {} is a scalar, expected vector",
                    field
                )))?,
                Some(DmapField::Vector(x)) if &x.get_type() != expected_type => {
                    Err(DmapError::InvalidRecord(format!(
                        "Field {field} has incorrect type {}, expected {expected_type}",
                        x.get_type()
                    )))?
                }
                _ => {}
            }
        }

        Self::new(fields_dict)
    }

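    /// Serializes the record into DMAP bytes.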
    fn to_bytes(&self) -> Result<Vec<u8>, DmapError>;

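    /// Serializes `data` into DMAP bytes, returning the number of scalar fields, the number of
    /// vector fields, and the field bytes (excluding the 16-byte record header).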
    fn data_to_bytes(
        data: &IndexMap<String, DmapField>,
        fields_for_type: &Fields,
    ) -> Result<(i32, i32, Vec<u8>), DmapError> {
        let mut data_bytes: Vec<u8> = vec![];
        let mut num_scalars: i32 = 0;
        let mut num_vectors: i32 = 0;

        for (field, _) in fields_for_type.scalars_required.iter() {
            match data.get(&field.to_string()) {
                Some(x @ DmapField::Scalar(_)) => {
                    data_bytes.extend(field.as_bytes());
                    data_bytes.extend([0]); // null-terminate the field name
                    data_bytes.append(&mut x.as_bytes());
                    num_scalars += 1;
                }
                Some(_) => Err(DmapError::InvalidScalar(format!(
                    "Field {field} is a vector, expected scalar"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.scalars_optional.iter() {
            if let Some(x) = data.get(&field.to_string()) {
                match x {
                    DmapField::Scalar(_) => {
                        data_bytes.extend(field.as_bytes());
                        data_bytes.extend([0]);
                        data_bytes.append(&mut x.as_bytes());
                        num_scalars += 1;
                    }
                    DmapField::Vector(_) => Err(DmapError::InvalidScalar(format!(
                        "Field {field} is a vector, expected scalar"
                    )))?,
                }
            }
        }
        for (field, _) in fields_for_type.vectors_required.iter() {
            match data.get(&field.to_string()) {
                Some(x @ DmapField::Vector(_)) => {
                    data_bytes.extend(field.as_bytes());
                    data_bytes.extend([0]);
                    data_bytes.append(&mut x.as_bytes());
                    num_vectors += 1;
                }
                Some(_) => Err(DmapError::InvalidVector(format!(
                    "Field {field} is a scalar, expected vector"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.vectors_optional.iter() {
            if let Some(x) = data.get(&field.to_string()) {
                match x {
                    DmapField::Vector(_) => {
                        data_bytes.extend(field.as_bytes());
                        data_bytes.extend([0]);
                        data_bytes.append(&mut x.as_bytes());
                        num_vectors += 1;
                    }
                    DmapField::Scalar(_) => Err(DmapError::InvalidVector(format!(
                        "Field {field} is a scalar, expected vector"
                    )))?,
                }
            }
        }

        Ok((num_scalars, num_vectors, data_bytes))
    }

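    /// Serializes the record like [`Record::to_bytes`], but returns the bytes broken out per
    /// field as `(field name, starting byte offset within the record, bytes)` tuples, with the
    /// 16-byte record header as the first entry. Useful for inspecting how a record lays out
    /// on disk.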
    fn inspect_bytes(
        &self,
        fields_for_type: &Fields,
    ) -> Result<Vec<(String, usize, Vec<u8>)>, DmapError> {
        let mut data_bytes: Vec<Vec<u8>> = vec![];
        let mut indices: Vec<usize> = vec![16]; // first field starts after the 16-byte record header
        let mut fields: Vec<String> = vec![];

        let (mut num_scalars, mut num_vectors) = (0, 0);

        for (field, _) in fields_for_type.scalars_required.iter() {
            fields.push(field.to_string());
            match self.get(field) {
                Some(x @ DmapField::Scalar(_)) => {
                    let mut bytes = vec![];
                    bytes.extend(field.as_bytes());
                    bytes.extend([0]); // null-terminate the field name
                    bytes.append(&mut x.as_bytes());
                    indices.push(indices[indices.len() - 1] + bytes.len());
                    data_bytes.push(bytes);
                    num_scalars += 1;
                }
                Some(_) => Err(DmapError::InvalidScalar(format!(
                    "Field {field} is a vector, expected scalar"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.scalars_optional.iter() {
            // Only record fields that are present, so names stay aligned with offsets and bytes
            if let Some(x) = self.get(field) {
                fields.push(field.to_string());
                match x {
                    DmapField::Scalar(_) => {
                        let mut bytes = vec![];
                        bytes.extend(field.as_bytes());
                        bytes.extend([0]);
                        bytes.append(&mut x.as_bytes());
                        indices.push(indices[indices.len() - 1] + bytes.len());
                        data_bytes.push(bytes);
                        num_scalars += 1;
                    }
                    DmapField::Vector(_) => Err(DmapError::InvalidScalar(format!(
                        "Field {field} is a vector, expected scalar"
                    )))?,
                }
            }
        }
        for (field, _) in fields_for_type.vectors_required.iter() {
            fields.push(field.to_string());
            match self.get(field) {
                Some(x @ DmapField::Vector(_)) => {
                    let mut bytes = vec![];
                    bytes.extend(field.as_bytes());
                    bytes.extend([0]);
                    bytes.append(&mut x.as_bytes());
                    indices.push(indices[indices.len() - 1] + bytes.len());
                    data_bytes.push(bytes);
                    num_vectors += 1;
                }
                Some(_) => Err(DmapError::InvalidVector(format!(
                    "Field {field} is a scalar, expected vector"
                )))?,
                None => Err(DmapError::InvalidRecord(format!(
                    "Field {field} missing from record"
                )))?,
            }
        }
        for (field, _) in fields_for_type.vectors_optional.iter() {
            if let Some(x) = self.get(field) {
                fields.push(field.to_string());
                match x {
                    DmapField::Vector(_) => {
                        let mut bytes = vec![];
                        bytes.extend(field.as_bytes());
                        bytes.extend([0]);
                        bytes.append(&mut x.as_bytes());
                        indices.push(indices[indices.len() - 1] + bytes.len());
                        data_bytes.push(bytes);
                        num_vectors += 1;
                    }
                    DmapField::Scalar(_) => Err(DmapError::InvalidVector(format!(
                        "Field {field} is a scalar, expected vector"
                    )))?,
                }
            }
        }

        // Build the 16-byte record header
        let num_bytes: usize = data_bytes.iter().map(|x| x.len()).sum();
        let mut bytes: Vec<u8> = vec![];
        bytes.extend((65537_i32).as_bytes()); // DMAP code
        bytes.extend((num_bytes as i32 + 16).as_bytes()); // total record size, including this header
        bytes.extend(num_scalars.as_bytes());
        bytes.extend(num_vectors.as_bytes());

        // Pair each field name with its starting offset and bytes
        let mut field_info: Vec<(String, usize, Vec<u8>)> = vec![("header".to_string(), 0, bytes)];
        for (f, (s, b)) in izip!(
            fields.into_iter(),
            izip!(indices[..indices.len() - 1].iter(), data_bytes.into_iter())
        ) {
            field_info.push((f, *s, b));
        }

        Ok(field_info)
    }

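    /// Serializes `recs` into DMAP bytes, in parallel.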
    fn into_bytes(recs: &Vec<Self>) -> Result<Vec<u8>, DmapError> {
        let mut bytes: Vec<u8> = vec![];
        let (errors, rec_bytes): (Vec<_>, Vec<_>) =
            recs.par_iter()
                .enumerate()
                .partition_map(|(i, rec)| match rec.to_bytes() {
                    Err(e) => Either::Left((i, e)),
                    Ok(y) => Either::Right(y),
                });
        if !errors.is_empty() {
            Err(DmapError::InvalidRecord(format!(
                "Corrupted records: {errors:?}"
            )))?
        }
        bytes.par_extend(rec_bytes.into_par_iter().flatten());
        Ok(bytes)
    }

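    /// Converts raw field maps into records of this type, then serializes them into DMAP bytes,
    /// in parallel.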
    fn try_into_bytes(recs: Vec<IndexMap<String, DmapField>>) -> Result<Vec<u8>, DmapError> {
        let mut bytes: Vec<u8> = vec![];
        let (errors, rec_bytes): (Vec<_>, Vec<_>) =
            recs.into_par_iter()
                .enumerate()
                .partition_map(|(i, rec)| match Self::try_from(rec) {
                    Err(e) => Either::Left((i, e)),
                    Ok(x) => match x.to_bytes() {
                        Err(e) => Either::Left((i, e)),
                        Ok(y) => Either::Right(y),
                    },
                });
        if !errors.is_empty() {
            Err(DmapError::BadRecords(
                errors.iter().map(|(i, _)| *i).collect(),
                errors[0].1.to_string(),
            ))?
        }
        bytes.par_extend(rec_bytes.into_par_iter().flatten());
        Ok(bytes)
    }

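    /// Serializes `recs` and writes the bytes to `outfile`.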
    fn write_to_file<P: AsRef<Path>>(recs: &Vec<Self>, outfile: P) -> Result<(), DmapError> {
        let bytes: Vec<u8> = Self::into_bytes(recs)?;
        io::bytes_to_file(bytes, outfile)?;
        Ok(())
    }
}

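/// Generates a struct named `<Format>Record`, wrapping a checked `IndexMap` of fields and
/// implementing [`Record`] against the supplied [`Fields`] definition.
///
/// For illustration only (the names here are hypothetical): `create_record_type!(foo, FOO_FIELDS)`
/// would expand to a `FooRecord` struct whose fields are validated against `FOO_FIELDS`.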
macro_rules! create_record_type {
    ($format:ident, $fields:ident) => {
        paste::paste! {
            use crate::types::{DmapType, DmapField};
            use crate::error::DmapError;
            use indexmap::IndexMap;
            use crate::record::Record;

            #[doc = "Struct containing the checked fields of a single `" $format:upper "` record." ]
            #[derive(Debug, PartialEq, Clone)]
            pub struct [< $format:camel Record >] {
                pub data: IndexMap<String, DmapField>,
            }

            impl Record<'_> for [< $format:camel Record >] {
                fn inner(self) -> IndexMap<String, DmapField> {
                    self.data
                }
                fn get(&self, key: &str) -> Option<&DmapField> {
                    self.data.get(key)
                }
                fn keys(&self) -> Vec<&String> {
                    self.data.keys().collect()
                }
                fn new(fields: &mut IndexMap<String, DmapField>) -> Result<[< $format:camel Record >], DmapError> {
                    Self::check_fields(fields, &$fields)?;

                    Ok([< $format:camel Record >] {
                        data: fields.to_owned(),
                    })
                }
                fn to_bytes(&self) -> Result<Vec<u8>, DmapError> {
                    let (num_scalars, num_vectors, mut data_bytes) =
                        Self::data_to_bytes(&self.data, &$fields)?;

                    let mut bytes: Vec<u8> = vec![];
                    bytes.extend((65537_i32).as_bytes()); // DMAP code
                    bytes.extend((data_bytes.len() as i32 + 16).as_bytes()); // record size, including the 16-byte header
                    bytes.extend(num_scalars.as_bytes());
                    bytes.extend(num_vectors.as_bytes());
                    bytes.append(&mut data_bytes);
                    Ok(bytes)
                }
            }

            impl TryFrom<&mut IndexMap<String, DmapField>> for [< $format:camel Record >] {
                type Error = DmapError;

                fn try_from(value: &mut IndexMap<String, DmapField>) -> Result<Self, Self::Error> {
                    Self::coerce(value, &$fields)
                }
            }

            impl TryFrom<IndexMap<String, DmapField>> for [< $format:camel Record >] {
                type Error = DmapError;

                fn try_from(mut value: IndexMap<String, DmapField>) -> Result<Self, Self::Error> {
                    Self::coerce(&mut value, &$fields)
                }
            }

            #[cfg(test)]
            mod tests {
                use super::*;
                use std::path::PathBuf;

                // Removing any optional field from a valid record should still produce a valid record.
                #[test]
                fn test_missing_optional_fields() -> Result<(), DmapError> {
                    let filename: PathBuf = PathBuf::from(format!("tests/test_files/test.{}", stringify!($format)));
                    let data = [< $format:camel Record >]::sniff_file(&filename).expect("Unable to sniff file");
                    let recs = data.inner();

                    for field in $fields.scalars_optional.iter().chain($fields.vectors_optional.iter()) {
                        let mut cloned_rec = recs.clone();
                        let _ = cloned_rec.shift_remove(field.0);
                        let _ = [< $format:camel Record >]::try_from(&mut cloned_rec)?;
                    }
                    Ok(())
                }

                // Removing any required field from a valid record should be rejected.
                #[test]
                fn test_missing_required_fields() -> Result<(), DmapError> {
                    let filename: PathBuf = PathBuf::from(format!("tests/test_files/test.{}", stringify!($format)));
                    let data = [< $format:camel Record >]::sniff_file(&filename).expect("Unable to sniff file");
                    let recs = data.inner();

                    for field in $fields.scalars_required.iter().chain($fields.vectors_required.iter()) {
                        let mut cloned_rec = recs.clone();
                        let _ = cloned_rec.shift_remove(field.0);
                        let res = [< $format:camel Record >]::try_from(&mut cloned_rec);
                        assert!(res.is_err());
                    }
                    Ok(())
                }
            }
        }
    }
}

pub(crate) use create_record_type;