//! Incremental temporal index loading via async ByteSource.
//!
//! Loads temporal index pages on demand, pruning subtrees by time range
//! before fetching child pages.

use std::collections::HashMap;
use std::io::Cursor;

use byteorder::{LittleEndian, ReadBytesExt};
use copc_streaming::{ByteSource, CopcStreamingReader, VoxelKey};

use crate::gps_time::GpsTime;
use crate::temporal_index::NodeTemporalEntry;
use crate::TemporalError;
/// Fixed 32-byte header that prefixes the temporal index EVLR payload.
#[derive(Debug, Clone)]
pub struct TemporalHeader {
    /// Format version; only version 1 is supported.
    pub version: u32,
    /// Sampling stride — every N-th point is recorded in the index.
    pub stride: u32,
    /// Total number of node entries across all pages.
    pub node_count: u32,
    /// Total number of pages.
    pub page_count: u32,
    /// Absolute file offset of the root page.
    pub root_page_offset: u64,
    /// Size of the root page in bytes.
    pub root_page_size: u32,
}

/// A child page that has been discovered but not yet fetched, together with
/// the GPS-time bounds of its entire subtree (used to prune by time range).
#[derive(Debug, Clone)]
struct PendingPage {
    offset: u64,
    size: u32,
    subtree_time_min: f64,
    subtree_time_max: f64,
}

