// copc_temporal/temporal_cache.rs
//! Incremental temporal index loading via async ByteSource.
//!
//! Loads temporal index pages on demand, pruning subtrees by time range
//! before fetching child pages.
use std::collections::HashMap;
use std::io::Cursor;

use byteorder::{LittleEndian, ReadBytesExt};
use copc_streaming::{ByteSource, CopcStreamingReader, VoxelKey};

use crate::TemporalError;
use crate::gps_time::GpsTime;
use crate::temporal_index::NodeTemporalEntry;
/// Header of the temporal index EVLR (32 bytes on disk, little-endian).
///
/// Produced by [`parse_temporal_header`]; field order matches the on-disk
/// layout (version, stride, node_count, page_count, root_page_offset,
/// root_page_size, plus 4 reserved trailing bytes not stored here).
#[derive(Debug, Clone)]
pub struct TemporalHeader {
    /// Format version (must be 1).
    pub version: u32,
    /// Sampling stride — every N-th point is recorded.
    pub stride: u32,
    /// Total number of node entries across all pages.
    pub node_count: u32,
    /// Total number of pages.
    pub page_count: u32,
    /// Absolute file offset of the root page.
    pub root_page_offset: u64,
    /// Size of the root page in bytes.
    pub root_page_size: u32,
}
32
/// A child page that has been discovered while parsing a parent page but
/// not yet fetched. Carries the subtree's aggregate time bounds so whole
/// subtrees can be pruned without any I/O when a query range misses them.
#[derive(Debug, Clone)]
struct PendingPage {
    // Absolute file offset of the child page.
    offset: u64,
    // Size of the child page in bytes.
    size: u32,
    // Minimum GPS time over the whole subtree rooted at this page.
    subtree_time_min: f64,
    // Maximum GPS time over the whole subtree rooted at this page.
    subtree_time_max: f64,
}
40
/// Incrementally-loaded temporal index cache.
///
/// Node entries accumulate in `entries` as pages are fetched; child pages
/// that have been discovered but not yet fetched wait in `pending_pages`.
pub struct TemporalCache {
    // Parsed EVLR header; `None` until `load_header` finds the temporal EVLR.
    header: Option<TemporalHeader>,
    // Loaded node entries, keyed by octree voxel key.
    entries: HashMap<VoxelKey, NodeTemporalEntry>,
    // Discovered-but-unfetched child pages (with subtree time bounds).
    pending_pages: Vec<PendingPage>,
    // Copy of `header.stride`; 0 until the header is loaded.
    stride: u32,
}
48
49impl Default for TemporalCache {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
impl TemporalCache {
    /// Create an empty temporal cache.
    ///
    /// No header is present and no entries are loaded; use
    /// [`TemporalCache::from_reader`] (or `load_header` + `load_root_page`)
    /// to populate it.
    pub fn new() -> Self {
        Self {
            header: None,
            entries: HashMap::new(),
            pending_pages: Vec::new(),
            stride: 0,
        }
    }

    /// Open the temporal index from a COPC reader.
    ///
    /// Loads the temporal header and root page. Returns `Ok(None)` if
    /// no temporal EVLR exists in the file.
    ///
    /// # Errors
    /// Propagates any I/O or parse error from header / root-page loading.
    pub async fn from_reader<S: ByteSource>(
        reader: &CopcStreamingReader<S>,
    ) -> Result<Option<Self>, TemporalError> {
        let mut cache = Self::new();
        let found = cache
            .load_header(reader.source(), reader.evlr_offset(), reader.evlr_count())
            .await?;
        if !found {
            // A missing temporal EVLR is a normal condition, not an error.
            return Ok(None);
        }
        cache.load_root_page(reader.source()).await?;
        Ok(Some(cache))
    }

    /// Scan EVLRs to find the temporal EVLR and read its header.
    /// Returns false if no temporal EVLR exists.
    ///
    /// Walks the EVLR chain starting at `evlr_offset`, reading each 60-byte
    /// EVLR header and skipping over payloads until it finds user ID
    /// "copc_temporal" with record ID 1000.
    pub async fn load_header(
        &mut self,
        source: &impl ByteSource,
        evlr_offset: u64,
        evlr_count: u32,
    ) -> Result<bool, TemporalError> {
        let mut pos = evlr_offset;

        for _ in 0..evlr_count {
            // EVLR header: reserved(2) + user_id(16) + record_id(2)
            // + record_length_after_header(8) + description — 60 bytes total.
            let hdr_data = source.read_range(pos, 60).await?;
            let mut r = Cursor::new(hdr_data.as_slice());

            // reserved (2)
            r.set_position(2);
            let mut user_id = [0u8; 16];
            std::io::Read::read_exact(&mut r, &mut user_id)?;
            let record_id = r.read_u16::<LittleEndian>()?;
            let data_length = r.read_u64::<LittleEndian>()?;

            // Payload begins immediately after the fixed 60-byte header.
            let data_start = pos + 60;

            // user_id is NUL-padded on disk; compare only up to the first
            // NUL. Non-UTF8 garbage simply fails the match (empty string).
            let uid_end = user_id.iter().position(|&b| b == 0).unwrap_or(16);
            let uid_str = std::str::from_utf8(&user_id[..uid_end]).unwrap_or("");

            if uid_str == "copc_temporal" && record_id == 1000 {
                // Only the leading 32 bytes of the payload form the header;
                // the page data that follows is read lazily later.
                let header_data = source.read_range(data_start, 32).await?;
                let header = parse_temporal_header(&header_data)?;
                self.stride = header.stride;
                self.header = Some(header);
                return Ok(true);
            }

            // Not ours — skip this record's payload and try the next EVLR.
            pos = data_start + data_length;
        }

        Ok(false)
    }

    /// Load the root temporal page.
    ///
    /// # Errors
    /// Returns [`TemporalError::TruncatedHeader`] when called before
    /// `load_header` has populated the header; otherwise propagates
    /// I/O and parse errors.
    pub async fn load_root_page(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
        let header = self.header.as_ref().ok_or(TemporalError::TruncatedHeader)?;

        let data = source
            .read_range(header.root_page_offset, header.root_page_size as u64)
            .await?;
        self.parse_page(&data)?;
        Ok(())
    }

    /// Load child pages that overlap a time range, pruning by subtree bounds.
    ///
    /// Repeats until no pending page overlaps `[start, end]`: each round
    /// fetches every overlapping pending page in one batched read, and
    /// parsing those pages may enqueue deeper pending pages for the next
    /// round. Pages whose subtree bounds miss the range stay pending
    /// (never fetched) — that is the pruning.
    pub async fn load_pages_for_time_range(
        &mut self,
        source: &impl ByteSource,
        start: GpsTime,
        end: GpsTime,
    ) -> Result<(), TemporalError> {
        loop {
            // Closed-interval overlap test against the subtree time bounds.
            let matching: Vec<PendingPage> = self
                .pending_pages
                .iter()
                .filter(|p| p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0)
                .cloned()
                .collect();

            if matching.is_empty() {
                break;
            }

            // Remove the pages we are about to fetch so they are not
            // re-fetched on the next iteration (same predicate, negated).
            self.pending_pages
                .retain(|p| !(p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0));

            // Batch all fetches of this round into one read_ranges call.
            let ranges: Vec<_> = matching.iter().map(|p| (p.offset, p.size as u64)).collect();
            let results = source.read_ranges(&ranges).await?;

            for data in &results {
                self.parse_page(data)?;
            }
        }

        Ok(())
    }

    /// Load ALL pending pages.
    ///
    /// Loops until the pending queue is empty, since parsing a page can
    /// enqueue further child pages.
    pub async fn load_all_pages(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
        while !self.pending_pages.is_empty() {
            let pages: Vec<PendingPage> = self.pending_pages.drain(..).collect();
            let ranges: Vec<_> = pages.iter().map(|p| (p.offset, p.size as u64)).collect();
            let results = source.read_ranges(&ranges).await?;

            for data in &results {
                self.parse_page(data)?;
            }
        }
        Ok(())
    }

    /// Look up the temporal entry for a node.
    ///
    /// Returns `None` both for unknown keys and for nodes whose page has
    /// not been loaded yet.
    pub fn get(&self, key: &VoxelKey) -> Option<&NodeTemporalEntry> {
        self.entries.get(key)
    }

    /// Return all loaded nodes whose time range overlaps `[start, end]`.
    ///
    /// Only already-loaded entries are considered; call
    /// `load_pages_for_time_range` first for a complete answer.
    pub fn nodes_in_range(&self, start: GpsTime, end: GpsTime) -> Vec<&NodeTemporalEntry> {
        self.entries
            .values()
            .filter(|e| e.overlaps(start, end))
            .collect()
    }

    /// The sampling stride (every N-th point is recorded in the index).
    /// Zero until the header has been loaded.
    pub fn stride(&self) -> u32 {
        self.stride
    }

    /// The parsed temporal index header, if loaded.
    pub fn header(&self) -> Option<&TemporalHeader> {
        self.header.as_ref()
    }

    /// Number of loaded node entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Whether no node entries have been loaded.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Iterate all loaded entries.
    pub fn iter(&self) -> impl Iterator<Item = (&VoxelKey, &NodeTemporalEntry)> {
        self.entries.iter()
    }

    /// Parse one temporal page into entries and pending child pages.
    ///
    /// Record layout (little-endian): a 20-byte fixed head — voxel key
    /// (level, x, y, z as i32) plus u32 `sample_count` — followed by either
    /// a page pointer (u64 offset, u32 size, f64 min/max subtree time;
    /// 28 bytes) when `sample_count == 0`, or `sample_count` f64 GPS times.
    fn parse_page(&mut self, data: &[u8]) -> Result<(), TemporalError> {
        let mut r = Cursor::new(data);

        while (r.position() as usize) < data.len() {
            // Tolerate trailing bytes too short for another fixed head;
            // a truncated *variable* payload still errors via `?` below.
            if r.position() as usize + 20 > data.len() {
                break;
            }

            let level = r.read_i32::<LittleEndian>()?;
            let x = r.read_i32::<LittleEndian>()?;
            let y = r.read_i32::<LittleEndian>()?;
            let z = r.read_i32::<LittleEndian>()?;
            let sample_count = r.read_u32::<LittleEndian>()?;

            let key = VoxelKey { level, x, y, z };

            if sample_count == 0 {
                // Page pointer: 28 more bytes
                let child_offset = r.read_u64::<LittleEndian>()?;
                let child_size = r.read_u32::<LittleEndian>()?;
                let time_min = r.read_f64::<LittleEndian>()?;
                let time_max = r.read_f64::<LittleEndian>()?;

                // Queue the child; it is fetched only if a query overlaps
                // its subtree time bounds (or load_all_pages is called).
                self.pending_pages.push(PendingPage {
                    offset: child_offset,
                    size: child_size,
                    subtree_time_min: time_min,
                    subtree_time_max: time_max,
                });
            } else {
                let mut samples = Vec::with_capacity(sample_count as usize);
                for _ in 0..sample_count {
                    samples.push(GpsTime(r.read_f64::<LittleEndian>()?));
                }

                // Re-inserting an already-loaded key overwrites it.
                self.entries
                    .insert(key, NodeTemporalEntry::new(key, samples));
            }
        }

        Ok(())
    }
}
263
264fn parse_temporal_header(data: &[u8]) -> Result<TemporalHeader, TemporalError> {
265    if data.len() < 32 {
266        return Err(TemporalError::TruncatedHeader);
267    }
268    let mut r = Cursor::new(data);
269    let version = r.read_u32::<LittleEndian>()?;
270    if version != 1 {
271        return Err(TemporalError::UnsupportedVersion(version));
272    }
273    let stride = r.read_u32::<LittleEndian>()?;
274    if stride < 1 {
275        return Err(TemporalError::InvalidStride(stride));
276    }
277    let node_count = r.read_u32::<LittleEndian>()?;
278    let page_count = r.read_u32::<LittleEndian>()?;
279    let root_page_offset = r.read_u64::<LittleEndian>()?;
280    let root_page_size = r.read_u32::<LittleEndian>()?;
281    let _reserved = r.read_u32::<LittleEndian>()?;
282
283    Ok(TemporalHeader {
284        version,
285        stride,
286        node_count,
287        page_count,
288        root_page_offset,
289        root_page_size,
290    })
291}