Skip to main content

copc_writer/
spill.rs

1//! Disk spill for streaming COPC writes.
2
3use std::fs::File;
4use std::io::{BufWriter, Write};
5use std::path::Path;
6#[cfg(test)]
7use std::path::PathBuf;
8
9use copc_core::{
10    deserialize_le, serialize_le, Bounds, Error, LasPointRecord, Result, StreamingLayout,
11};
12use memmap2::Mmap;
13use tempfile::{NamedTempFile, TempPath};
14
/// Streams `LasPointRecord` values to a process-local temporary spill file.
///
/// Records are appended one at a time with `push`; `finalize` consumes the writer and
/// hands the file over to a `SpillReader`. The temporary file is removed if the writer is
/// dropped without being finalized (see the `unfinalized_spill_writer_removes_file` test).
pub struct SpillWriter {
    /// Path of the spill file, captured at creation. Compiled only for tests so they can
    /// inspect the file on disk.
    #[cfg(test)]
    path: PathBuf,
    /// Buffered handle to the temporary file. `Some` from `create` onwards; `finalize`
    /// takes it out when handing the file to the reader.
    file: Option<BufWriter<NamedTempFile>>,
    /// Layout passed to `serialize_le` for every record and forwarded to the reader.
    layout: StreamingLayout,
    /// Cached result of `layout.record_width()`; also the initial length of `scratch`.
    record_width: usize,
    /// Reusable serialization buffer, so `push` does not allocate per record.
    scratch: Vec<u8>,
    /// Number of records successfully written so far.
    count: u64,
    /// Running x/y/z bounds of the pushed records; `None` until the first `push`.
    bounds: Option<Bounds>,
}
26
27impl SpillWriter {
28    pub fn create(spill_dir: &Path, layout: StreamingLayout) -> Result<Self> {
29        let file = tempfile::Builder::new()
30            .prefix(".copc-writer-spill.")
31            .suffix(".part")
32            .tempfile_in(spill_dir)
33            .map_err(|e| Error::io("create spill file", e))?;
34        #[cfg(test)]
35        let path = file.path().to_path_buf();
36        let record_width = layout.record_width();
37        Ok(Self {
38            #[cfg(test)]
39            path,
40            file: Some(BufWriter::new(file)),
41            layout,
42            record_width,
43            scratch: vec![0u8; record_width],
44            count: 0,
45            bounds: None,
46        })
47    }
48
49    pub fn push(&mut self, record: &LasPointRecord) -> Result<()> {
50        serialize_le(record, &self.layout, &mut self.scratch);
51        let writer = self
52            .file
53            .as_mut()
54            .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
55        writer
56            .write_all(&self.scratch)
57            .map_err(|e| Error::io("write spill record", e))?;
58        match self.bounds.as_mut() {
59            Some(bounds) => bounds.extend(record.x, record.y, record.z),
60            None => self.bounds = Some(Bounds::point(record.x, record.y, record.z)),
61        }
62        self.count += 1;
63        Ok(())
64    }
65
66    pub fn count(&self) -> u64 {
67        self.count
68    }
69
70    pub fn finalize(mut self) -> Result<SpillReader> {
71        let mut writer = self
72            .file
73            .take()
74            .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
75        writer
76            .flush()
77            .map_err(|e| Error::io("flush spill writer", e))?;
78        let file = writer
79            .into_inner()
80            .map_err(|e| Error::io("unwrap spill writer", e.into_error()))?;
81        file.as_file()
82            .sync_all()
83            .map_err(|e| Error::io("sync spill file", e))?;
84        let mmap_file = file
85            .reopen()
86            .map_err(|e| Error::io("open spill for mmap", e))?;
87        let temp_path = file.into_temp_path();
88        let count = usize::try_from(self.count)
89            .map_err(|_| Error::InvalidInput("spill record count exceeds usize range".into()))?;
90        let bounds = self.bounds.unwrap_or_else(|| Bounds::point(0.0, 0.0, 0.0));
91        SpillReader::open(
92            temp_path,
93            mmap_file,
94            self.layout,
95            self.record_width,
96            count,
97            bounds,
98        )
99    }
100}
101
/// Memory-mapped random-access view over a finalized spill file.
///
/// Struct fields are dropped in declaration order, so the mapping and the file handle are
/// released before `_path`. Dropping the reader removes the spill file (see the
/// `finalized_spill_reader_removes_file` test). Keep the field order as is.
pub struct SpillReader {
    /// Path of the spill file, captured when the reader is opened. Compiled only for
    /// tests so they can check the file on disk.
    #[cfg(test)]
    path: PathBuf,
    /// Read-only mapping of the whole spill file; all record access goes through it.
    mmap: Mmap,
    /// Handle the mapping was created from; never read, only kept alive with the reader.
    _file: File,
    /// Temp-path guard for the spill file; never read, only kept alive with the reader.
    _path: TempPath,
    /// Layout handed to `deserialize_le` when decoding records.
    layout: StreamingLayout,
    /// Size in bytes of one serialized record.
    record_width: usize,
    /// Number of records in the file; `open` checks `record_width * count` against the
    /// mapped length.
    count: usize,
    /// Bounds supplied by the writer when it was finalized.
    bounds: Bounds,
}
114
115impl SpillReader {
116    fn open(
117        temp_path: TempPath,
118        file: File,
119        layout: StreamingLayout,
120        record_width: usize,
121        count: usize,
122        bounds: Bounds,
123    ) -> Result<Self> {
124        #[cfg(test)]
125        let path = temp_path.to_path_buf();
126        let mmap = unsafe { Mmap::map(&file) }.map_err(|e| Error::io("mmap spill file", e))?;
127        let expected = record_width
128            .checked_mul(count)
129            .ok_or_else(|| Error::InvalidInput("spill size overflow".into()))?;
130        if mmap.len() != expected {
131            return Err(Error::InvalidInput(format!(
132                "spill file is {} bytes, expected {}",
133                mmap.len(),
134                expected
135            )));
136        }
137        Ok(Self {
138            #[cfg(test)]
139            path,
140            mmap,
141            _file: file,
142            _path: temp_path,
143            layout,
144            record_width,
145            count,
146            bounds,
147        })
148    }
149
150    pub fn len(&self) -> usize {
151        self.count
152    }
153
154    pub fn is_empty(&self) -> bool {
155        self.count == 0
156    }
157
158    pub fn layout(&self) -> StreamingLayout {
159        self.layout
160    }
161
162    pub fn bounds(&self) -> Bounds {
163        self.bounds
164    }
165
166    #[inline]
167    fn record_bytes(&self, index: usize) -> &[u8] {
168        let start = index * self.record_width;
169        &self.mmap[start..start + self.record_width]
170    }
171
172    #[inline]
173    pub fn xyz_at(&self, index: usize) -> (f64, f64, f64) {
174        debug_assert!(index < self.count);
175        let bytes = self.record_bytes(index);
176        let x = f64::from_le_bytes(bytes[0..8].try_into().expect("spill x width"));
177        let y = f64::from_le_bytes(bytes[8..16].try_into().expect("spill y width"));
178        let z = f64::from_le_bytes(bytes[16..24].try_into().expect("spill z width"));
179        (x, y, z)
180    }
181
182    pub fn record_at(&self, index: usize) -> Result<LasPointRecord> {
183        if index >= self.count {
184            return Err(Error::InvalidInput(format!(
185                "spill index {index} out of range (len {})",
186                self.count
187            )));
188        }
189        deserialize_le(self.record_bytes(index), &self.layout)
190            .map_err(|e| Error::InvalidData(format!("decode spill record {index}: {e}")))
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// Layout shared by every test: point format 3 with GPS time and color enabled,
    /// NIR and waveform disabled.
    fn layout_with_color() -> StreamingLayout {
        StreamingLayout {
            point_format: 3,
            has_gps: true,
            has_color: true,
            has_nir: false,
            has_waveform: false,
        }
    }

    /// Builds a deterministic record whose fields are all derived from `seed`.
    fn record(seed: u32) -> LasPointRecord {
        let f = f64::from(seed);
        LasPointRecord {
            x: f * 1.5,
            y: -f * 2.25,
            z: f * 0.125,
            intensity: seed as u16,
            return_number: (seed % 5) as u8,
            number_of_returns: 5,
            classification: (seed % 32) as u8,
            scan_direction_flag: seed % 2 == 0,
            edge_of_flight_line: seed % 3 == 0,
            scan_angle: (seed as f32) - 100.25,
            user_data: (seed % 256) as u8,
            point_source_id: seed as u16,
            synthetic: seed % 4 == 0,
            key_point: seed % 4 == 1,
            withheld: seed % 4 == 2,
            overlap: false,
            scan_channel: 0,
            gps_time: 1.0e9 + f,
            red: (seed * 7) as u16,
            green: (seed * 11) as u16,
            blue: (seed * 13) as u16,
            nir: 0,
            wave_packet_descriptor_index: 0,
            byte_offset_to_waveform_data: 0,
            waveform_packet_size: 0,
            return_point_waveform_location: 0.0,
        }
    }

    /// Writes 256 records, reads each one back both ways, and checks the bounds.
    #[test]
    fn spill_round_trips_records_and_bounds() {
        const RECORDS: u32 = 256;

        let dir = tempfile::tempdir().unwrap();
        let mut spill = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        let expected: Vec<LasPointRecord> = (0..RECORDS).map(record).collect();
        expected.iter().for_each(|rec| spill.push(rec).unwrap());
        assert_eq!(spill.count(), u64::from(RECORDS));

        let reader = spill.finalize().unwrap();
        assert_eq!(reader.len(), expected.len());
        for (want, index) in expected.iter().zip(0usize..) {
            assert_eq!(reader.record_at(index).unwrap(), *want);
            assert_eq!(reader.xyz_at(index), (want.x, want.y, want.z));
        }

        // Seeds run 0..=255, so the extremes are 255 * 1.5, -255 * 2.25 and 255 * 0.125.
        let bounds = reader.bounds();
        assert_eq!(bounds.min, (0.0, -573.75, 0.0));
        assert_eq!(bounds.max, (382.5, 0.0, 31.875));
    }

    /// Dropping a writer without finalizing it must delete the spill file.
    #[test]
    fn unfinalized_spill_writer_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut spill = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        spill.push(&record(1)).unwrap();
        let spill_path = spill.path.clone();
        drop(spill);
        assert!(!spill_path.exists());
    }

    /// The spill file exists while the reader is alive and is deleted when it is dropped.
    #[test]
    fn finalized_spill_reader_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut spill = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        spill.push(&record(1)).unwrap();
        let reader = spill.finalize().unwrap();
        let spill_path = reader.path.clone();
        assert!(spill_path.exists());
        drop(reader);
        assert!(!spill_path.exists());
    }

    /// On Unix the spill file must be readable and writable by its owner only.
    #[cfg(unix)]
    #[test]
    fn spill_file_is_private_on_unix() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let spill = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        let metadata = std::fs::metadata(&spill.path).unwrap();
        let permission_bits = metadata.permissions().mode() & 0o777;
        assert_eq!(permission_bits, 0o600);
    }
}