serde_ply/de/
chunked.rs

1use crate::{
2    de::{
3        val_reader::{AsciiValReader, BinValReader, ScalarReader},
4        RowDeserializer,
5    },
6    DeserializeError, ElementDef, PlyFormat, PlyHeader,
7};
8use byteorder::{BigEndian, LittleEndian};
9use serde::{
10    de::{DeserializeSeed, Error, SeqAccess, Visitor},
11    Deserialize, Deserializer,
12};
13use std::{io::Cursor, marker::PhantomData};
14
15/// Streaming PLY file parser for chunked data processing.
16///
17/// Processes PLY files incrementally as data becomes available, useful for
18/// streaming data or async readers. Unlike [`crate::PlyReader`], this parser
19/// can handle incomplete data and resume parsing when more bytes arrive.
20///
21/// # Example
22/// ```rust
23/// use serde::{Deserialize, de::DeserializeSeed};
24/// use serde_ply::{PlyChunkedReader, RowVisitor};
25///
26/// #[derive(Deserialize)]
27/// struct Vertex { x: f32, y: f32, z: f32 }
28///
29/// let mut file = PlyChunkedReader::new();
30/// let mut vertices = Vec::new();
31///
32/// // Feed data in chunks
33/// let data = br#"ply
34/// format ascii 1.0
35/// element vertex 2
36/// property float x
37/// property float y
38/// property float z
39/// end_header
40/// 1.0 2.0 3.0
41/// 4.0 5.0 6.0
42/// "#;
43///
44/// for chunk in data.chunks(15) {
45///     file.buffer_mut().extend_from_slice(chunk);
46///
47///     if file.current_element().is_some() {
48///         RowVisitor::new(|v: Vertex| vertices.push(v)).deserialize(&mut file)?;
49///     }
50/// }
51/// assert_eq!(vertices.len(), 2);
52/// # Ok::<(), Box<dyn std::error::Error>>(())
53/// ```
54pub struct PlyChunkedReader {
55    header: Option<PlyHeader>,
56    current_element_index: usize,
57    rows_parsed: usize,
58    data_buffer: Vec<u8>,
59}
60
61impl PlyChunkedReader {
62    /// Create a new chunked PLY file parser.
63    pub fn new() -> Self {
64        Self {
65            header: None,
66            current_element_index: 0,
67            rows_parsed: 0,
68            data_buffer: Vec::new(),
69        }
70    }
71
72    /// Get mutable access to the internal buffer.
73    ///
74    /// Allows writing data directly into the parser's buffer without copies.
75    /// Useful for async readers or when feeding data in chunks.
76    pub fn buffer_mut(&mut self) -> &mut Vec<u8> {
77        &mut self.data_buffer
78    }
79
80    /// Get the parsed PLY header if available.
81    ///
82    /// Returns `None` if there isn't enough buffered data to parse the complete header.
83    /// The header is parsed lazily when first accessed.
84    ///
85    /// # Example
86    /// ```rust
87    /// use serde_ply::PlyChunkedReader;
88    ///
89    /// let mut file = PlyChunkedReader::new();
90    /// assert!(file.header().is_none());
91    ///
92    /// file.buffer_mut().extend_from_slice(
93    ///     b"ply\nformat ascii 1.0\nelement vertex 1\nproperty float x\nend_header\n"
94    /// );
95    /// assert!(file.header().is_some());
96    /// ```
97    pub fn header(&mut self) -> Option<&PlyHeader> {
98        if self.header.is_none() {
99            let available_data = &self.data_buffer;
100            let mut cursor = Cursor::new(available_data);
101            let header = PlyHeader::parse(&mut cursor);
102            if let Ok(header) = header {
103                self.header = Some(header);
104                self.data_buffer.drain(..cursor.position() as usize);
105            }
106        }
107        self.header.as_ref()
108    }
109
110    /// Deserialize as many complete elements as possible from the current buffer.
111    ///
112    /// Stops when the buffer is exhausted or an element boundary is reached.
113    /// Use this for batch processing of available data.
114    pub fn next_chunk<T>(&mut self) -> Result<T, DeserializeError>
115    where
116        T: for<'de> Deserialize<'de>,
117    {
118        T::deserialize(self)
119    }
120
121    /// Get the current element definition being processed.
122    ///
123    /// Returns `None` when the header isn't parsed yet, or when all elements
124    /// have been processed. Use this to inspect element structure before parsing.
125    ///
126    /// # Example
127    /// ```rust
128    /// use serde_ply::PlyChunkedReader;
129    ///
130    /// let mut file = PlyChunkedReader::new();
131    /// file.buffer_mut().extend_from_slice(
132    ///     b"ply\nformat ascii 1.0\nelement vertex 1\nproperty float x\nend_header\n"
133    /// );
134    ///
135    /// if let Some(element) = file.current_element() {
136    ///     assert_eq!(element.name, "vertex");
137    ///     assert_eq!(element.count, 1);
138    /// }
139    /// ```
140    pub fn current_element(&mut self) -> Option<&ElementDef> {
141        let ind = self.current_element_index;
142        self.header().and_then(|e| e.elem_defs.get(ind))
143    }
144
145    /// Number of rows parsed so far in the current element.
146    pub fn rows_done(&self) -> usize {
147        self.rows_parsed
148    }
149}
150
151impl<'de> Deserializer<'de> for &'_ mut PlyChunkedReader {
152    type Error = DeserializeError;
153
154    fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
155    where
156        V: serde::de::Visitor<'de>,
157    {
158        self.deserialize_seq(visitor)
159    }
160
161    fn deserialize_seq<V>(self, visitor: V) -> Result<V::Value, Self::Error>
162    where
163        V: serde::de::Visitor<'de>,
164    {
165        let _ = self.header();
166        // Make sure header is parsed
167        let Some(header) = &self.header else {
168            return visitor.visit_seq(EmptySeq);
169        };
170
171        // Check if we've moved past all elements, if so error that we've run out of elements.
172        if self.current_element_index >= header.elem_defs.len() {
173            return Err(DeserializeError::custom("Ran out of elements"));
174        }
175
176        let elem_def = &header.elem_defs[self.current_element_index];
177
178        let mut cursor = Cursor::new(&self.data_buffer);
179        let remaining = elem_def.count - self.rows_parsed;
180
181        let (res, rows_remaining) = match header.format {
182            PlyFormat::Ascii => {
183                let mut seq = ChunkPlyReaderSeqVisitor {
184                    remaining,
185                    row: RowDeserializer::<_, AsciiValReader>::new(
186                        &mut cursor,
187                        &elem_def.properties,
188                    ),
189                };
190                let res = visitor.visit_seq(&mut seq)?;
191                (res, seq.remaining)
192            }
193            PlyFormat::BinaryLittleEndian => {
194                let mut seq = ChunkPlyReaderSeqVisitor {
195                    remaining,
196                    row: RowDeserializer::<_, BinValReader<LittleEndian>>::new(
197                        &mut cursor,
198                        &elem_def.properties,
199                    ),
200                };
201                let res = visitor.visit_seq(&mut seq)?;
202                (res, seq.remaining)
203            }
204            PlyFormat::BinaryBigEndian => {
205                let mut seq = ChunkPlyReaderSeqVisitor {
206                    remaining,
207                    row: RowDeserializer::<_, BinValReader<BigEndian>>::new(
208                        &mut cursor,
209                        &elem_def.properties,
210                    ),
211                };
212                let res = visitor.visit_seq(&mut seq)?;
213                (res, seq.remaining)
214            }
215        };
216
217        self.rows_parsed = elem_def.count - rows_remaining;
218        self.data_buffer.drain(..cursor.position() as usize);
219
220        // If we've parsed all elements move to the next element.
221        if self.rows_parsed >= elem_def.count {
222            self.rows_parsed = 0;
223            self.current_element_index += 1;
224        }
225
226        Ok(res)
227    }
228
229    fn deserialize_newtype_struct<V>(
230        self,
231        _name: &'static str,
232        visitor: V,
233    ) -> Result<V::Value, Self::Error>
234    where
235        V: Visitor<'de>,
236    {
237        visitor.visit_newtype_struct(self)
238    }
239
240    serde::forward_to_deserialize_any! {
241        bool i8 u8 i16 u16 i32 u32 f32 f64 i128 i64 u128 u64 char str string
242        bytes byte_buf unit unit_struct tuple
243        tuple_struct map struct enum identifier ignored_any option
244    }
245}
246
247struct EmptySeq;
248
249impl<'de> SeqAccess<'de> for EmptySeq {
250    type Error = DeserializeError;
251
252    fn next_element_seed<T>(&mut self, _seed: T) -> Result<Option<T::Value>, Self::Error>
253    where
254        T: serde::de::DeserializeSeed<'de>,
255    {
256        Ok(None)
257    }
258}
259
260struct ChunkPlyReaderSeqVisitor<'a, D: AsRef<[u8]>, S: ScalarReader> {
261    remaining: usize,
262    row: RowDeserializer<'a, Cursor<D>, S>,
263}
264
265impl<'de, D: AsRef<[u8]>, S: ScalarReader> SeqAccess<'de>
266    for &mut ChunkPlyReaderSeqVisitor<'_, D, S>
267{
268    type Error = DeserializeError;
269
270    fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>, Self::Error>
271    where
272        T: serde::de::DeserializeSeed<'de>,
273    {
274        if self.remaining == 0 {
275            return Ok(None);
276        }
277
278        let last_pos = self.row.reader.position();
279        match seed.deserialize(&mut self.row) {
280            Ok(element) => {
281                self.remaining -= 1;
282                Ok(Some(element))
283            }
284            // Not enough data for this element, stop here
285            Err(e) if e.0.kind() == std::io::ErrorKind::UnexpectedEof => {
286                self.row.reader.set_position(last_pos);
287                Ok(None)
288            }
289            Err(e) => Err(e)?,
290        }
291    }
292
293    fn size_hint(&self) -> Option<usize> {
294        Some(self.remaining)
295    }
296}
297
298impl Default for PlyChunkedReader {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
/// Visitor that hands every deserialized row to a callback.
///
/// Each row is passed to the closure as soon as it has been deserialized,
/// instead of first being gathered into a collection. This helps when
/// streaming data or when memory usage matters.
///
/// # Example
/// ```rust
/// use serde::{Deserialize, de::DeserializeSeed};
/// use serde_ply::{PlyChunkedReader, RowVisitor};
///
/// #[derive(Deserialize)]
/// struct Vertex { x: f32, y: f32, z: f32 }
///
/// let mut file = PlyChunkedReader::new();
/// file.buffer_mut().extend_from_slice(
///     b"ply\nformat ascii 1.0\nelement vertex 1\nproperty float x\nproperty float y\nproperty float z\nend_header\n1.0 2.0 3.0\n"
/// );
///
/// let mut count = 0;
/// RowVisitor::new(|_vertex: Vertex| {
///     count += 1;
/// }).deserialize(&mut file)?;
/// assert_eq!(count, 1);
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub struct RowVisitor<T, F>
where
    F: FnMut(T),
{
    /// Called once for every row that was deserialized successfully.
    row_callback: F,
    /// Records the row type `T` without storing a value of it.
    _row: PhantomData<T>,
}
334
335impl<T, F: FnMut(T)> RowVisitor<T, F> {
336    /// Create a new row visitor with the given callback.
337    ///
338    /// The callback is invoked for each successfully parsed row.
339    /// Call [`serde::de::DeserializeSeed::deserialize`] to start processing.
340    #[must_use = "Please call deserialize(&mut file) to actually deserialize data"]
341    pub fn new(row_callback: F) -> Self {
342        Self {
343            row_callback,
344            _row: PhantomData,
345        }
346    }
347}
348
349impl<'de, T: Deserialize<'de>, F: FnMut(T)> DeserializeSeed<'de> for &mut RowVisitor<T, F> {
350    type Value = ();
351
352    fn deserialize<D: Deserializer<'de>>(self, deserializer: D) -> Result<(), D::Error> {
353        deserializer.deserialize_seq(self)
354    }
355}
356
357impl<'de, T: Deserialize<'de>, F: FnMut(T)> Visitor<'de> for &mut RowVisitor<T, F> {
358    type Value = ();
359    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
360        formatter.write_str("a sequence of rows")
361    }
362    fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<(), A::Error> {
363        while let Some(row) = seq.next_element()? {
364            (self.row_callback)(row);
365        }
366        Ok(())
367    }
368}