1use std::fs::File;
4use std::io::{BufWriter, Write};
5use std::path::Path;
6#[cfg(test)]
7use std::path::PathBuf;
8
9use copc_core::{
10 deserialize_le, serialize_le, Bounds, Error, LasPointRecord, Result, StreamingLayout,
11};
12use memmap2::Mmap;
13use tempfile::{NamedTempFile, TempPath};
14
/// Capacity in bytes (1 MiB) of the `BufWriter` that `SpillWriter::create` wraps around the
/// spill file.
const SPILL_IO_BUFFER_BYTES: usize = 1024 * 1024;
16
/// Append-only writer that serializes point records into a temporary spill file.
///
/// Records are pushed one at a time with `push`; `finalize` turns the finished file into a
/// memory-mapped `SpillReader`.
pub struct SpillWriter {
    /// Path of the temporary file, kept only in test builds so tests can inspect the file.
    #[cfg(test)]
    path: PathBuf,
    /// Buffered handle to the temporary file; `finalize` takes it, leaving `None`.
    file: Option<BufWriter<NamedTempFile>>,
    /// Layout passed to `serialize_le` for every record and handed on to the reader.
    layout: StreamingLayout,
    /// Value of `layout.record_width()`, cached when the writer is created.
    record_width: usize,
    /// Reusable buffer (allocated with `record_width` zero bytes) that each record is
    /// serialized into before being written out.
    scratch: Vec<u8>,
    /// Number of records written successfully so far.
    count: u64,
    /// Bounding box of the x/y/z of all pushed records; `None` until the first push.
    bounds: Option<Bounds>,
}
28
29impl SpillWriter {
30 pub fn create(spill_dir: &Path, layout: StreamingLayout) -> Result<Self> {
31 let file = tempfile::Builder::new()
32 .prefix(".copc-writer-spill.")
33 .suffix(".part")
34 .tempfile_in(spill_dir)
35 .map_err(|e| Error::io("create spill file", e))?;
36 #[cfg(test)]
37 let path = file.path().to_path_buf();
38 let record_width = layout.record_width();
39 Ok(Self {
40 #[cfg(test)]
41 path,
42 file: Some(BufWriter::with_capacity(SPILL_IO_BUFFER_BYTES, file)),
43 layout,
44 record_width,
45 scratch: vec![0u8; record_width],
46 count: 0,
47 bounds: None,
48 })
49 }
50
51 pub fn push(&mut self, record: &LasPointRecord) -> Result<()> {
52 serialize_le(record, &self.layout, &mut self.scratch)
53 .map_err(|e| Error::InvalidInput(format!("encode spill record: {e}")))?;
54 let writer = self
55 .file
56 .as_mut()
57 .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
58 writer
59 .write_all(&self.scratch)
60 .map_err(|e| Error::io("write spill record", e))?;
61 match self.bounds.as_mut() {
62 Some(bounds) => bounds.extend(record.x, record.y, record.z),
63 None => self.bounds = Some(Bounds::point(record.x, record.y, record.z)),
64 }
65 self.count += 1;
66 Ok(())
67 }
68
69 pub fn count(&self) -> u64 {
70 self.count
71 }
72
73 pub fn finalize(mut self) -> Result<SpillReader> {
74 let mut writer = self
75 .file
76 .take()
77 .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
78 writer
79 .flush()
80 .map_err(|e| Error::io("flush spill writer", e))?;
81 let file = writer
82 .into_inner()
83 .map_err(|e| Error::io("unwrap spill writer", e.into_error()))?;
84 file.as_file()
85 .sync_all()
86 .map_err(|e| Error::io("sync spill file", e))?;
87 let mmap_file = file
88 .reopen()
89 .map_err(|e| Error::io("open spill for mmap", e))?;
90 let temp_path = file.into_temp_path();
91 let count = usize::try_from(self.count)
92 .map_err(|_| Error::InvalidInput("spill record count exceeds usize range".into()))?;
93 let bounds = self.bounds.unwrap_or_else(|| Bounds::point(0.0, 0.0, 0.0));
94 SpillReader::open(
95 temp_path,
96 mmap_file,
97 self.layout,
98 self.record_width,
99 count,
100 bounds,
101 )
102 }
103}
104
/// Read-only, memory-mapped view over a finalized spill file.
///
/// NOTE: keep the field order. Rust drops fields in declaration order, so `mmap` is
/// released before `_file` is closed and before `_path` is dropped.
pub struct SpillReader {
    /// Path of the spill file, kept only in test builds so tests can inspect the file.
    #[cfg(test)]
    path: PathBuf,
    /// Mapping of the whole spill file; records are sliced out of it by index.
    mmap: Mmap,
    /// Handle the mapping was created from; never read, only held for the reader's lifetime.
    _file: File,
    /// Temporary-path guard; never read, only held so its drop runs with the reader's.
    _path: TempPath,
    /// Layout passed to `deserialize_le` when decoding a record.
    layout: StreamingLayout,
    /// Size in bytes of one serialized record.
    record_width: usize,
    /// Number of records in the file.
    count: usize,
    /// Bounds supplied by the writer when the spill was finalized.
    bounds: Bounds,
}
117
118impl SpillReader {
119 fn open(
120 temp_path: TempPath,
121 file: File,
122 layout: StreamingLayout,
123 record_width: usize,
124 count: usize,
125 bounds: Bounds,
126 ) -> Result<Self> {
127 #[cfg(test)]
128 let path = temp_path.to_path_buf();
129 let mmap = unsafe { Mmap::map(&file) }.map_err(|e| Error::io("mmap spill file", e))?;
130 let expected = record_width
131 .checked_mul(count)
132 .ok_or_else(|| Error::InvalidInput("spill size overflow".into()))?;
133 if mmap.len() != expected {
134 return Err(Error::InvalidInput(format!(
135 "spill file is {} bytes, expected {}",
136 mmap.len(),
137 expected
138 )));
139 }
140 Ok(Self {
141 #[cfg(test)]
142 path,
143 mmap,
144 _file: file,
145 _path: temp_path,
146 layout,
147 record_width,
148 count,
149 bounds,
150 })
151 }
152
153 pub fn len(&self) -> usize {
154 self.count
155 }
156
157 pub fn is_empty(&self) -> bool {
158 self.count == 0
159 }
160
161 pub fn layout(&self) -> &StreamingLayout {
162 &self.layout
163 }
164
165 pub fn bounds(&self) -> Bounds {
166 self.bounds
167 }
168
169 #[inline]
170 fn record_bytes(&self, index: usize) -> &[u8] {
171 let start = index * self.record_width;
172 &self.mmap[start..start + self.record_width]
173 }
174
175 #[inline]
176 pub fn xyz_at(&self, index: usize) -> (f64, f64, f64) {
177 debug_assert!(index < self.count);
178 let bytes = self.record_bytes(index);
179 let x = f64::from_le_bytes(bytes[0..8].try_into().expect("spill x width"));
180 let y = f64::from_le_bytes(bytes[8..16].try_into().expect("spill y width"));
181 let z = f64::from_le_bytes(bytes[16..24].try_into().expect("spill z width"));
182 (x, y, z)
183 }
184
185 pub fn record_at(&self, index: usize) -> Result<LasPointRecord> {
186 if index >= self.count {
187 return Err(Error::InvalidInput(format!(
188 "spill index {index} out of range (len {})",
189 self.count
190 )));
191 }
192 deserialize_le(self.record_bytes(index), &self.layout)
193 .map_err(|e| Error::InvalidData(format!("decode spill record {index}: {e}")))
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Layout used by every test: point format 3 with GPS time, colour and two extra bytes.
    fn layout_with_color() -> StreamingLayout {
        StreamingLayout {
            point_format: 3,
            has_gps: true,
            has_color: true,
            has_nir: false,
            has_waveform: false,
            extra_bytes: 2,
            extra_bytes_descriptors: Vec::new(),
        }
    }

    /// Builds a deterministic record whose fields are all derived from `seed`.
    fn record(seed: u32) -> LasPointRecord {
        let f = f64::from(seed);
        LasPointRecord {
            x: f * 1.5,
            y: -f * 2.25,
            z: f * 0.125,
            intensity: seed as u16,
            return_number: (seed % 5) as u8,
            number_of_returns: 5,
            classification: (seed % 32) as u8,
            scan_direction_flag: seed % 2 == 0,
            edge_of_flight_line: seed % 3 == 0,
            scan_angle: (seed as f32) - 100.25,
            user_data: (seed % 256) as u8,
            point_source_id: seed as u16,
            synthetic: seed % 4 == 0,
            key_point: seed % 4 == 1,
            withheld: seed % 4 == 2,
            overlap: false,
            scan_channel: 0,
            gps_time: 1.0e9 + f,
            red: (seed * 7) as u16,
            green: (seed * 11) as u16,
            blue: (seed * 13) as u16,
            nir: 0,
            wave_packet_descriptor_index: 0,
            byte_offset_to_waveform_data: 0,
            waveform_packet_size: 0,
            return_point_waveform_location: 0.0,
            extra_bytes: vec![(seed & 0xff) as u8, ((seed >> 8) & 0xff) as u8],
        }
    }

    /// Every pushed record must come back unchanged, and the bounds must cover all of them.
    #[test]
    fn spill_round_trips_records_and_bounds() {
        let dir = tempfile::tempdir().unwrap();
        let layout = layout_with_color();
        let mut writer = SpillWriter::create(dir.path(), layout).unwrap();
        let originals: Vec<LasPointRecord> = (0..256).map(record).collect();
        for rec in &originals {
            writer.push(rec).unwrap();
        }
        assert_eq!(writer.count(), 256);
        let reader = writer.finalize().unwrap();
        assert_eq!(reader.len(), 256);
        for (i, original) in originals.iter().enumerate() {
            assert_eq!(reader.record_at(i).unwrap(), *original);
            assert_eq!(reader.xyz_at(i), (original.x, original.y, original.z));
        }
        // One past the last record is reported as an error rather than read.
        assert!(reader.record_at(originals.len()).is_err());
        // Seeds run 0..=255, so the extremes are 255 * 1.5, -255 * 2.25 and 255 * 0.125.
        let bounds = reader.bounds();
        assert_eq!(bounds.min, (0.0, -573.75, 0.0));
        assert_eq!(bounds.max, (382.5, 0.0, 31.875));
    }

    /// Dropping a writer that was never finalized must delete its temporary file.
    #[test]
    fn unfinalized_spill_writer_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = {
            let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
            writer.push(&record(1)).unwrap();
            // Without this the final assertion would also pass if the file never existed.
            assert!(writer.path.exists());
            writer.path.clone()
        };
        assert!(!path.exists());
    }

    /// The spill file must live as long as the reader and be deleted when it is dropped.
    #[test]
    fn finalized_spill_reader_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let reader = writer.finalize().unwrap();
        let path = reader.path.clone();
        assert!(path.exists());
        drop(reader);
        assert!(!path.exists());
    }

    /// On Unix the spill file must be readable and writable by its owner only.
    #[cfg(unix)]
    #[test]
    fn spill_file_is_private_on_unix() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        let mode = std::fs::metadata(&writer.path)
            .unwrap()
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(mode, 0o600);
    }
}