1use std::fs::File;
4use std::io::{BufWriter, Write};
5use std::path::Path;
6#[cfg(test)]
7use std::path::PathBuf;
8
9use copc_core::{
10 deserialize_le, serialize_le, Bounds, Error, LasPointRecord, Result, StreamingLayout,
11};
12use memmap2::Mmap;
13use tempfile::{NamedTempFile, TempPath};
14
15pub struct SpillWriter {
17 #[cfg(test)]
18 path: PathBuf,
19 file: Option<BufWriter<NamedTempFile>>,
20 layout: StreamingLayout,
21 record_width: usize,
22 scratch: Vec<u8>,
23 count: u64,
24 bounds: Option<Bounds>,
25}
26
27impl SpillWriter {
28 pub fn create(spill_dir: &Path, layout: StreamingLayout) -> Result<Self> {
29 let file = tempfile::Builder::new()
30 .prefix(".copc-writer-spill.")
31 .suffix(".part")
32 .tempfile_in(spill_dir)
33 .map_err(|e| Error::io("create spill file", e))?;
34 #[cfg(test)]
35 let path = file.path().to_path_buf();
36 let record_width = layout.record_width();
37 Ok(Self {
38 #[cfg(test)]
39 path,
40 file: Some(BufWriter::new(file)),
41 layout,
42 record_width,
43 scratch: vec![0u8; record_width],
44 count: 0,
45 bounds: None,
46 })
47 }
48
49 pub fn push(&mut self, record: &LasPointRecord) -> Result<()> {
50 serialize_le(record, &self.layout, &mut self.scratch);
51 let writer = self
52 .file
53 .as_mut()
54 .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
55 writer
56 .write_all(&self.scratch)
57 .map_err(|e| Error::io("write spill record", e))?;
58 match self.bounds.as_mut() {
59 Some(bounds) => bounds.extend(record.x, record.y, record.z),
60 None => self.bounds = Some(Bounds::point(record.x, record.y, record.z)),
61 }
62 self.count += 1;
63 Ok(())
64 }
65
66 pub fn count(&self) -> u64 {
67 self.count
68 }
69
70 pub fn finalize(mut self) -> Result<SpillReader> {
71 let mut writer = self
72 .file
73 .take()
74 .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
75 writer
76 .flush()
77 .map_err(|e| Error::io("flush spill writer", e))?;
78 let file = writer
79 .into_inner()
80 .map_err(|e| Error::io("unwrap spill writer", e.into_error()))?;
81 file.as_file()
82 .sync_all()
83 .map_err(|e| Error::io("sync spill file", e))?;
84 let mmap_file = file
85 .reopen()
86 .map_err(|e| Error::io("open spill for mmap", e))?;
87 let temp_path = file.into_temp_path();
88 let count = usize::try_from(self.count)
89 .map_err(|_| Error::InvalidInput("spill record count exceeds usize range".into()))?;
90 let bounds = self.bounds.unwrap_or_else(|| Bounds::point(0.0, 0.0, 0.0));
91 SpillReader::open(
92 temp_path,
93 mmap_file,
94 self.layout,
95 self.record_width,
96 count,
97 bounds,
98 )
99 }
100}
101
102pub struct SpillReader {
104 #[cfg(test)]
105 path: PathBuf,
106 mmap: Mmap,
107 _file: File,
108 _path: TempPath,
109 layout: StreamingLayout,
110 record_width: usize,
111 count: usize,
112 bounds: Bounds,
113}
114
115impl SpillReader {
116 fn open(
117 temp_path: TempPath,
118 file: File,
119 layout: StreamingLayout,
120 record_width: usize,
121 count: usize,
122 bounds: Bounds,
123 ) -> Result<Self> {
124 #[cfg(test)]
125 let path = temp_path.to_path_buf();
126 let mmap = unsafe { Mmap::map(&file) }.map_err(|e| Error::io("mmap spill file", e))?;
127 let expected = record_width
128 .checked_mul(count)
129 .ok_or_else(|| Error::InvalidInput("spill size overflow".into()))?;
130 if mmap.len() != expected {
131 return Err(Error::InvalidInput(format!(
132 "spill file is {} bytes, expected {}",
133 mmap.len(),
134 expected
135 )));
136 }
137 Ok(Self {
138 #[cfg(test)]
139 path,
140 mmap,
141 _file: file,
142 _path: temp_path,
143 layout,
144 record_width,
145 count,
146 bounds,
147 })
148 }
149
150 pub fn len(&self) -> usize {
151 self.count
152 }
153
154 pub fn is_empty(&self) -> bool {
155 self.count == 0
156 }
157
158 pub fn layout(&self) -> StreamingLayout {
159 self.layout
160 }
161
162 pub fn bounds(&self) -> Bounds {
163 self.bounds
164 }
165
166 #[inline]
167 fn record_bytes(&self, index: usize) -> &[u8] {
168 let start = index * self.record_width;
169 &self.mmap[start..start + self.record_width]
170 }
171
172 #[inline]
173 pub fn xyz_at(&self, index: usize) -> (f64, f64, f64) {
174 debug_assert!(index < self.count);
175 let bytes = self.record_bytes(index);
176 let x = f64::from_le_bytes(bytes[0..8].try_into().expect("spill x width"));
177 let y = f64::from_le_bytes(bytes[8..16].try_into().expect("spill y width"));
178 let z = f64::from_le_bytes(bytes[16..24].try_into().expect("spill z width"));
179 (x, y, z)
180 }
181
182 pub fn record_at(&self, index: usize) -> Result<LasPointRecord> {
183 if index >= self.count {
184 return Err(Error::InvalidInput(format!(
185 "spill index {index} out of range (len {})",
186 self.count
187 )));
188 }
189 deserialize_le(self.record_bytes(index), &self.layout)
190 .map_err(|e| Error::InvalidData(format!("decode spill record {index}: {e}")))
191 }
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// Layout used by every test: point format 3 with GPS time and color.
    fn layout_with_color() -> StreamingLayout {
        StreamingLayout {
            point_format: 3,
            has_gps: true,
            has_color: true,
            has_nir: false,
            has_waveform: false,
        }
    }

    /// Builds a record whose fields are all derived from `seed`, so each
    /// seed yields a distinguishable record.
    fn record(seed: u32) -> LasPointRecord {
        let s = f64::from(seed);
        LasPointRecord {
            // Coordinates scale linearly with the seed.
            x: s * 1.5,
            y: -s * 2.25,
            z: s * 0.125,
            intensity: seed as u16,
            return_number: (seed % 5) as u8,
            number_of_returns: 5,
            classification: (seed % 32) as u8,
            scan_direction_flag: seed % 2 == 0,
            edge_of_flight_line: seed % 3 == 0,
            scan_angle: (seed as f32) - 100.25,
            user_data: (seed % 256) as u8,
            point_source_id: seed as u16,
            synthetic: seed % 4 == 0,
            key_point: seed % 4 == 1,
            withheld: seed % 4 == 2,
            overlap: false,
            scan_channel: 0,
            gps_time: 1.0e9 + s,
            red: (seed * 7) as u16,
            green: (seed * 11) as u16,
            blue: (seed * 13) as u16,
            // NIR and waveform fields stay zeroed; the test layout has neither.
            nir: 0,
            wave_packet_descriptor_index: 0,
            byte_offset_to_waveform_data: 0,
            waveform_packet_size: 0,
            return_point_waveform_location: 0.0,
        }
    }

    // Writes 256 records, reads them back and checks records, coordinates
    // and the accumulated bounds.
    #[test]
    fn spill_round_trips_records_and_bounds() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        let originals: Vec<LasPointRecord> = (0..256).map(record).collect();
        originals
            .iter()
            .for_each(|rec| writer.push(rec).unwrap());
        assert_eq!(writer.count(), 256);

        let reader = writer.finalize().unwrap();
        assert_eq!(reader.len(), 256);
        for index in 0..originals.len() {
            let expected = &originals[index];
            assert_eq!(reader.record_at(index).unwrap(), *expected);
            assert_eq!(reader.xyz_at(index), (expected.x, expected.y, expected.z));
        }

        // Seeds run 0..=255, so the extremes come from seeds 0 and 255.
        let extent = reader.bounds();
        assert_eq!(extent.min, (0.0, -573.75, 0.0));
        assert_eq!(extent.max, (382.5, 0.0, 31.875));
    }

    // Dropping a writer that was never finalized must delete its temp file.
    #[test]
    fn unfinalized_spill_writer_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let spill_path = writer.path.clone();
        drop(writer);
        assert!(!spill_path.exists());
    }

    // The file outlives finalize and is deleted once the reader is dropped.
    #[test]
    fn finalized_spill_reader_removes_file() {
        let dir = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let reader = writer.finalize().unwrap();
        let spill_path = reader.path.clone();
        assert!(spill_path.exists());
        drop(reader);
        assert!(!spill_path.exists());
    }

    // The spill file must be readable and writable by its owner only.
    #[cfg(unix)]
    #[test]
    fn spill_file_is_private_on_unix() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let writer = SpillWriter::create(dir.path(), layout_with_color()).unwrap();
        let permissions = std::fs::metadata(&writer.path).unwrap().permissions();
        assert_eq!(permissions.mode() & 0o777, 0o600);
    }
}