// pcd_rs/writer.rs
1//! Types for writing PCD data.
2//!
//! [Writer](crate::writer::Writer) lets you write points sequentially to a
//! PCD file or to a writer given by the user. The written point type must
//! implement the [PcdSerialize](crate::record::PcdSerialize) trait.
//! See the [record](crate::record) module doc to implement your own point type.
7#![cfg_attr(
8    feature = "derive",
9    doc = r##"
10```rust
11use eyre::Result;
12use pcd_rs::{DataKind, PcdSerialize, Writer, WriterInit};
13use std::path::Path;
14
15#[derive(PcdSerialize)]
16pub struct Point {
17    x: f32,
18    y: f32,
19    z: f32,
20}
21
22fn main() -> Result<()> {
23    let mut writer: Writer<Point, _> = WriterInit {
24        height: 300,
25        width: 1,
26        viewpoint: Default::default(),
27        data_kind: DataKind::Ascii,
28        schema: None,
29        version: None,
30    }
31    .create("test_files/dump.pcd")?;
32
33    let point = Point {
34        x: 3.14159,
35        y: 2.71828,
36        z: -5.0,
37    };
38
39    writer.push(&point)?;
40    writer.finish()?;
41
42#  std::fs::remove_file("test_files/dump.pcd").unwrap();
43
44    Ok(())
45}
46```
47"##
48)]
49
50use crate::{
51    lzf,
52    metas::{DataKind, FieldDef, Schema, ValueKind, ViewPoint},
53    record::{DynRecord, PcdSerialize},
54    Error, Result,
55};
56use byteorder::{LittleEndian, WriteBytesExt};
57use std::{
58    collections::HashSet,
59    fs::File,
60    io::{prelude::*, BufWriter, Cursor, SeekFrom},
61    marker::PhantomData,
62    path::Path,
63};
64
/// The `DynWriter` type writes points with a schema determined at runtime.
pub type DynWriter<W> = Writer<DynRecord, W>;
67
/// A builder type that builds [Writer](crate::writer::Writer).
pub struct WriterInit {
    /// Value written to the `WIDTH` header field.
    pub width: u64,
    /// Value written to the `HEIGHT` header field.
    pub height: u64,
    /// Pose written to the `VIEWPOINT` header field (only emitted for v0.7).
    pub viewpoint: ViewPoint,
    /// Payload encoding: ascii, binary, or binary_compressed.
    pub data_kind: DataKind,
    /// Field layout. Required for dynamic record types; must be `None` for
    /// static record types, whose schema comes from `PcdSerialize::write_spec`.
    pub schema: Option<Schema>,
    /// PCD version to write (defaults to "0.7")
    pub version: Option<String>,
}
78
79impl WriterInit {
80    /// Builds new [Writer](crate::writer::Writer) object from a writer.
81    /// The writer must implement both [Write](std::io::Write) and [Write](std::io::Seek)
82    /// traits.
83    pub fn build_from_writer<Record: PcdSerialize, W: Write + Seek>(
84        self,
85        writer: W,
86    ) -> Result<Writer<Record, W>, Error> {
87        let record_spec = if Record::is_dynamic() {
88            // Check if the schema is set.
89            let Some(schema) = self.schema else {
90                return Err(Error::new_invalid_writer_configuration_error(
91                    "The schema is not set on the writer. It is required for the dynamic record type."
92                ));
93            };
94
95            schema
96        } else {
97            if self.schema.is_some() {
98                return Err(Error::new_invalid_writer_configuration_error(
99                    "schema should not be set for static record type",
100                ));
101            }
102            Record::write_spec()
103        };
104
105        let version = self.version.unwrap_or_else(|| "0.7".to_string());
106
107        let seq_writer = Writer::new(
108            self.width,
109            self.height,
110            self.data_kind,
111            self.viewpoint,
112            record_spec,
113            writer,
114            version,
115        )?;
116        Ok(seq_writer)
117    }
118
119    /// Builds new [Writer](crate::writer::Writer) by creating a new file.
120    pub fn create<Record, P>(self, path: P) -> Result<Writer<Record, BufWriter<File>>>
121    where
122        Record: PcdSerialize,
123        P: AsRef<Path>,
124    {
125        let writer = BufWriter::new(File::create(path.as_ref())?);
126        let seq_writer = self.build_from_writer(writer)?;
127        Ok(seq_writer)
128    }
129}
130
/// The `Writer` struct writes points in type `T` to writer `W`.
pub struct Writer<T, W>
where
    W: Write + Seek,
{
    // Payload encoding: ascii, binary, or binary_compressed.
    data_kind: DataKind,
    // Field layout used to serialize every record.
    record_spec: Schema,
    // Destination stream; already holds the header after construction.
    writer: W,
    // Number of points pushed so far; backpatched into the POINTS header.
    num_records: usize,
    // Stream offset of the fixed-width POINTS placeholder written by `new`.
    points_arg_begin: u64,
    // Column width reserved for the POINTS placeholder.
    points_arg_width: usize,
    // Set by `finish`; `Drop` panics when this is still false.
    finished: bool,
    // Row-major record bytes awaiting compression (binary_compressed only).
    compressed_buffer: Option<Vec<u8>>,
    _phantom: PhantomData<T>,
}
146
impl<W, Record> Writer<Record, W>
where
    Record: PcdSerialize,
    W: Write + Seek,
{
    /// Validates the configuration, writes the PCD header with a blank
    /// fixed-width placeholder for the `POINTS` value, and returns a writer
    /// ready to accept records. `finish` later seeks back to the recorded
    /// placeholder position and fills in the real point count.
    fn new(
        width: u64,
        height: u64,
        data_kind: DataKind,
        viewpoint: ViewPoint,
        record_spec: Schema,
        mut writer: W,
        version: String,
    ) -> Result<Self, Error> {
        // Local shorthand: turn a failed precondition into a configuration error.
        macro_rules! ensure {
            ($cond:expr, $desc:expr) => {
                if !$cond {
                    return Err(Error::new_invalid_writer_configuration_error($desc));
                }
            };
        }

        // Validate version; both the "0.x" and ".x" spellings are accepted.
        match version.as_str() {
            "0.5" | ".5" | "0.6" | ".6" | "0.7" | ".7" => {}
            _ => {
                return Err(Error::new_invalid_writer_configuration_error(
                    "Unsupported PCD version. Supported versions: 0.5, 0.6, 0.7",
                ))
            }
        }

        // Check version-specific constraints
        if version == "0.5" || version == ".5" || version == "0.6" || version == ".6" {
            // Legacy versions don't support binary_compressed
            if matches!(data_kind, DataKind::BinaryCompressed) {
                return Err(Error::new_invalid_writer_configuration_error(
                    "binary_compressed format is only supported in PCD v0.7",
                ));
            }
        }

        // Run sanity check on the schema.
        {
            for FieldDef { name, count, .. } in &record_spec {
                ensure!(!name.is_empty(), "field name must not be empty");
                ensure!(*count > 0, "The field count must be nonzero");
            }

            // Duplicate field names would make the FIELDS header ambiguous.
            let names: HashSet<_> = record_spec.iter().map(|field| &field.name).collect();
            ensure!(
                names.len() == record_spec.len(),
                "schema names must be unique"
            );
        }

        // Write the header; remember where the POINTS placeholder begins and
        // how wide it is so `finish` can backpatch the final count in place.
        let (points_arg_begin, points_arg_width) = {
            let fields_args: Vec<_> = record_spec
                .iter()
                .map(|field| field.name.to_owned())
                .collect();

            let size_args: Vec<_> = record_spec
                .iter()
                .map(|field| field.kind.byte_size().to_string())
                .collect();

            // PCD TYPE codes: U = unsigned int, I = signed int, F = float.
            let type_args: Vec<_> = record_spec
                .iter()
                .map(|field| {
                    use ValueKind::*;
                    match field.kind {
                        U8 | U16 | U32 | U64 => "U",
                        I8 | I16 | I32 | I64 => "I",
                        F32 | F64 => "F",
                    }
                })
                .collect();

            let count_args: Vec<_> = record_spec
                .iter()
                .map(|field| field.count.to_string())
                .collect();

            // Translation (tx ty tz) followed by quaternion (qw qx qy qz).
            let viewpoint_args: Vec<_> = {
                [
                    viewpoint.tx,
                    viewpoint.ty,
                    viewpoint.tz,
                    viewpoint.qw,
                    viewpoint.qx,
                    viewpoint.qy,
                    viewpoint.qz,
                ]
                .iter()
                .map(|value| value.to_string())
                .collect()
            };

            // Reserve enough columns for any possible point count: the number
            // of decimal digits in usize::MAX.
            let points_arg_width = (usize::MAX as f64).log10().floor() as usize + 1;

            // Normalize version format for header
            let header_version = match version.as_str() {
                "0.5" | ".5" => ".5",
                "0.6" | ".6" => ".6",
                "0.7" | ".7" => ".7",
                _ => ".7",
            };

            let header_comment = match header_version {
                ".5" => "# .PCD v.5 - Point Cloud Data file format",
                ".6" => "# .PCD v.6 - Point Cloud Data file format",
                _ => "# .PCD v.7 - Point Cloud Data file format",
            };

            writeln!(writer, "{}", header_comment)?;
            writeln!(writer, "VERSION {}", header_version)?;
            writeln!(writer, "FIELDS {}", fields_args.join(" "))?;
            writeln!(writer, "SIZE {}", size_args.join(" "))?;
            writeln!(writer, "TYPE {}", type_args.join(" "))?;
            writeln!(writer, "COUNT {}", count_args.join(" "))?;
            writeln!(writer, "WIDTH {}", width)?;
            writeln!(writer, "HEIGHT {}", height)?;

            // Only write VIEWPOINT for v0.7
            if header_version == ".7" {
                writeln!(writer, "VIEWPOINT {}", viewpoint_args.join(" "))?;
            }

            // Emit "POINTS " followed by a blank field of fixed width; record
            // the stream position of the blank so it can be overwritten later.
            write!(writer, "POINTS ")?;
            let points_arg_begin = writer.stream_position()?;
            writeln!(writer, "{:width$}", " ", width = points_arg_width)?;

            match data_kind {
                DataKind::Binary => writeln!(writer, "DATA binary")?,
                DataKind::Ascii => writeln!(writer, "DATA ascii")?,
                DataKind::BinaryCompressed => writeln!(writer, "DATA binary_compressed")?,
            }

            (points_arg_begin, points_arg_width)
        };

        // binary_compressed data cannot be streamed: records are buffered
        // here and compressed in one shot by `finish`.
        let compressed_buffer = if data_kind == DataKind::BinaryCompressed {
            Some(Vec::new())
        } else {
            None
        };

        let seq_writer = Self {
            data_kind,
            record_spec,
            writer,
            num_records: 0,
            points_arg_begin,
            points_arg_width,
            finished: false,
            compressed_buffer,
            _phantom: PhantomData,
        };
        Ok(seq_writer)
    }

    /// Finish the writer.
    ///
    /// The method consumes the writer and must be called once when finished:
    /// it compresses and writes any buffered binary_compressed payload, then
    /// backpatches the `POINTS` header field with the actual record count.
    /// Otherwise it will panic when it drops.
    ///
    /// NOTE(review): if any write below fails, `self` is dropped with
    /// `finished == false`, so the `Drop` impl turns the I/O error into a
    /// panic — consider defusing the drop guard before the fallible writes.
    pub fn finish(mut self) -> Result<()> {
        // Write compressed data if using compression
        if self.data_kind == DataKind::BinaryCompressed {
            if let Some(ref row_major_data) = self.compressed_buffer {
                if row_major_data.is_empty() {
                    // For empty data, write zeros for sizes
                    self.writer.write_u32::<LittleEndian>(0)?;
                    self.writer.write_u32::<LittleEndian>(0)?;
                } else {
                    // Transpose from row-major to column-major (SoA) layout
                    let num_points = self.num_records;
                    let field_byte_sizes: Vec<usize> = self
                        .record_spec
                        .iter()
                        .map(|f| f.kind.byte_size() * f.count as usize)
                        .collect();
                    let record_size: usize = field_byte_sizes.iter().sum();

                    // column_start[f] is the byte offset where field f's column begins
                    let mut column_start = Vec::with_capacity(field_byte_sizes.len());
                    let mut offset = 0usize;
                    for &fbs in &field_byte_sizes {
                        column_start.push(offset);
                        offset += fbs * num_points;
                    }

                    // field_offset_in_record[f] is the byte offset of field f within a single record
                    let mut field_offset_in_record = Vec::with_capacity(field_byte_sizes.len());
                    let mut rec_offset = 0usize;
                    for &fbs in &field_byte_sizes {
                        field_offset_in_record.push(rec_offset);
                        rec_offset += fbs;
                    }

                    // Scatter each field of each record into its column.
                    let mut col_major = vec![0u8; row_major_data.len()];
                    for i in 0..num_points {
                        for (f, &fbs) in field_byte_sizes.iter().enumerate() {
                            let src = i * record_size + field_offset_in_record[f];
                            let dst = column_start[f] + i * fbs;
                            col_major[dst..dst + fbs]
                                .copy_from_slice(&row_major_data[src..src + fbs]);
                        }
                    }

                    // Compress the column-major data
                    let compressed_data = lzf::compress(&col_major)?;

                    // Write compressed size and uncompressed size
                    self.writer
                        .write_u32::<LittleEndian>(compressed_data.len() as u32)?;
                    self.writer
                        .write_u32::<LittleEndian>(col_major.len() as u32)?;

                    // Write compressed data
                    self.writer.write_all(&compressed_data)?;
                }
            }
        }

        // Update the points count in the header: seek back to the placeholder
        // and overwrite it left-aligned within the reserved width.
        self.writer.seek(SeekFrom::Start(self.points_arg_begin))?;
        write!(
            self.writer,
            "{:<width$}",
            self.num_records,
            width = self.points_arg_width
        )?;
        self.finished = true;
        Ok(())
    }

    /// Writes a new point to PCD data.
    ///
    /// Ascii and binary records are written straight through; with
    /// binary_compressed, the record bytes are buffered row-major and only
    /// compressed when `finish` is called.
    pub fn push(&mut self, record: &Record) -> Result<()> {
        match self.data_kind {
            DataKind::Binary => record.write_chunk(&mut self.writer, &self.record_spec)?,
            DataKind::Ascii => record.write_line(&mut self.writer, &self.record_spec)?,
            DataKind::BinaryCompressed => {
                // Buffer the binary data for compression
                if let Some(ref mut buffer) = self.compressed_buffer {
                    // Create a temporary buffer with cursor to write the record
                    let mut temp_buffer = Vec::new();
                    let mut cursor = Cursor::new(&mut temp_buffer);
                    record.write_chunk(&mut cursor, &self.record_spec)?;
                    buffer.extend_from_slice(&temp_buffer);
                } else {
                    // Unreachable if `new` initialized the buffer; reported as
                    // an error rather than a panic.
                    return Err(Error::ParseError {
                        line: 0,
                        desc: "Compressed buffer not initialized".into(),
                    });
                }
            }
        }

        self.num_records += 1;
        Ok(())
    }
}
410
411impl<W, Record> Drop for Writer<Record, W>
412where
413    W: Write + Seek,
414{
415    fn drop(&mut self) {
416        if !self.finished {
417            panic!("call finish() before Writer drops");
418        }
419    }
420}