Skip to main content

copc_writer/
spill.rs

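//! Disk spill for streaming COPC writes.
//!
//! [`SpillWriter`] appends fixed-width serialized [`LasPointRecord`]s to a temporary file in a
//! caller-chosen directory, and [`SpillWriter::finalize`] reopens that file as a memory-mapped
//! [`SpillReader`] for random access. An un-finalized writer and a finalized reader both delete
//! the file when dropped.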
2
3use std::fs::{File, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use copc_core::{
10    deserialize_le, serialize_le, Bounds, Error, LasPointRecord, Result, StreamingLayout,
11};
12use memmap2::Mmap;
13
14static SPILL_SEQUENCE: AtomicU64 = AtomicU64::new(0);
15
16/// Streams `LasPointRecord` values to a process-local temporary spill file.
17pub struct SpillWriter {
18    path: PathBuf,
19    file: Option<BufWriter<File>>,
20    layout: StreamingLayout,
21    record_width: usize,
22    scratch: Vec<u8>,
23    count: u64,
24    bounds: Option<Bounds>,
25    keep_file: bool,
26}
27
28impl SpillWriter {
29    pub fn create(spill_dir: &Path, layout: StreamingLayout) -> Result<Self> {
30        let nanos = SystemTime::now()
31            .duration_since(UNIX_EPOCH)
32            .map(|d| d.as_nanos())
33            .unwrap_or(0);
34        let sequence = SPILL_SEQUENCE.fetch_add(1, Ordering::Relaxed);
35        let name = format!(
36            ".copc-writer-spill.{}.{}.{}.part",
37            std::process::id(),
38            nanos,
39            sequence
40        );
41        let path = spill_dir.join(name);
42        let file = OpenOptions::new()
43            .write(true)
44            .create_new(true)
45            .open(&path)
46            .map_err(|e| Error::io("create spill file", e))?;
47        let record_width = layout.record_width();
48        Ok(Self {
49            path,
50            file: Some(BufWriter::new(file)),
51            layout,
52            record_width,
53            scratch: vec![0u8; record_width],
54            count: 0,
55            bounds: None,
56            keep_file: false,
57        })
58    }
59
60    pub fn push(&mut self, record: &LasPointRecord) -> Result<()> {
61        serialize_le(record, &self.layout, &mut self.scratch);
62        let writer = self
63            .file
64            .as_mut()
65            .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
66        writer
67            .write_all(&self.scratch)
68            .map_err(|e| Error::io("write spill record", e))?;
69        match self.bounds.as_mut() {
70            Some(bounds) => bounds.extend(record.x, record.y, record.z),
71            None => self.bounds = Some(Bounds::point(record.x, record.y, record.z)),
72        }
73        self.count += 1;
74        Ok(())
75    }
76
77    pub fn count(&self) -> u64 {
78        self.count
79    }
80
81    pub fn finalize(mut self) -> Result<SpillReader> {
82        let mut writer = self
83            .file
84            .take()
85            .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
86        writer
87            .flush()
88            .map_err(|e| Error::io("flush spill writer", e))?;
89        let file = writer
90            .into_inner()
91            .map_err(|e| Error::io("unwrap spill writer", e.into_error()))?;
92        file.sync_all()
93            .map_err(|e| Error::io("sync spill file", e))?;
94        self.keep_file = true;
95        let path = self.path.clone();
96        let count = usize::try_from(self.count)
97            .map_err(|_| Error::InvalidInput("spill record count exceeds usize range".into()))?;
98        let bounds = self.bounds.unwrap_or_else(|| Bounds::point(0.0, 0.0, 0.0));
99        SpillReader::open(path, self.layout, self.record_width, count, bounds)
100    }
101}
102
103impl Drop for SpillWriter {
104    fn drop(&mut self) {
105        if !self.keep_file {
106            let _ = std::fs::remove_file(&self.path);
107        }
108    }
109}
110
111/// Memory-mapped random-access view over a finalized spill file.
112pub struct SpillReader {
113    path: PathBuf,
114    _file: File,
115    mmap: Mmap,
116    layout: StreamingLayout,
117    record_width: usize,
118    count: usize,
119    bounds: Bounds,
120}
121
122impl SpillReader {
123    fn open(
124        path: PathBuf,
125        layout: StreamingLayout,
126        record_width: usize,
127        count: usize,
128        bounds: Bounds,
129    ) -> Result<Self> {
130        let file = File::open(&path).map_err(|e| Error::io("open spill for mmap", e))?;
131        let mmap = unsafe { Mmap::map(&file) }.map_err(|e| Error::io("mmap spill file", e))?;
132        let expected = record_width
133            .checked_mul(count)
134            .ok_or_else(|| Error::InvalidInput("spill size overflow".into()))?;
135        if mmap.len() != expected {
136            return Err(Error::InvalidInput(format!(
137                "spill file is {} bytes, expected {}",
138                mmap.len(),
139                expected
140            )));
141        }
142        Ok(Self {
143            path,
144            _file: file,
145            mmap,
146            layout,
147            record_width,
148            count,
149            bounds,
150        })
151    }
152
153    pub fn len(&self) -> usize {
154        self.count
155    }
156
157    pub fn is_empty(&self) -> bool {
158        self.count == 0
159    }
160
161    pub fn layout(&self) -> StreamingLayout {
162        self.layout
163    }
164
165    pub fn bounds(&self) -> Bounds {
166        self.bounds
167    }
168
169    #[inline]
170    fn record_bytes(&self, index: usize) -> &[u8] {
171        let start = index * self.record_width;
172        &self.mmap[start..start + self.record_width]
173    }
174
175    #[inline]
176    pub fn xyz_at(&self, index: usize) -> (f64, f64, f64) {
177        debug_assert!(index < self.count);
178        let bytes = self.record_bytes(index);
179        let x = f64::from_le_bytes(bytes[0..8].try_into().expect("spill x width"));
180        let y = f64::from_le_bytes(bytes[8..16].try_into().expect("spill y width"));
181        let z = f64::from_le_bytes(bytes[16..24].try_into().expect("spill z width"));
182        (x, y, z)
183    }
184
185    pub fn record_at(&self, index: usize) -> Result<LasPointRecord> {
186        if index >= self.count {
187            return Err(Error::InvalidInput(format!(
188                "spill index {index} out of range (len {})",
189                self.count
190            )));
191        }
192        deserialize_le(self.record_bytes(index), &self.layout)
193            .map_err(|e| Error::InvalidData(format!("decode spill record {index}: {e}")))
194    }
195}
196
197impl Drop for SpillReader {
198    fn drop(&mut self) {
199        let _ = std::fs::remove_file(&self.path);
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Layout shared by every test: point format 3 with GPS time and colour, no NIR or waveform.
    fn layout_with_color() -> StreamingLayout {
        StreamingLayout {
            point_format: 3,
            has_gps: true,
            has_color: true,
            has_nir: false,
            has_waveform: false,
        }
    }

    /// Builds a deterministic record whose attributes are all derived from `seed`.
    fn record(seed: u32) -> LasPointRecord {
        let scale = f64::from(seed);
        let quarter = seed % 4;
        LasPointRecord {
            x: scale * 1.5,
            y: -scale * 2.25,
            z: scale * 0.125,
            intensity: seed as u16,
            return_number: (seed % 5) as u8,
            number_of_returns: 5,
            classification: (seed % 32) as u8,
            scan_direction_flag: seed % 2 == 0,
            edge_of_flight_line: seed % 3 == 0,
            scan_angle: (seed as i16) - 100,
            user_data: (seed % 256) as u8,
            point_source_id: seed as u16,
            synthetic: quarter == 0,
            key_point: quarter == 1,
            withheld: quarter == 2,
            overlap: false,
            scan_channel: 0,
            gps_time: 1.0e9 + scale,
            red: (seed * 7) as u16,
            green: (seed * 11) as u16,
            blue: (seed * 13) as u16,
            nir: 0,
            wave_packet_descriptor_index: 0,
            byte_offset_to_waveform_data: 0,
            waveform_packet_size: 0,
            return_point_waveform_location: 0.0,
        }
    }

    #[test]
    fn spill_round_trips_records_and_bounds() {
        let dir = tempfile::tempdir().unwrap();
        let expected: Vec<LasPointRecord> = (0..256).map(record).collect();

        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        expected.iter().for_each(|rec| writer.push(rec).unwrap());
        assert_eq!(writer.count(), 256);

        let reader = writer.finalize().unwrap();
        assert_eq!(reader.len(), 256);
        for (index, want) in expected.iter().enumerate() {
            assert_eq!(reader.record_at(index).unwrap(), *want);
            assert_eq!(reader.xyz_at(index), (want.x, want.y, want.z));
        }

        // x and z grow with the seed while y shrinks, so seeds 0 and 255 span the box.
        let bounds = reader.bounds();
        assert_eq!(bounds.min, (0.0, -573.75, 0.0));
        assert_eq!(bounds.max, (382.5, 0.0, 31.875));
    }

    #[test]
    fn unfinalized_spill_writer_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let path = writer.path.clone();
        drop(writer);
        assert!(!path.exists());
    }

    #[test]
    fn finalized_spill_reader_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let reader = {
            let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
            writer.push(&record(1)).unwrap();
            writer.finalize().unwrap()
        };
        let path = reader.path.clone();
        assert!(path.exists());
        drop(reader);
        assert!(!path.exists());
    }
}