Skip to main content

pcd_rs/
reader.rs

1//! Types for reading PCD data.
2//!
3//! [Reader](crate::reader::Reader) lets you load points sequentially with
4//! [Iterator](std::iter::Iterator) interface. The points are stored in
5//! types implementing [PcdDeserialize](crate::record::PcdDeserialize) trait.
//! See [record](crate::record) module doc to implement your own point type.
7#![cfg_attr(
8    feature = "derive",
9    doc = r##"
10```rust
11use pcd_rs::{PcdDeserialize, Reader};
12use std::path::Path;
13
14#[derive(PcdDeserialize)]
15pub struct Point {
16    x: f32,
17    y: f32,
18    z: f32,
19    rgb: f32,
20}
21
22fn main() -> pcd_rs::Result<()> {
23    let reader = Reader::open("test_files/ascii.pcd")?;
24    let points: pcd_rs::Result<Vec<Point>> = reader.collect();
25    assert_eq!(points?.len(), 213);
26    Ok(())
27}
28```
29"##
30)]
31
32use crate::{
33    error::Error,
34    lzf,
35    metas::{DataKind, FieldDef, PcdMeta},
36    record::{DynRecord, PcdDeserialize},
37    Result,
38};
39use byteorder::{LittleEndian, ReadBytesExt};
40use std::{
41    fs::File,
42    io::{prelude::*, BufReader, Cursor},
43    marker::PhantomData,
44    path::Path,
45};
46
/// The `DynReader` struct loads points with schema determined in runtime.
///
/// This is a [Reader] whose record type is [DynRecord], so the point layout
/// is taken from the file's own field definitions instead of a compile-time
/// struct.
pub type DynReader<R> = Reader<DynRecord, R>;
49
/// The `Reader<T, R>` struct loads points into type `T` from reader `R`.
pub struct Reader<T, R>
where
    R: Read,
{
    // Parsed PCD header: field definitions, data kind, and point count.
    meta: PcdMeta,
    // Number of records yielded so far by the iterator.
    record_count: usize,
    // Set once all points are read or a read failed; `next()` then returns `None`.
    finished: bool,
    // Underlying byte source, positioned at the start of the data section
    // after construction.
    reader: R,
    // For `DataKind::BinaryCompressed` only: the whole data section,
    // decompressed and transposed to row-major order at construction time.
    // `None` for ASCII and plain binary files.
    decompressed_buffer: Option<Cursor<Vec<u8>>>,
    // Marks the record type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
62
63impl<'a, Record> Reader<Record, BufReader<Cursor<&'a [u8]>>>
64where
65    Record: PcdDeserialize,
66{
67    pub fn from_bytes(buf: &'a [u8]) -> Result<Self> {
68        let reader = BufReader::new(Cursor::new(buf));
69        Self::from_reader(reader)
70    }
71}
72
73impl<Record, R> Reader<Record, R>
74where
75    Record: PcdDeserialize,
76    R: BufRead,
77{
78    pub fn from_reader(mut reader: R) -> Result<Self> {
79        let mut line_count = 0;
80        let meta = crate::utils::load_meta(&mut reader, &mut line_count)?;
81
82        // Checks whether the record schema matches the file meta
83        if !Record::is_dynamic() {
84            let record_spec = Record::read_spec();
85
86            macro_rules! bail {
87                () => {
88                    return Err(Error::new_reader_schema_mismatch_error(
89                        record_spec.clone(),
90                        meta.field_defs.fields.clone(),
91                    ));
92                };
93            }
94
95            if record_spec.len() != meta.field_defs.len() {
96                bail!();
97            }
98
99            for (record_field, meta_field) in record_spec.iter().zip(meta.field_defs.iter()) {
100                let (ref name_opt, record_kind, record_count_opt) = *record_field;
101                let FieldDef {
102                    name: ref meta_name,
103                    kind: meta_kind,
104                    count: meta_count,
105                } = *meta_field;
106
107                if record_kind != meta_kind {
108                    bail!();
109                }
110
111                if let Some(name) = &name_opt {
112                    if name != meta_name {
113                        bail!();
114                    }
115                }
116
117                if let Some(record_count) = record_count_opt {
118                    if record_count != meta_count as usize {
119                        bail!();
120                    }
121                }
122            }
123        }
124
125        // For compressed data, read and decompress the entire data section
126        let decompressed_buffer = if meta.data == DataKind::BinaryCompressed {
127            // Read compressed size and uncompressed size
128            let compressed_size = reader.read_u32::<LittleEndian>()?;
129            let uncompressed_size = reader.read_u32::<LittleEndian>()?;
130
131            if compressed_size == 0 && uncompressed_size == 0 {
132                // Empty compressed data
133                Some(Cursor::new(Vec::new()))
134            } else {
135                // Read compressed data
136                let mut compressed_data = vec![0u8; compressed_size as usize];
137                reader.read_exact(&mut compressed_data)?;
138
139                // Decompress (data is in column-major / SoA layout)
140                let col_major = lzf::decompress(&compressed_data, uncompressed_size as usize)?;
141
142                // Transpose from column-major to row-major so read_chunk works
143                let num_points = meta.num_points as usize;
144                if num_points == 0 {
145                    Some(Cursor::new(col_major))
146                } else {
147                    // Calculate per-field byte sizes and record size
148                    let field_byte_sizes: Vec<usize> = meta
149                        .field_defs
150                        .iter()
151                        .map(|f| f.kind.byte_size() * f.count as usize)
152                        .collect();
153                    let record_size: usize = field_byte_sizes.iter().sum();
154
155                    let mut row_major = vec![0u8; col_major.len()];
156
157                    // column_start[f] is the byte offset where field f's column begins
158                    let mut column_start = Vec::with_capacity(field_byte_sizes.len());
159                    let mut offset = 0usize;
160                    for &fbs in &field_byte_sizes {
161                        column_start.push(offset);
162                        offset += fbs * num_points;
163                    }
164
165                    // field_offset_in_record[f] is the byte offset of field f within a single record
166                    let mut field_offset_in_record = Vec::with_capacity(field_byte_sizes.len());
167                    let mut rec_offset = 0usize;
168                    for &fbs in &field_byte_sizes {
169                        field_offset_in_record.push(rec_offset);
170                        rec_offset += fbs;
171                    }
172
173                    for i in 0..num_points {
174                        for (f, &fbs) in field_byte_sizes.iter().enumerate() {
175                            let src = column_start[f] + i * fbs;
176                            let dst = i * record_size + field_offset_in_record[f];
177                            row_major[dst..dst + fbs].copy_from_slice(&col_major[src..src + fbs]);
178                        }
179                    }
180
181                    Some(Cursor::new(row_major))
182                }
183            }
184        } else {
185            None
186        };
187
188        let pcd_reader = Reader {
189            meta,
190            reader,
191            record_count: 0,
192            finished: false,
193            decompressed_buffer,
194            _phantom: PhantomData,
195        };
196
197        Ok(pcd_reader)
198    }
199}
200
201impl<Record> Reader<Record, BufReader<File>>
202where
203    Record: PcdDeserialize,
204{
205    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
206        let file = BufReader::new(File::open(path.as_ref())?);
207        Self::from_reader(file)
208    }
209}
210
impl<R, Record> Reader<Record, R>
where
    R: BufRead,
{
    /// Get meta data.
    ///
    /// Returns the header information parsed when the reader was created:
    /// field definitions, data kind, and point count.
    pub fn meta(&self) -> &PcdMeta {
        &self.meta
    }
}
220
221impl<R, Record> Iterator for Reader<Record, R>
222where
223    R: BufRead,
224    Record: PcdDeserialize,
225{
226    type Item = Result<Record>;
227
228    fn next(&mut self) -> Option<Self::Item> {
229        if self.finished {
230            return None;
231        }
232
233        // Check if we've already read all points or if there are no points
234        if self.record_count >= self.meta.num_points as usize {
235            self.finished = true;
236            return None;
237        }
238
239        let record_result = match self.meta.data {
240            DataKind::Ascii => Record::read_line(&mut self.reader, &self.meta.field_defs),
241            DataKind::Binary => Record::read_chunk(&mut self.reader, &self.meta.field_defs),
242            DataKind::BinaryCompressed => {
243                // Read from decompressed buffer
244                if let Some(ref mut buffer) = self.decompressed_buffer {
245                    Record::read_chunk(buffer, &self.meta.field_defs)
246                } else {
247                    return Some(Err(Error::ParseError {
248                        line: 0,
249                        desc: "Compressed data buffer not initialized".into(),
250                    }));
251                }
252            }
253        };
254
255        match record_result {
256            Ok(_) => {
257                self.record_count += 1;
258                if self.record_count == self.meta.num_points as usize {
259                    self.finished = true;
260                }
261            }
262            Err(_) => {
263                self.finished = true;
264            }
265        }
266
267        Some(record_result)
268    }
269
270    fn size_hint(&self) -> (usize, Option<usize>) {
271        let size = self.meta.num_points as usize;
272        (size, Some(size))
273    }
274}