Skip to main content

copc_writer/
spill.rs

1//! Disk spill for streaming COPC writes.
2
3use std::fs::File;
4use std::io::{BufWriter, Write};
5use std::path::Path;
6#[cfg(test)]
7use std::path::PathBuf;
8
9use copc_core::{
10    deserialize_le, serialize_le, Bounds, Error, LasPointRecord, Result, StreamingLayout,
11};
12use memmap2::Mmap;
13use tempfile::{NamedTempFile, TempPath};
14
/// Capacity, in bytes, of the `BufWriter` placed in front of the spill file (1 MiB).
const SPILL_IO_BUFFER_BYTES: usize = 1 << 20;
16
17/// Streams `LasPointRecord` values to a process-local temporary spill file.
18pub struct SpillWriter {
19    #[cfg(test)]
20    path: PathBuf,
21    file: Option<BufWriter<NamedTempFile>>,
22    layout: StreamingLayout,
23    record_width: usize,
24    scratch: Vec<u8>,
25    count: u64,
26    bounds: Option<Bounds>,
27}
28
29impl SpillWriter {
30    pub fn create(spill_dir: &Path, layout: StreamingLayout) -> Result<Self> {
31        let file = tempfile::Builder::new()
32            .prefix(".copc-writer-spill.")
33            .suffix(".part")
34            .tempfile_in(spill_dir)
35            .map_err(|e| Error::io("create spill file", e))?;
36        #[cfg(test)]
37        let path = file.path().to_path_buf();
38        let record_width = layout.record_width();
39        Ok(Self {
40            #[cfg(test)]
41            path,
42            file: Some(BufWriter::with_capacity(SPILL_IO_BUFFER_BYTES, file)),
43            layout,
44            record_width,
45            scratch: vec![0u8; record_width],
46            count: 0,
47            bounds: None,
48        })
49    }
50
51    pub fn push(&mut self, record: &LasPointRecord) -> Result<()> {
52        serialize_le(record, &self.layout, &mut self.scratch)
53            .map_err(|e| Error::InvalidInput(format!("encode spill record: {e}")))?;
54        let writer = self
55            .file
56            .as_mut()
57            .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
58        writer
59            .write_all(&self.scratch)
60            .map_err(|e| Error::io("write spill record", e))?;
61        match self.bounds.as_mut() {
62            Some(bounds) => bounds.extend(record.x, record.y, record.z),
63            None => self.bounds = Some(Bounds::point(record.x, record.y, record.z)),
64        }
65        self.count += 1;
66        Ok(())
67    }
68
69    pub fn count(&self) -> u64 {
70        self.count
71    }
72
73    pub fn finalize(mut self) -> Result<SpillReader> {
74        let mut writer = self
75            .file
76            .take()
77            .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
78        writer
79            .flush()
80            .map_err(|e| Error::io("flush spill writer", e))?;
81        let file = writer
82            .into_inner()
83            .map_err(|e| Error::io("unwrap spill writer", e.into_error()))?;
84        file.as_file()
85            .sync_all()
86            .map_err(|e| Error::io("sync spill file", e))?;
87        let mmap_file = file
88            .reopen()
89            .map_err(|e| Error::io("open spill for mmap", e))?;
90        let temp_path = file.into_temp_path();
91        let count = usize::try_from(self.count)
92            .map_err(|_| Error::InvalidInput("spill record count exceeds usize range".into()))?;
93        let bounds = self.bounds.unwrap_or_else(|| Bounds::point(0.0, 0.0, 0.0));
94        SpillReader::open(
95            temp_path,
96            mmap_file,
97            self.layout,
98            self.record_width,
99            count,
100            bounds,
101        )
102    }
103}
104
105/// Memory-mapped random-access view over a finalized spill file.
106pub struct SpillReader {
107    #[cfg(test)]
108    path: PathBuf,
109    mmap: Mmap,
110    _file: File,
111    _path: TempPath,
112    layout: StreamingLayout,
113    record_width: usize,
114    count: usize,
115    bounds: Bounds,
116}
117
118impl SpillReader {
119    fn open(
120        temp_path: TempPath,
121        file: File,
122        layout: StreamingLayout,
123        record_width: usize,
124        count: usize,
125        bounds: Bounds,
126    ) -> Result<Self> {
127        #[cfg(test)]
128        let path = temp_path.to_path_buf();
129        let mmap = unsafe { Mmap::map(&file) }.map_err(|e| Error::io("mmap spill file", e))?;
130        let expected = record_width
131            .checked_mul(count)
132            .ok_or_else(|| Error::InvalidInput("spill size overflow".into()))?;
133        if mmap.len() != expected {
134            return Err(Error::InvalidInput(format!(
135                "spill file is {} bytes, expected {}",
136                mmap.len(),
137                expected
138            )));
139        }
140        Ok(Self {
141            #[cfg(test)]
142            path,
143            mmap,
144            _file: file,
145            _path: temp_path,
146            layout,
147            record_width,
148            count,
149            bounds,
150        })
151    }
152
153    pub fn len(&self) -> usize {
154        self.count
155    }
156
157    pub fn is_empty(&self) -> bool {
158        self.count == 0
159    }
160
161    pub fn layout(&self) -> &StreamingLayout {
162        &self.layout
163    }
164
165    pub fn bounds(&self) -> Bounds {
166        self.bounds
167    }
168
169    #[inline]
170    fn record_bytes(&self, index: usize) -> &[u8] {
171        let start = index * self.record_width;
172        &self.mmap[start..start + self.record_width]
173    }
174
175    #[inline]
176    pub fn xyz_at(&self, index: usize) -> (f64, f64, f64) {
177        debug_assert!(index < self.count);
178        let bytes = self.record_bytes(index);
179        let x = f64::from_le_bytes(bytes[0..8].try_into().expect("spill x width"));
180        let y = f64::from_le_bytes(bytes[8..16].try_into().expect("spill y width"));
181        let z = f64::from_le_bytes(bytes[16..24].try_into().expect("spill z width"));
182        (x, y, z)
183    }
184
185    pub fn record_at(&self, index: usize) -> Result<LasPointRecord> {
186        if index >= self.count {
187            return Err(Error::InvalidInput(format!(
188                "spill index {index} out of range (len {})",
189                self.count
190            )));
191        }
192        deserialize_le(self.record_bytes(index), &self.layout)
193            .map_err(|e| Error::InvalidData(format!("decode spill record {index}: {e}")))
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Layout used by every test: point format 3 with GPS time, colour, and two extra bytes.
    fn layout_with_color() -> StreamingLayout {
        StreamingLayout {
            point_format: 3,
            has_gps: true,
            has_color: true,
            has_nir: false,
            has_waveform: false,
            extra_bytes: 2,
            extra_bytes_descriptors: Vec::new(),
        }
    }

    /// Builds a deterministic record whose fields are all derived from `seed`.
    fn record(seed: u32) -> LasPointRecord {
        let f = f64::from(seed);
        let quarter = seed % 4;
        let low_byte = (seed & 0xff) as u8;
        let high_byte = ((seed >> 8) & 0xff) as u8;
        LasPointRecord {
            x: f * 1.5,
            y: -f * 2.25,
            z: f * 0.125,
            intensity: seed as u16,
            return_number: (seed % 5) as u8,
            number_of_returns: 5,
            classification: (seed % 32) as u8,
            scan_direction_flag: seed % 2 == 0,
            edge_of_flight_line: seed % 3 == 0,
            scan_angle: (seed as f32) - 100.25,
            user_data: (seed % 256) as u8,
            point_source_id: seed as u16,
            synthetic: quarter == 0,
            key_point: quarter == 1,
            withheld: quarter == 2,
            overlap: false,
            scan_channel: 0,
            gps_time: 1.0e9 + f,
            red: (seed * 7) as u16,
            green: (seed * 11) as u16,
            blue: (seed * 13) as u16,
            nir: 0,
            wave_packet_descriptor_index: 0,
            byte_offset_to_waveform_data: 0,
            waveform_packet_size: 0,
            return_point_waveform_location: 0.0,
            extra_bytes: vec![low_byte, high_byte],
        }
    }

    /// Everything pushed must come back unchanged, and the bounds must cover all points.
    #[test]
    fn spill_round_trips_records_and_bounds() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        let originals: Vec<LasPointRecord> = (0..256).map(record).collect();
        originals
            .iter()
            .for_each(|rec| writer.push(rec).unwrap());
        assert_eq!(writer.count(), 256);

        let reader = writer.finalize().unwrap();
        assert_eq!(reader.len(), 256);
        for (index, expected) in originals.iter().enumerate() {
            let decoded = reader.record_at(index).unwrap();
            assert_eq!(decoded, *expected);
            let coordinates = (expected.x, expected.y, expected.z);
            assert_eq!(reader.xyz_at(index), coordinates);
        }

        let bounds = reader.bounds();
        assert_eq!(bounds.min, (0.0, -573.75, 0.0));
        assert_eq!(bounds.max, (382.5, 0.0, 31.875));
    }

    /// Dropping a writer without finalizing it must delete the spill file.
    #[test]
    fn unfinalized_spill_writer_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let path = writer.path.clone();
        drop(writer);
        assert!(!path.exists());
    }

    /// The spill file lives exactly as long as the reader that owns it.
    #[test]
    fn finalized_spill_reader_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let reader = writer.finalize().unwrap();
        let spill_path = reader.path.clone();
        assert!(spill_path.exists());
        drop(reader);
        assert!(!spill_path.exists());
    }

    /// On Unix the spill file must be readable and writable by its owner only.
    #[cfg(unix)]
    #[test]
    fn spill_file_is_private_on_unix() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        let permissions = std::fs::metadata(&writer.path).unwrap().permissions();
        assert_eq!(permissions.mode() & 0o777, 0o600);
    }
}