use std::collections::HashMap;
use std::io::Cursor;
use byteorder::{LittleEndian, ReadBytesExt};
use copc_streaming::{ByteSource, CopcStreamingReader, VoxelKey};
use crate::TemporalError;
use crate::gps_time::GpsTime;
use crate::temporal_index::NodeTemporalEntry;
#[derive(Debug, Clone)]
pub struct TemporalHeader {
pub version: u32,
pub stride: u32,
pub node_count: u32,
pub page_count: u32,
pub root_page_offset: u64,
pub root_page_size: u32,
}
#[derive(Debug, Clone)]
struct PendingPage {
offset: u64,
size: u32,
subtree_time_min: f64,
subtree_time_max: f64,
}
pub struct TemporalCache {
header: Option<TemporalHeader>,
entries: HashMap<VoxelKey, NodeTemporalEntry>,
pending_pages: Vec<PendingPage>,
stride: u32,
}
impl Default for TemporalCache {
fn default() -> Self {
Self::new()
}
}
impl TemporalCache {
pub fn new() -> Self {
Self {
header: None,
entries: HashMap::new(),
pending_pages: Vec::new(),
stride: 0,
}
}
pub async fn from_reader<S: ByteSource>(
reader: &CopcStreamingReader<S>,
) -> Result<Option<Self>, TemporalError> {
let mut cache = Self::new();
let found = cache
.load_header(reader.source(), reader.evlr_offset(), reader.evlr_count())
.await?;
if !found {
return Ok(None);
}
cache.load_root_page(reader.source()).await?;
Ok(Some(cache))
}
pub async fn load_header(
&mut self,
source: &impl ByteSource,
evlr_offset: u64,
evlr_count: u32,
) -> Result<bool, TemporalError> {
let mut pos = evlr_offset;
for _ in 0..evlr_count {
let hdr_data = source.read_range(pos, 60).await?;
let mut r = Cursor::new(hdr_data.as_slice());
r.set_position(2);
let mut user_id = [0u8; 16];
std::io::Read::read_exact(&mut r, &mut user_id)?;
let record_id = r.read_u16::<LittleEndian>()?;
let data_length = r.read_u64::<LittleEndian>()?;
let data_start = pos + 60;
let uid_end = user_id.iter().position(|&b| b == 0).unwrap_or(16);
let uid_str = std::str::from_utf8(&user_id[..uid_end]).unwrap_or("");
if uid_str == "copc_temporal" && record_id == 1000 {
let header_data = source.read_range(data_start, 32).await?;
let header = parse_temporal_header(&header_data)?;
self.stride = header.stride;
self.header = Some(header);
return Ok(true);
}
pos = data_start + data_length;
}
Ok(false)
}
pub async fn load_root_page(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
let header = self.header.as_ref().ok_or(TemporalError::TruncatedHeader)?;
let data = source
.read_range(header.root_page_offset, header.root_page_size as u64)
.await?;
self.parse_page(&data)?;
Ok(())
}
pub async fn load_pages_for_time_range(
&mut self,
source: &impl ByteSource,
start: GpsTime,
end: GpsTime,
) -> Result<(), TemporalError> {
loop {
let matching: Vec<PendingPage> = self
.pending_pages
.iter()
.filter(|p| p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0)
.cloned()
.collect();
if matching.is_empty() {
break;
}
self.pending_pages
.retain(|p| !(p.subtree_time_max >= start.0 && p.subtree_time_min <= end.0));
let ranges: Vec<_> = matching.iter().map(|p| (p.offset, p.size as u64)).collect();
let results = source.read_ranges(&ranges).await?;
for data in &results {
self.parse_page(data)?;
}
}
Ok(())
}
pub async fn load_all_pages(&mut self, source: &impl ByteSource) -> Result<(), TemporalError> {
while !self.pending_pages.is_empty() {
let pages: Vec<PendingPage> = self.pending_pages.drain(..).collect();
let ranges: Vec<_> = pages.iter().map(|p| (p.offset, p.size as u64)).collect();
let results = source.read_ranges(&ranges).await?;
for data in &results {
self.parse_page(data)?;
}
}
Ok(())
}
pub async fn query(
&mut self,
source: &impl ByteSource,
start: GpsTime,
end: GpsTime,
) -> Result<Vec<&NodeTemporalEntry>, TemporalError> {
self.load_pages_for_time_range(source, start, end).await?;
Ok(self.nodes_in_range(start, end))
}
pub fn get(&self, key: &VoxelKey) -> Option<&NodeTemporalEntry> {
self.entries.get(key)
}
pub fn nodes_in_range(&self, start: GpsTime, end: GpsTime) -> Vec<&NodeTemporalEntry> {
self.entries
.values()
.filter(|e| e.overlaps(start, end))
.collect()
}
pub fn stride(&self) -> u32 {
self.stride
}
pub fn header(&self) -> Option<&TemporalHeader> {
self.header.as_ref()
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = (&VoxelKey, &NodeTemporalEntry)> {
self.entries.iter()
}
pub async fn query_points<S: ByteSource>(
&mut self,
reader: &mut CopcStreamingReader<S>,
bounds: &copc_streaming::Aabb,
start: GpsTime,
end: GpsTime,
) -> Result<Vec<las::Point>, TemporalError> {
reader
.load_hierarchy_for_bounds(bounds)
.await
.map_err(TemporalError::Copc)?;
self.load_pages_for_time_range(reader.source(), start, end)
.await?;
let root_bounds = reader.copc_info().root_bounds();
let stride = self.stride;
let matches: Vec<_> = self
.nodes_in_range(start, end)
.into_iter()
.filter(|e| e.key.bounds(&root_bounds).intersects(bounds))
.filter_map(|e| {
let hier = reader.get(&e.key)?;
let range = e.estimate_point_range(start, end, stride, hier.point_count);
if range.is_empty() {
return None;
}
Some((e.key, range))
})
.collect();
let mut all_points = Vec::new();
for (key, range) in matches {
let chunk = reader
.fetch_chunk(&key)
.await
.map_err(TemporalError::Copc)?;
let points = reader
.read_points_range_in_bounds(&chunk, range, bounds)
.map_err(TemporalError::Copc)?;
let points = crate::filter_points_by_time(points, start, end);
all_points.extend(points);
}
Ok(all_points)
}
pub async fn query_points_by_time<S: ByteSource>(
&mut self,
reader: &mut CopcStreamingReader<S>,
start: GpsTime,
end: GpsTime,
) -> Result<Vec<las::Point>, TemporalError> {
reader
.load_all_hierarchy()
.await
.map_err(TemporalError::Copc)?;
self.load_pages_for_time_range(reader.source(), start, end)
.await?;
let stride = self.stride;
let matches: Vec<_> = self
.nodes_in_range(start, end)
.into_iter()
.filter_map(|e| {
let hier = reader.get(&e.key)?;
let range = e.estimate_point_range(start, end, stride, hier.point_count);
if range.is_empty() {
return None;
}
Some((e.key, range))
})
.collect();
let mut all_points = Vec::new();
for (key, range) in matches {
let chunk = reader
.fetch_chunk(&key)
.await
.map_err(TemporalError::Copc)?;
let points = reader
.read_points_range(&chunk, range)
.map_err(TemporalError::Copc)?;
let points = crate::filter_points_by_time(points, start, end);
all_points.extend(points);
}
Ok(all_points)
}
fn parse_page(&mut self, data: &[u8]) -> Result<(), TemporalError> {
let mut r = Cursor::new(data);
while (r.position() as usize) < data.len() {
if r.position() as usize + 20 > data.len() {
break;
}
let level = r.read_i32::<LittleEndian>()?;
let x = r.read_i32::<LittleEndian>()?;
let y = r.read_i32::<LittleEndian>()?;
let z = r.read_i32::<LittleEndian>()?;
let sample_count = r.read_u32::<LittleEndian>()?;
let key = VoxelKey { level, x, y, z };
if sample_count == 0 {
let child_offset = r.read_u64::<LittleEndian>()?;
let child_size = r.read_u32::<LittleEndian>()?;
let time_min = r.read_f64::<LittleEndian>()?;
let time_max = r.read_f64::<LittleEndian>()?;
self.pending_pages.push(PendingPage {
offset: child_offset,
size: child_size,
subtree_time_min: time_min,
subtree_time_max: time_max,
});
} else {
let mut samples = Vec::with_capacity(sample_count as usize);
for _ in 0..sample_count {
samples.push(GpsTime(r.read_f64::<LittleEndian>()?));
}
self.entries
.insert(key, NodeTemporalEntry::new(key, samples));
}
}
Ok(())
}
}
fn parse_temporal_header(data: &[u8]) -> Result<TemporalHeader, TemporalError> {
if data.len() < 32 {
return Err(TemporalError::TruncatedHeader);
}
let mut r = Cursor::new(data);
let version = r.read_u32::<LittleEndian>()?;
if version != 1 {
return Err(TemporalError::UnsupportedVersion(version));
}
let stride = r.read_u32::<LittleEndian>()?;
if stride < 1 {
return Err(TemporalError::InvalidStride(stride));
}
let node_count = r.read_u32::<LittleEndian>()?;
let page_count = r.read_u32::<LittleEndian>()?;
let root_page_offset = r.read_u64::<LittleEndian>()?;
let root_page_size = r.read_u32::<LittleEndian>()?;
let _reserved = r.read_u32::<LittleEndian>()?;
Ok(TemporalHeader {
version,
stride,
node_count,
page_count,
root_page_offset,
root_page_size,
})
}