1use std::fs::{File, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use copc_core::{
10 deserialize_le, serialize_le, Bounds, Error, LasPointRecord, Result, StreamingLayout,
11};
12use memmap2::Mmap;
13
14static SPILL_SEQUENCE: AtomicU64 = AtomicU64::new(0);
15
/// Append-only writer that streams fixed-width serialized point records into a
/// spill file on disk.
///
/// Dropping the writer deletes the file unless `keep_file` is set; `finalize`
/// sets it when the file is handed over to a [`SpillReader`].
pub struct SpillWriter {
    /// Location of the spill file on disk.
    path: PathBuf,
    /// Buffered handle to the spill file; `finalize` takes it, leaving `None`.
    file: Option<BufWriter<File>>,
    /// Layout handed to `serialize_le` for every pushed record.
    layout: StreamingLayout,
    /// Bytes per serialized record, as reported by `layout.record_width()`.
    record_width: usize,
    /// Reusable serialization buffer, created as `record_width` zero bytes.
    scratch: Vec<u8>,
    /// Number of records successfully pushed so far.
    count: u64,
    /// Running bounds over pushed points; `None` until the first push.
    bounds: Option<Bounds>,
    /// When `true`, `Drop` leaves the spill file on disk.
    keep_file: bool,
}
27
28impl SpillWriter {
29 pub fn create(spill_dir: &Path, layout: StreamingLayout) -> Result<Self> {
30 let nanos = SystemTime::now()
31 .duration_since(UNIX_EPOCH)
32 .map(|d| d.as_nanos())
33 .unwrap_or(0);
34 let sequence = SPILL_SEQUENCE.fetch_add(1, Ordering::Relaxed);
35 let name = format!(
36 ".copc-writer-spill.{}.{}.{}.part",
37 std::process::id(),
38 nanos,
39 sequence
40 );
41 let path = spill_dir.join(name);
42 let file = OpenOptions::new()
43 .write(true)
44 .create_new(true)
45 .open(&path)
46 .map_err(|e| Error::io("create spill file", e))?;
47 let record_width = layout.record_width();
48 Ok(Self {
49 path,
50 file: Some(BufWriter::new(file)),
51 layout,
52 record_width,
53 scratch: vec![0u8; record_width],
54 count: 0,
55 bounds: None,
56 keep_file: false,
57 })
58 }
59
60 pub fn push(&mut self, record: &LasPointRecord) -> Result<()> {
61 serialize_le(record, &self.layout, &mut self.scratch);
62 let writer = self
63 .file
64 .as_mut()
65 .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
66 writer
67 .write_all(&self.scratch)
68 .map_err(|e| Error::io("write spill record", e))?;
69 match self.bounds.as_mut() {
70 Some(bounds) => bounds.extend(record.x, record.y, record.z),
71 None => self.bounds = Some(Bounds::point(record.x, record.y, record.z)),
72 }
73 self.count += 1;
74 Ok(())
75 }
76
77 pub fn count(&self) -> u64 {
78 self.count
79 }
80
81 pub fn finalize(mut self) -> Result<SpillReader> {
82 let mut writer = self
83 .file
84 .take()
85 .ok_or_else(|| Error::InvalidInput("spill writer already finalized".into()))?;
86 writer
87 .flush()
88 .map_err(|e| Error::io("flush spill writer", e))?;
89 let file = writer
90 .into_inner()
91 .map_err(|e| Error::io("unwrap spill writer", e.into_error()))?;
92 file.sync_all()
93 .map_err(|e| Error::io("sync spill file", e))?;
94 self.keep_file = true;
95 let path = self.path.clone();
96 let count = usize::try_from(self.count)
97 .map_err(|_| Error::InvalidInput("spill record count exceeds usize range".into()))?;
98 let bounds = self.bounds.unwrap_or_else(|| Bounds::point(0.0, 0.0, 0.0));
99 SpillReader::open(path, self.layout, self.record_width, count, bounds)
100 }
101}
102
103impl Drop for SpillWriter {
104 fn drop(&mut self) {
105 if !self.keep_file {
106 let _ = std::fs::remove_file(&self.path);
107 }
108 }
109}
110
/// Read-only, memory-mapped view over a finished spill file.
///
/// The file at `path` is removed when the reader is dropped.
pub struct SpillReader {
    /// Location of the spill file on disk.
    path: PathBuf,
    /// Handle the map was created from; held for as long as the reader lives.
    _file: File,
    /// Memory map covering the whole file (`record_width * count` bytes).
    mmap: Mmap,
    /// Layout handed to `deserialize_le` when decoding records.
    layout: StreamingLayout,
    /// Bytes per serialized record.
    record_width: usize,
    /// Number of records stored in the file.
    count: usize,
    /// Bounds supplied when the reader was opened.
    bounds: Bounds,
}
121
122impl SpillReader {
123 fn open(
124 path: PathBuf,
125 layout: StreamingLayout,
126 record_width: usize,
127 count: usize,
128 bounds: Bounds,
129 ) -> Result<Self> {
130 let file = File::open(&path).map_err(|e| Error::io("open spill for mmap", e))?;
131 let mmap = unsafe { Mmap::map(&file) }.map_err(|e| Error::io("mmap spill file", e))?;
132 let expected = record_width
133 .checked_mul(count)
134 .ok_or_else(|| Error::InvalidInput("spill size overflow".into()))?;
135 if mmap.len() != expected {
136 return Err(Error::InvalidInput(format!(
137 "spill file is {} bytes, expected {}",
138 mmap.len(),
139 expected
140 )));
141 }
142 Ok(Self {
143 path,
144 _file: file,
145 mmap,
146 layout,
147 record_width,
148 count,
149 bounds,
150 })
151 }
152
153 pub fn len(&self) -> usize {
154 self.count
155 }
156
157 pub fn is_empty(&self) -> bool {
158 self.count == 0
159 }
160
161 pub fn layout(&self) -> StreamingLayout {
162 self.layout
163 }
164
165 pub fn bounds(&self) -> Bounds {
166 self.bounds
167 }
168
169 #[inline]
170 fn record_bytes(&self, index: usize) -> &[u8] {
171 let start = index * self.record_width;
172 &self.mmap[start..start + self.record_width]
173 }
174
175 #[inline]
176 pub fn xyz_at(&self, index: usize) -> (f64, f64, f64) {
177 debug_assert!(index < self.count);
178 let bytes = self.record_bytes(index);
179 let x = f64::from_le_bytes(bytes[0..8].try_into().expect("spill x width"));
180 let y = f64::from_le_bytes(bytes[8..16].try_into().expect("spill y width"));
181 let z = f64::from_le_bytes(bytes[16..24].try_into().expect("spill z width"));
182 (x, y, z)
183 }
184
185 pub fn record_at(&self, index: usize) -> Result<LasPointRecord> {
186 if index >= self.count {
187 return Err(Error::InvalidInput(format!(
188 "spill index {index} out of range (len {})",
189 self.count
190 )));
191 }
192 deserialize_le(self.record_bytes(index), &self.layout)
193 .map_err(|e| Error::InvalidData(format!("decode spill record {index}: {e}")))
194 }
195}
196
197impl Drop for SpillReader {
198 fn drop(&mut self) {
199 let _ = std::fs::remove_file(&self.path);
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Layout shared by every test: point format 3 with GPS time and colour.
    fn layout_with_color() -> StreamingLayout {
        StreamingLayout {
            point_format: 3,
            has_gps: true,
            has_color: true,
            has_nir: false,
            has_waveform: false,
        }
    }

    /// Builds a deterministic record whose fields are all derived from `seed`.
    fn record(seed: u32) -> LasPointRecord {
        let base = f64::from(seed);
        LasPointRecord {
            x: base * 1.5,
            y: -base * 2.25,
            z: base * 0.125,
            intensity: seed as u16,
            return_number: (seed % 5) as u8,
            number_of_returns: 5,
            classification: (seed % 32) as u8,
            scan_direction_flag: seed % 2 == 0,
            edge_of_flight_line: seed % 3 == 0,
            scan_angle: (seed as i16) - 100,
            user_data: (seed % 256) as u8,
            point_source_id: seed as u16,
            synthetic: seed % 4 == 0,
            key_point: seed % 4 == 1,
            withheld: seed % 4 == 2,
            overlap: false,
            scan_channel: 0,
            gps_time: 1.0e9 + base,
            red: (seed * 7) as u16,
            green: (seed * 11) as u16,
            blue: (seed * 13) as u16,
            nir: 0,
            wave_packet_descriptor_index: 0,
            byte_offset_to_waveform_data: 0,
            waveform_packet_size: 0,
            return_point_waveform_location: 0.0,
        }
    }

    #[test]
    fn spill_round_trips_records_and_bounds() {
        let tmp = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(tmp.path(), layout_with_color()).unwrap();
        let originals: Vec<LasPointRecord> = (0..256).map(record).collect();
        originals
            .iter()
            .for_each(|rec| writer.push(rec).unwrap());
        assert_eq!(writer.count(), 256);

        let reader = writer.finalize().unwrap();
        assert_eq!(reader.len(), 256);
        for index in 0..originals.len() {
            let expected = &originals[index];
            assert_eq!(reader.record_at(index).unwrap(), *expected);
            assert_eq!(reader.xyz_at(index), (expected.x, expected.y, expected.z));
        }

        // Seeds run 0..=255, so the extremes come from seed 0 and seed 255.
        let bounds = reader.bounds();
        assert_eq!(bounds.min, (0.0, -573.75, 0.0));
        assert_eq!(bounds.max, (382.5, 0.0, 31.875));
    }

    #[test]
    fn unfinalized_spill_writer_removes_file() {
        let tmp = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(tmp.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let path = writer.path.clone();
        // Dropping without finalizing must clean up the spill file.
        drop(writer);
        assert!(!path.exists());
    }

    #[test]
    fn finalized_spill_reader_removes_file() {
        let tmp = tempfile::tempdir().unwrap();
        let mut writer = SpillWriter::create(tmp.path(), layout_with_color()).unwrap();
        writer.push(&record(1)).unwrap();
        let path = {
            let reader = writer.finalize().unwrap();
            let path = reader.path.clone();
            // The file must outlive the writer while the reader is alive.
            assert!(path.exists());
            path
        };
        assert!(!path.exists());
    }
}