// copc_temporal/temporal_cache.rs
1//! Incremental temporal index loading via async ByteSource.
2//!
3//! Loads temporal index pages on demand, pruning subtrees by time range
4//! before fetching child pages.
5
6use std::collections::HashMap;
7use std::io::Cursor;
8
9use byteorder::{LittleEndian, ReadBytesExt};
10use copc_streaming::{ByteSource, CopcStreamingReader, VoxelKey};
11
12use crate::TemporalError;
13use crate::gps_time::GpsTime;
14use crate::temporal_index::NodeTemporalEntry;
15
/// Header of the temporal index EVLR (32 bytes).
///
/// All fields are plain scalars, so the type derives `Copy` plus the full
/// comparison/hash set — callers can compare, copy, and key on headers freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TemporalHeader {
    /// Format version (must be 1).
    pub version: u32,
    /// Sampling stride — every N-th point is recorded.
    pub stride: u32,
    /// Total number of node entries across all pages.
    pub node_count: u32,
    /// Total number of pages.
    pub page_count: u32,
    /// Absolute file offset of the root page.
    pub root_page_offset: u64,
    /// Size of the root page in bytes.
    pub root_page_size: u32,
}
32
/// A child page discovered during parsing but not yet fetched, together with
/// the GPS-time bounds of its entire subtree (used to prune fetches).
#[derive(Debug, Clone)]
struct PendingPage {
    /// Absolute file offset of the child page.
    offset: u64,
    /// Size of the child page in bytes.
    size: u32,
    /// Minimum GPS time over the subtree rooted at this page.
    subtree_time_min: f64,
    /// Maximum GPS time over the subtree rooted at this page.
    subtree_time_max: f64,
}
40
/// Incrementally-loaded temporal index cache.
///
/// Pages are parsed into `entries` as they are fetched; child pages that have
/// been discovered but not yet read are queued in `pending_pages`.
pub struct TemporalCache {
    /// Parsed temporal header; `None` until `load_header` succeeds.
    header: Option<TemporalHeader>,
    /// Temporal entries for every node whose page has been parsed.
    entries: HashMap<VoxelKey, NodeTemporalEntry>,
    /// Discovered-but-unloaded child pages, with subtree time bounds.
    pending_pages: Vec<PendingPage>,
    /// Copy of `header.stride`; 0 before the header is loaded.
    stride: u32,
}
48
49impl Default for TemporalCache {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl TemporalCache {
56    /// Create an empty temporal cache.
57    pub fn new() -> Self {
58        Self {
59            header: None,
60            entries: HashMap::new(),
61            pending_pages: Vec::new(),
62            stride: 0,
63        }
64    }
65
66    /// Open the temporal index from a COPC reader.
67    ///
68    /// Loads the temporal header and root page. Returns `Ok(None)` if
69    /// no temporal EVLR exists in the file.
70    pub async fn from_reader<S: ByteSource>(
71        reader: &CopcStreamingReader<S>,
72    ) -> Result<Option<Self>, TemporalError> {
73        let mut cache = Self::new();
74        let found = cache
75            .load_header(reader.source(), reader.evlr_offset(), reader.evlr_count())
76            .await?;
77        if !found {
78            return Ok(None);
79        }
80        cache.load_root_page(reader.source()).await?;
81        Ok(Some(cache))
82    }
83
    /// Scan EVLRs to find the temporal EVLR and read its header.
    /// Returns false if no temporal EVLR exists.
    ///
    /// Walks the EVLR chain starting at `evlr_offset`, reading each record's
    /// fixed 60-byte header and skipping over its payload until a record with
    /// user id `"copc_temporal"` and record id 1000 is found.
    pub async fn load_header(
        &mut self,
        source: &impl ByteSource,
        evlr_offset: u64,
        evlr_count: u32,
    ) -> Result<bool, TemporalError> {
        let mut pos = evlr_offset;

        for _ in 0..evlr_count {
            // LAS EVLR header layout (60 bytes): reserved (2) + user_id (16)
            // + record_id (2) + record_length_after_header (8) + description
            // (32, never read here).
            let hdr_data = source.read_range(pos, 60).await?;
            let mut r = Cursor::new(hdr_data.as_slice());

            // reserved (2)
            r.set_position(2);
            let mut user_id = [0u8; 16];
            std::io::Read::read_exact(&mut r, &mut user_id)?;
            let record_id = r.read_u16::<LittleEndian>()?;
            let data_length = r.read_u64::<LittleEndian>()?;

            // Payload begins immediately after the fixed 60-byte header.
            let data_start = pos + 60;

            // user_id is NUL-padded; take the bytes up to the first NUL.
            let uid_end = user_id.iter().position(|&b| b == 0).unwrap_or(16);
            let uid_str = std::str::from_utf8(&user_id[..uid_end]).unwrap_or("");

            if uid_str == "copc_temporal" && record_id == 1000 {
                // Only the 32-byte fixed header is fetched here; the root
                // page itself is read separately by `load_root_page`.
                let header_data = source.read_range(data_start, 32).await?;
                let header = parse_temporal_header(&header_data)?;
                self.stride = header.stride;
                self.header = Some(header);
                return Ok(true);
            }

            // Skip this EVLR's payload and advance to the next record header.
            pos = data_start + data_length;
        }

        Ok(false)
    }
123
124    /// Load the root temporal page.
125    pub async fn load_root_page(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
126        let header = self.header.as_ref().ok_or(TemporalError::TruncatedHeader)?;
127
128        let data = source
129            .read_range(header.root_page_offset, header.root_page_size as u64)
130            .await?;
131        self.parse_page(&data)?;
132        Ok(())
133    }
134
135    /// Load child pages that overlap a time range, pruning by subtree bounds.
136    pub async fn load_pages_for_time_range(
137        &mut self,
138        source: &impl ByteSource,
139        start: GpsTime,
140        end: GpsTime,
141    ) -> Result<(), TemporalError> {
142        loop {
143            let matching: Vec<PendingPage> = self
144                .pending_pages
145                .iter()
146                .filter(|p| p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0)
147                .cloned()
148                .collect();
149
150            if matching.is_empty() {
151                break;
152            }
153
154            self.pending_pages
155                .retain(|p| !(p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0));
156
157            let ranges: Vec<_> = matching.iter().map(|p| (p.offset, p.size as u64)).collect();
158            let results = source.read_ranges(&ranges).await?;
159
160            for data in &results {
161                self.parse_page(data)?;
162            }
163        }
164
165        Ok(())
166    }
167
168    /// Load ALL pending pages.
169    pub async fn load_all_pages(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
170        while !self.pending_pages.is_empty() {
171            let pages: Vec<PendingPage> = self.pending_pages.drain(..).collect();
172            let ranges: Vec<_> = pages.iter().map(|p| (p.offset, p.size as u64)).collect();
173            let results = source.read_ranges(&ranges).await?;
174
175            for data in &results {
176                self.parse_page(data)?;
177            }
178        }
179        Ok(())
180    }
181
182    /// Load relevant pages and return all nodes that overlap a time range.
183    ///
184    /// This is the primary query method — it ensures the right pages are loaded
185    /// before returning results. Equivalent to calling `load_pages_for_time_range`
186    /// followed by `nodes_in_range`, but cannot return incomplete results.
187    pub async fn query(
188        &mut self,
189        source: &impl ByteSource,
190        start: GpsTime,
191        end: GpsTime,
192    ) -> Result<Vec<&NodeTemporalEntry>, TemporalError> {
193        self.load_pages_for_time_range(source, start, end).await?;
194        Ok(self.nodes_in_range(start, end))
195    }
196
    /// Look up the temporal entry for a node, if its page has been loaded.
    pub fn get(&self, key: &VoxelKey) -> Option<&NodeTemporalEntry> {
        self.entries.get(key)
    }
201
202    /// Return all loaded nodes whose time range overlaps `[start, end]`.
203    pub fn nodes_in_range(&self, start: GpsTime, end: GpsTime) -> Vec<&NodeTemporalEntry> {
204        self.entries
205            .values()
206            .filter(|e| e.overlaps(start, end))
207            .collect()
208    }
209
    /// The sampling stride (every N-th point is recorded in the index).
    ///
    /// 0 until a temporal header has been loaded.
    pub fn stride(&self) -> u32 {
        self.stride
    }

    /// The parsed temporal index header, if loaded.
    pub fn header(&self) -> Option<&TemporalHeader> {
        self.header.as_ref()
    }

    /// Number of loaded node entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Whether no node entries have been loaded.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Iterate all loaded entries, in arbitrary (HashMap) order.
    pub fn iter(&self) -> impl Iterator<Item = (&VoxelKey, &NodeTemporalEntry)> {
        self.entries.iter()
    }
234
235    fn parse_page(&mut self, data: &[u8]) -> Result<(), TemporalError> {
236        let mut r = Cursor::new(data);
237
238        while (r.position() as usize) < data.len() {
239            if r.position() as usize + 20 > data.len() {
240                break;
241            }
242
243            let level = r.read_i32::<LittleEndian>()?;
244            let x = r.read_i32::<LittleEndian>()?;
245            let y = r.read_i32::<LittleEndian>()?;
246            let z = r.read_i32::<LittleEndian>()?;
247            let sample_count = r.read_u32::<LittleEndian>()?;
248
249            let key = VoxelKey { level, x, y, z };
250
251            if sample_count == 0 {
252                // Page pointer: 28 more bytes
253                let child_offset = r.read_u64::<LittleEndian>()?;
254                let child_size = r.read_u32::<LittleEndian>()?;
255                let time_min = r.read_f64::<LittleEndian>()?;
256                let time_max = r.read_f64::<LittleEndian>()?;
257
258                self.pending_pages.push(PendingPage {
259                    offset: child_offset,
260                    size: child_size,
261                    subtree_time_min: time_min,
262                    subtree_time_max: time_max,
263                });
264            } else {
265                let mut samples = Vec::with_capacity(sample_count as usize);
266                for _ in 0..sample_count {
267                    samples.push(GpsTime(r.read_f64::<LittleEndian>()?));
268                }
269
270                self.entries
271                    .insert(key, NodeTemporalEntry::new(key, samples));
272            }
273        }
274
275        Ok(())
276    }
277}
278
279fn parse_temporal_header(data: &[u8]) -> Result<TemporalHeader, TemporalError> {
280    if data.len() < 32 {
281        return Err(TemporalError::TruncatedHeader);
282    }
283    let mut r = Cursor::new(data);
284    let version = r.read_u32::<LittleEndian>()?;
285    if version != 1 {
286        return Err(TemporalError::UnsupportedVersion(version));
287    }
288    let stride = r.read_u32::<LittleEndian>()?;
289    if stride < 1 {
290        return Err(TemporalError::InvalidStride(stride));
291    }
292    let node_count = r.read_u32::<LittleEndian>()?;
293    let page_count = r.read_u32::<LittleEndian>()?;
294    let root_page_offset = r.read_u64::<LittleEndian>()?;
295    let root_page_size = r.read_u32::<LittleEndian>()?;
296    let _reserved = r.read_u32::<LittleEndian>()?;
297
298    Ok(TemporalHeader {
299        version,
300        stride,
301        node_count,
302        page_count,
303        root_page_offset,
304        root_page_size,
305    })
306}