use crate::error::{MarcError, Result};
use crate::reader::MarcReader;
use crate::record::Record;
use std::io::Cursor;
/// Parses a batch of MARC records in parallel from a shared byte buffer.
///
/// Each `(offset, length)` pair in `record_boundaries` selects the byte range
/// `buffer[offset..offset + length]`. That range is wrapped in a `Cursor` and
/// read by its own [`MarcReader`]. Work is distributed with rayon's
/// `par_iter`, and the returned `Vec` is in the same order as
/// `record_boundaries`.
///
/// # Errors
///
/// - [`MarcError::InvalidRecord`] if any boundary extends past the end of
///   `buffer`, including when `offset + length` would overflow `usize`. All
///   boundaries are validated before any record is parsed.
/// - [`MarcError::InvalidRecord`] if a boundary's bytes yield no record
///   (`read_record` returned `Ok(None)`).
/// - Any error returned by `MarcReader::read_record` for a record's bytes.
///   If several records fail, rayon does not specify which error is returned.
pub fn parse_batch_parallel(
    record_boundaries: &[(usize, usize)],
    buffer: &[u8],
) -> Result<Vec<Record>> {
    use rayon::prelude::*;

    // Validate every boundary up front so the slicing below cannot panic.
    // `checked_add` guards against `offset + length` overflowing. A plain `+`
    // panics in debug builds and wraps in release builds, which would let an
    // out-of-range boundary pass this check and panic later when slicing.
    for &(offset, length) in record_boundaries {
        let in_bounds = matches!(
            offset.checked_add(length),
            Some(end) if end <= buffer.len()
        );
        if !in_bounds {
            return Err(MarcError::InvalidRecord(format!(
                "Record boundary ({}, {}) exceeds buffer size {}",
                offset,
                length,
                buffer.len()
            )));
        }
    }

    record_boundaries
        .par_iter()
        .enumerate()
        .map(|(idx, &(offset, length))| {
            // Cannot overflow or go out of bounds: checked by the loop above.
            let record_bytes = &buffer[offset..offset + length];
            let mut reader = MarcReader::new(Cursor::new(record_bytes));
            reader.read_record()?.ok_or_else(|| {
                MarcError::InvalidRecord(format!(
                    "Record {idx} at offset {offset} parsed as empty"
                ))
            })
        })
        .collect::<Result<Vec<Record>>>()
}
/// Parses at most the first `limit` records of a batch in parallel.
///
/// This is equivalent to calling [`parse_batch_parallel`] on the first
/// `min(limit, record_boundaries.len())` boundaries. Boundaries beyond the
/// limit are neither validated nor parsed.
///
/// # Errors
///
/// Same as [`parse_batch_parallel`], restricted to the selected boundaries.
pub fn parse_batch_parallel_limited(
    record_boundaries: &[(usize, usize)],
    buffer: &[u8],
    limit: usize,
) -> Result<Vec<Record>> {
    // Borrow a prefix of the input rather than copying it into a new Vec.
    let end = limit.min(record_boundaries.len());
    parse_batch_parallel(&record_boundaries[..end], buffer)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test with a minimal hand-built record. Whether this skeletal
    /// leader is accepted depends on `MarcReader`. The test checks that
    /// parsing does not panic and that a successful parse yields one record.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_parse_batch_parallel_single_record() {
        let mut record_data = vec![0u8; 24];
        record_data[0] = b'0';
        record_data[1] = b'0';
        record_data[5] = b'a';
        record_data[6] = b'c';
        record_data.push(0x1D);
        let boundaries = [(0, record_data.len())];

        if let Ok(records) = parse_batch_parallel(&boundaries, &record_data) {
            assert_eq!(records.len(), 1);
        }
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_parse_batch_parallel_empty_boundaries() {
        let buffer = [1u8, 2, 3];
        let boundaries: [(usize, usize); 0] = [];
        let records =
            parse_batch_parallel(&boundaries, &buffer).expect("empty batch should parse");
        assert!(records.is_empty());
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_parse_batch_parallel_boundary_out_of_bounds() {
        let buffer = [1u8, 2, 3];
        let boundaries = [(0, 10)];
        let err = parse_batch_parallel(&boundaries, &buffer).unwrap_err();
        assert!(matches!(err, MarcError::InvalidRecord(_)));
        let err_msg = err.to_string();
        assert!(err_msg.contains("exceed") || err_msg.contains("bound"));
    }

    /// Every boundary is validated before any parsing happens, so an invalid
    /// later boundary is reported even when an earlier one is in range.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_parse_batch_parallel_later_boundary_out_of_bounds() {
        let buffer = [1u8, 2, 3];
        let boundaries = [(0, 1), (2, 10)];
        let err = parse_batch_parallel(&boundaries, &buffer).unwrap_err();
        assert!(matches!(err, MarcError::InvalidRecord(_)));
        let err_msg = err.to_string();
        assert!(err_msg.contains("exceed") || err_msg.contains("bound"));
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_parse_batch_parallel_limited() {
        let mut record_data = vec![];
        for i in 0..5u8 {
            record_data.push(i);
            record_data.push(0x1D);
        }
        let boundaries: Vec<_> = (0..5).map(|i| (i * 2, 2)).collect();

        // These two-byte "records" may or may not be accepted by the reader.
        // If they are, only the first `limit` of them may be returned.
        if let Ok(records) = parse_batch_parallel_limited(&boundaries, &record_data, 2) {
            assert_eq!(records.len(), 2);
        }
    }

    /// Boundaries past `limit` are neither validated nor parsed.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_parse_batch_parallel_limited_skips_boundaries_past_limit() {
        let buffer = [1u8, 2, 3];
        let boundaries = [(0, 10)];
        let records = parse_batch_parallel_limited(&boundaries, &buffer, 0)
            .expect("no boundaries selected, so nothing can fail");
        assert!(records.is_empty());
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_parse_batch_parallel_limited_limit_exceeds_len() {
        let buffer = [1u8, 2, 3];
        let boundaries: [(usize, usize); 0] = [];
        let records = parse_batch_parallel_limited(&boundaries, &buffer, 10)
            .expect("empty batch should parse");
        assert!(records.is_empty());
    }
}