Skip to main content

copc_temporal/
temporal_cache.rs

//! Incremental temporal index loading via an async `ByteSource`.
//!
//! Loads temporal index pages on demand, pruning subtrees by time range
//! before fetching child pages.
5
6use std::collections::HashMap;
7use std::io::Cursor;
8
9use byteorder::{LittleEndian, ReadBytesExt};
10use copc_streaming::{ByteSource, CopcStreamingReader, VoxelKey};
11
12use crate::TemporalError;
13use crate::gps_time::GpsTime;
14use crate::temporal_index::NodeTemporalEntry;
15
/// Header of the temporal index EVLR (32 bytes on disk).
///
/// Every field is stored little-endian. The on-disk layout ends with a
/// reserved `u32` that is not kept here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TemporalHeader {
    /// Format version; the parser in this module only accepts `1`.
    pub version: u32,
    /// Sampling stride recorded by the writer; the parser rejects `0`.
    /// NOTE(review): presumably "one GPS-time sample every `stride` points" — confirm.
    pub stride: u32,
    /// Node count as recorded in the header.
    pub node_count: u32,
    /// Page count as recorded in the header.
    pub page_count: u32,
    /// Byte offset of the root page within the byte source.
    pub root_page_offset: u64,
    /// Size of the root page in bytes.
    pub root_page_size: u32,
}
26
/// A child-page pointer found while parsing a page but not yet fetched.
///
/// The subtree time bounds let time-range queries skip pages whose subtree
/// cannot contain any sample in the requested range.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PendingPage {
    /// Byte offset of the child page within the byte source.
    offset: u64,
    /// Size of the child page in bytes.
    size: u32,
    /// Lower bound on GPS times in the child page's subtree, as written in the file.
    subtree_time_min: f64,
    /// Upper bound on GPS times in the child page's subtree, as written in the file.
    subtree_time_max: f64,
}
34
35/// Incrementally-loaded temporal index cache.
36pub struct TemporalCache {
37    header: Option<TemporalHeader>,
38    entries: HashMap<VoxelKey, NodeTemporalEntry>,
39    pending_pages: Vec<PendingPage>,
40    stride: u32,
41}
42
43impl Default for TemporalCache {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl TemporalCache {
50    pub fn new() -> Self {
51        Self {
52            header: None,
53            entries: HashMap::new(),
54            pending_pages: Vec::new(),
55            stride: 0,
56        }
57    }
58
59    /// Open the temporal index from a COPC reader.
60    ///
61    /// Loads the temporal header and root page. Returns `Ok(None)` if
62    /// no temporal EVLR exists in the file.
63    pub async fn from_reader<S: ByteSource>(
64        reader: &CopcStreamingReader<S>,
65    ) -> Result<Option<Self>, TemporalError> {
66        let mut cache = Self::new();
67        let found = cache
68            .load_header(reader.source(), reader.evlr_offset(), reader.evlr_count())
69            .await?;
70        if !found {
71            return Ok(None);
72        }
73        cache.load_root_page(reader.source()).await?;
74        Ok(Some(cache))
75    }
76
77    /// Scan EVLRs to find the temporal EVLR and read its header.
78    /// Returns false if no temporal EVLR exists.
79    pub async fn load_header(
80        &mut self,
81        source: &impl ByteSource,
82        evlr_offset: u64,
83        evlr_count: u32,
84    ) -> Result<bool, TemporalError> {
85        let mut pos = evlr_offset;
86
87        for _ in 0..evlr_count {
88            let hdr_data = source.read_range(pos, 60).await?;
89            let mut r = Cursor::new(hdr_data.as_slice());
90
91            // reserved (2)
92            r.set_position(2);
93            let mut user_id = [0u8; 16];
94            std::io::Read::read_exact(&mut r, &mut user_id)?;
95            let record_id = r.read_u16::<LittleEndian>()?;
96            let data_length = r.read_u64::<LittleEndian>()?;
97
98            let data_start = pos + 60;
99
100            let uid_end = user_id.iter().position(|&b| b == 0).unwrap_or(16);
101            let uid_str = std::str::from_utf8(&user_id[..uid_end]).unwrap_or("");
102
103            if uid_str == "copc_temporal" && record_id == 1000 {
104                let header_data = source.read_range(data_start, 32).await?;
105                let header = parse_temporal_header(&header_data)?;
106                self.stride = header.stride;
107                self.header = Some(header);
108                return Ok(true);
109            }
110
111            pos = data_start + data_length;
112        }
113
114        Ok(false)
115    }
116
117    /// Load the root temporal page.
118    pub async fn load_root_page(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
119        let header = self.header.as_ref().ok_or(TemporalError::TruncatedHeader)?;
120
121        let data = source
122            .read_range(header.root_page_offset, header.root_page_size as u64)
123            .await?;
124        self.parse_page(&data)?;
125        Ok(())
126    }
127
128    /// Load child pages that overlap a time range, pruning by subtree bounds.
129    pub async fn load_pages_for_time_range(
130        &mut self,
131        source: &impl ByteSource,
132        start: GpsTime,
133        end: GpsTime,
134    ) -> Result<(), TemporalError> {
135        loop {
136            let matching: Vec<PendingPage> = self
137                .pending_pages
138                .iter()
139                .filter(|p| p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0)
140                .cloned()
141                .collect();
142
143            if matching.is_empty() {
144                break;
145            }
146
147            self.pending_pages
148                .retain(|p| !(p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0));
149
150            let ranges: Vec<_> = matching.iter().map(|p| (p.offset, p.size as u64)).collect();
151            let results = source.read_ranges(&ranges).await?;
152
153            for data in &results {
154                self.parse_page(data)?;
155            }
156        }
157
158        Ok(())
159    }
160
161    /// Load ALL pending pages.
162    pub async fn load_all_pages(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
163        while !self.pending_pages.is_empty() {
164            let pages: Vec<PendingPage> = self.pending_pages.drain(..).collect();
165            let ranges: Vec<_> = pages.iter().map(|p| (p.offset, p.size as u64)).collect();
166            let results = source.read_ranges(&ranges).await?;
167
168            for data in &results {
169                self.parse_page(data)?;
170            }
171        }
172        Ok(())
173    }
174
175    pub fn get(&self, key: &VoxelKey) -> Option<&NodeTemporalEntry> {
176        self.entries.get(key)
177    }
178
179    pub fn nodes_in_range(&self, start: GpsTime, end: GpsTime) -> Vec<&NodeTemporalEntry> {
180        self.entries
181            .values()
182            .filter(|e| e.overlaps(start, end))
183            .collect()
184    }
185
186    pub fn stride(&self) -> u32 {
187        self.stride
188    }
189
190    pub fn header(&self) -> Option<&TemporalHeader> {
191        self.header.as_ref()
192    }
193
194    pub fn len(&self) -> usize {
195        self.entries.len()
196    }
197
198    pub fn is_empty(&self) -> bool {
199        self.entries.is_empty()
200    }
201
202    /// Iterate all loaded entries.
203    pub fn iter(&self) -> impl Iterator<Item = (&VoxelKey, &NodeTemporalEntry)> {
204        self.entries.iter()
205    }
206
207    fn parse_page(&mut self, data: &[u8]) -> Result<(), TemporalError> {
208        let mut r = Cursor::new(data);
209
210        while (r.position() as usize) < data.len() {
211            if r.position() as usize + 20 > data.len() {
212                break;
213            }
214
215            let level = r.read_i32::<LittleEndian>()?;
216            let x = r.read_i32::<LittleEndian>()?;
217            let y = r.read_i32::<LittleEndian>()?;
218            let z = r.read_i32::<LittleEndian>()?;
219            let sample_count = r.read_u32::<LittleEndian>()?;
220
221            let key = VoxelKey { level, x, y, z };
222
223            if sample_count == 0 {
224                // Page pointer: 28 more bytes
225                let child_offset = r.read_u64::<LittleEndian>()?;
226                let child_size = r.read_u32::<LittleEndian>()?;
227                let time_min = r.read_f64::<LittleEndian>()?;
228                let time_max = r.read_f64::<LittleEndian>()?;
229
230                self.pending_pages.push(PendingPage {
231                    offset: child_offset,
232                    size: child_size,
233                    subtree_time_min: time_min,
234                    subtree_time_max: time_max,
235                });
236            } else {
237                let mut samples = Vec::with_capacity(sample_count as usize);
238                for _ in 0..sample_count {
239                    samples.push(GpsTime(r.read_f64::<LittleEndian>()?));
240                }
241
242                self.entries
243                    .insert(key, NodeTemporalEntry::new(key, samples));
244            }
245        }
246
247        Ok(())
248    }
249}
250
251fn parse_temporal_header(data: &[u8]) -> Result<TemporalHeader, TemporalError> {
252    if data.len() < 32 {
253        return Err(TemporalError::TruncatedHeader);
254    }
255    let mut r = Cursor::new(data);
256    let version = r.read_u32::<LittleEndian>()?;
257    if version != 1 {
258        return Err(TemporalError::UnsupportedVersion(version));
259    }
260    let stride = r.read_u32::<LittleEndian>()?;
261    if stride < 1 {
262        return Err(TemporalError::InvalidStride(stride));
263    }
264    let node_count = r.read_u32::<LittleEndian>()?;
265    let page_count = r.read_u32::<LittleEndian>()?;
266    let root_page_offset = r.read_u64::<LittleEndian>()?;
267    let root_page_size = r.read_u32::<LittleEndian>()?;
268    let _reserved = r.read_u32::<LittleEndian>()?;
269
270    Ok(TemporalHeader {
271        version,
272        stride,
273        node_count,
274        page_count,
275        root_page_offset,
276        root_page_size,
277    })
278}