41/// Incrementally-loaded temporal index cache.
42pub struct TemporalCache {
43    header: Option<TemporalHeader>,
44    entries: HashMap<VoxelKey, NodeTemporalEntry>,
45    pending_pages: Vec<PendingPage>,
46    stride: u32,
47}
48
49impl Default for TemporalCache {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl TemporalCache {
56    /// Create an empty temporal cache.
57    pub fn new() -> Self {
58        Self {
59            header: None,
60            entries: HashMap::new(),
61            pending_pages: Vec::new(),
62            stride: 0,
63        }
64    }
65
66    /// Open the temporal index from a COPC reader.
67    ///
68    /// Loads the temporal header and root page. Returns `Ok(None)` if
69    /// no temporal EVLR exists in the file.
70    pub async fn from_reader<S: ByteSource>(
71        reader: &CopcStreamingReader<S>,
72    ) -> Result<Option<Self>, TemporalError> {
73        let mut cache = Self::new();
74        let found = cache
75            .load_header(reader.source(), reader.evlr_offset(), reader.evlr_count())
76            .await?;
77        if !found {
78            return Ok(None);
79        }
80        cache.load_root_page(reader.source()).await?;
81        Ok(Some(cache))
82    }
83
84    /// Scan EVLRs to find the temporal EVLR and read its header.
85    /// Returns false if no temporal EVLR exists.
86    pub async fn load_header(
87        &mut self,
88        source: &impl ByteSource,
89        evlr_offset: u64,
90        evlr_count: u32,
91    ) -> Result<bool, TemporalError> {
92        let mut pos = evlr_offset;
93
94        for _ in 0..evlr_count {
95            let hdr_data = source.read_range(pos, 60).await?;
96            let mut r = Cursor::new(hdr_data.as_slice());
97
98            // reserved (2)
99            r.set_position(2);
100            let mut user_id = [0u8; 16];
101            std::io::Read::read_exact(&mut r, &mut user_id)?;
102            let record_id = r.read_u16::<LittleEndian>()?;
103            let data_length = r.read_u64::<LittleEndian>()?;
104
105            let data_start = pos + 60;
106
107            let uid_end = user_id.iter().position(|&b| b == 0).unwrap_or(16);
108            let uid_str = std::str::from_utf8(&user_id[..uid_end]).unwrap_or("");
109
110            if uid_str == "copc_temporal" && record_id == 1000 {
111                let header_data = source.read_range(data_start, 32).await?;
112                let header = parse_temporal_header(&header_data)?;
113                self.stride = header.stride;
114                self.header = Some(header);
115                return Ok(true);
116            }
117
118            pos = data_start + data_length;
119        }
120
121        Ok(false)
122    }
123
124    /// Load the root temporal page.
125    pub async fn load_root_page(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
126        let header = self.header.as_ref().ok_or(TemporalError::TruncatedHeader)?;
127
128        let data = source
129            .read_range(header.root_page_offset, header.root_page_size as u64)
130            .await?;
131        self.parse_page(&data)?;
132        Ok(())
133    }
134
135    /// Load child pages that overlap a time range, pruning by subtree bounds.
136    pub async fn load_pages_for_time_range(
137        &mut self,
138        source: &impl ByteSource,
139        start: GpsTime,
140        end: GpsTime,
141    ) -> Result<(), TemporalError> {
142        loop {
143            let matching: Vec<PendingPage> = self
144                .pending_pages
145                .iter()
146                .filter(|p| p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0)
147                .cloned()
148                .collect();
149
150            if matching.is_empty() {
151                break;
152            }
153
154            self.pending_pages
155                .retain(|p| !(p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0));
156
157            let ranges: Vec<_> = matching.iter().map(|p| (p.offset, p.size as u64)).collect();
158            let results = source.read_ranges(&ranges).await?;
159
160            for data in &results {
161                self.parse_page(data)?;
162            }
163        }
164
165        Ok(())
166    }
167
168    /// Load ALL pending pages.
169    pub async fn load_all_pages(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
170        while !self.pending_pages.is_empty() {
171            let pages: Vec<PendingPage> = self.pending_pages.drain(..).collect();
172            let ranges: Vec<_> = pages.iter().map(|p| (p.offset, p.size as u64)).collect();
173            let results = source.read_ranges(&ranges).await?;
174
175            for data in &results {
176                self.parse_page(data)?;
177            }
178        }
179        Ok(())
180    }
181
182    /// Load relevant pages and return all nodes that overlap a time range.
183    ///
184    /// This is the primary query method — it ensures the right pages are loaded
185    /// before returning results. Equivalent to calling `load_pages_for_time_range`
186    /// followed by `nodes_in_range`, but cannot return incomplete results.
187    pub async fn query(
188        &mut self,
189        source: &impl ByteSource,
190        start: GpsTime,
191        end: GpsTime,
192    ) -> Result<Vec<&NodeTemporalEntry>, TemporalError> {
193        self.load_pages_for_time_range(source, start, end).await?;
194        Ok(self.nodes_in_range(start, end))
195    }
196
197    /// Look up the temporal entry for a node.
198    pub fn get(&self, key: &VoxelKey) -> Option<&NodeTemporalEntry> {
199        self.entries.get(key)
200    }
201
202    /// Return all loaded nodes whose time range overlaps `[start, end]`.
203    pub fn nodes_in_range(&self, start: GpsTime, end: GpsTime) -> Vec<&NodeTemporalEntry> {
204        self.entries
205            .values()
206            .filter(|e| e.overlaps(start, end))
207            .collect()
208    }
209
210    /// The sampling stride (every N-th point is recorded in the index).
211    pub fn stride(&self) -> u32 {
212        self.stride
213    }
214
215    /// The parsed temporal index header, if loaded.
216    pub fn header(&self) -> Option<&TemporalHeader> {
217        self.header.as_ref()
218    }
219
220    /// Number of loaded node entries.
221    pub fn len(&self) -> usize {
222        self.entries.len()
223    }
224
225    /// Whether no node entries have been loaded.
226    pub fn is_empty(&self) -> bool {
227        self.entries.is_empty()
228    }
229
230    /// Iterate all loaded entries.
231    pub fn iter(&self) -> impl Iterator<Item = (&VoxelKey, &NodeTemporalEntry)> {
232        self.entries.iter()
233    }
234
235    // --- High-level queries ---
236
237    /// Query points by time range and spatial bounds.
238    ///
239    /// Loads the relevant hierarchy and temporal pages, fetches matching
240    /// chunks, and returns only the points that fall inside both the time
241    /// window and the bounding box.
242    ///
243    /// ```rust,ignore
244    /// let points = temporal.query_points(
245    ///     &mut reader, &query_box, start, end,
246    /// ).await?;
247    /// ```
248    pub async fn query_points<S: ByteSource>(
249        &mut self,
250        reader: &mut CopcStreamingReader<S>,
251        bounds: &copc_streaming::Aabb,
252        start: GpsTime,
253        end: GpsTime,
254    ) -> Result<Vec<las::Point>, TemporalError> {
255        // Load spatial hierarchy for the region.
256        reader
257            .load_hierarchy_for_bounds(bounds)
258            .await
259            .map_err(TemporalError::Copc)?;
260
261        // Load temporal pages that overlap the time range.
262        self.load_pages_for_time_range(reader.source(), start, end)
263            .await?;
264
265        let root_bounds = reader.copc_info().root_bounds();
266        let stride = self.stride;
267
268        // Collect matching (key, point_range) pairs.
269        let matches: Vec<_> = self
270            .nodes_in_range(start, end)
271            .into_iter()
272            .filter(|e| e.key.bounds(&root_bounds).intersects(bounds))
273            .filter_map(|e| {
274                let hier = reader.get(&e.key)?;
275                let range = e.estimate_point_range(start, end, stride, hier.point_count);
276                if range.is_empty() {
277                    return None;
278                }
279                Some((e.key, range))
280            })
281            .collect();
282
283        let mut all_points = Vec::new();
284        for (key, range) in matches {
285            let chunk = reader
286                .fetch_chunk(&key)
287                .await
288                .map_err(TemporalError::Copc)?;
289            let points = reader
290                .read_points_range_in_bounds(&chunk, range, bounds)
291                .map_err(TemporalError::Copc)?;
292            let points = crate::filter_points_by_time(points, start, end);
293            all_points.extend(points);
294        }
295        Ok(all_points)
296    }
297
298    /// Query points by time range only (no spatial filtering).
299    ///
300    /// Loads all hierarchy pages and temporal pages that overlap the time
301    /// range, then returns points within the time window.
302    pub async fn query_points_by_time<S: ByteSource>(
303        &mut self,
304        reader: &mut CopcStreamingReader<S>,
305        start: GpsTime,
306        end: GpsTime,
307    ) -> Result<Vec<las::Point>, TemporalError> {
308        reader
309            .load_all_hierarchy()
310            .await
311            .map_err(TemporalError::Copc)?;
312
313        self.load_pages_for_time_range(reader.source(), start, end)
314            .await?;
315
316        let stride = self.stride;
317
318        let matches: Vec<_> = self
319            .nodes_in_range(start, end)
320            .into_iter()
321            .filter_map(|e| {
322                let hier = reader.get(&e.key)?;
323                let range = e.estimate_point_range(start, end, stride, hier.point_count);
324                if range.is_empty() {
325                    return None;
326                }
327                Some((e.key, range))
328            })
329            .collect();
330
331        let mut all_points = Vec::new();
332        for (key, range) in matches {
333            let chunk = reader
334                .fetch_chunk(&key)
335                .await
336                .map_err(TemporalError::Copc)?;
337            let points = reader
338                .read_points_range(&chunk, range)
339                .map_err(TemporalError::Copc)?;
340            let points = crate::filter_points_by_time(points, start, end);
341            all_points.extend(points);
342        }
343        Ok(all_points)
344    }
345
346    fn parse_page(&mut self, data: &[u8]) -> Result<(), TemporalError> {
347        let mut r = Cursor::new(data);
348
349        while (r.position() as usize) < data.len() {
350            if r.position() as usize + 20 > data.len() {
351                break;
352            }
353
354            let level = r.read_i32::<LittleEndian>()?;
355            let x = r.read_i32::<LittleEndian>()?;
356            let y = r.read_i32::<LittleEndian>()?;
357            let z = r.read_i32::<LittleEndian>()?;
358            let sample_count = r.read_u32::<LittleEndian>()?;
359
360            let key = VoxelKey { level, x, y, z };
361
362            if sample_count == 0 {
363                // Page pointer: 28 more bytes
364                let child_offset = r.read_u64::<LittleEndian>()?;
365                let child_size = r.read_u32::<LittleEndian>()?;
366                let time_min = r.read_f64::<LittleEndian>()?;
367                let time_max = r.read_f64::<LittleEndian>()?;
368
369                self.pending_pages.push(PendingPage {
370                    offset: child_offset,
371                    size: child_size,
372                    subtree_time_min: time_min,
373                    subtree_time_max: time_max,
374                });
375            } else {
376                let mut samples = Vec::with_capacity(sample_count as usize);
377                for _ in 0..sample_count {
378                    samples.push(GpsTime(r.read_f64::<LittleEndian>()?));
379                }
380
381                self.entries
382                    .insert(key, NodeTemporalEntry::new(key, samples));
383            }
384        }
385
386        Ok(())
387    }
388}
389
390fn parse_temporal_header(data: &[u8]) -> Result<TemporalHeader, TemporalError> {
391    if data.len() < 32 {
392        return Err(TemporalError::TruncatedHeader);
393    }
394    let mut r = Cursor::new(data);
395    let version = r.read_u32::<LittleEndian>()?;
396    if version != 1 {
397        return Err(TemporalError::UnsupportedVersion(version));
398    }
399    let stride = r.read_u32::<LittleEndian>()?;
400    if stride < 1 {
401        return Err(TemporalError::InvalidStride(stride));
402    }
403    let node_count = r.read_u32::<LittleEndian>()?;
404    let page_count = r.read_u32::<LittleEndian>()?;
405    let root_page_offset = r.read_u64::<LittleEndian>()?;
406    let root_page_size = r.read_u32::<LittleEndian>()?;
407    let _reserved = r.read_u32::<LittleEndian>()?;
408
409    Ok(TemporalHeader {
410        version,
411        stride,
412        node_count,
413        page_count,
414        root_page_offset,
415        root_page_size,
416    })
417}