use std::collections::BTreeSet;
use std::sync::Arc;
use emath::NumExt as _;
use re_chunk::{Chunk, ChunkId};
use re_mutex::Mutex;
/// A promise that resolves to the decoded chunks of one batch request,
/// or `Err(())` on failure (the error payload is unit — presumably the
/// actual error is reported elsewhere; TODO confirm).
pub type ChunkPromise = poll_promise::Promise<Result<Vec<Chunk>, ()>>;
/// Immutable metadata describing one in-flight chunk batch request.
#[derive(Clone, Debug)]
pub struct RequestInfo {
    /// The root chunks this request downloads (used to decide whether the
    /// request is still wanted — see `ChunkRequests::cancel_outdated_requests`).
    pub root_chunk_ids: ahash::HashSet<ChunkId>,

    /// Row indices covered by this request.
    /// NOTE(review): what these indices index into is not visible here — verify against caller.
    pub row_indices: BTreeSet<usize>,

    /// Size of the requested data once decompressed, in bytes.
    pub size_bytes_uncompressed: u64,

    /// Size of the requested data as transferred over the wire, in bytes.
    pub size_bytes_on_wire: u64,
}
/// One in-flight batch request: the pending promise plus its metadata.
pub struct ChunkBatchRequest {
    /// The pending download. Behind a mutex so the containing struct can be
    /// `Sync`; `None` while the promise has been temporarily taken out for polling.
    pub promise: Mutex<Option<ChunkPromise>>,

    /// Shared, read-only metadata about what this request fetches.
    pub info: Arc<RequestInfo>,
}
/// A byte count stored as `f64`, with the `Sum`/`Mul<f32>`/`Div<f32>` impls
/// below — presumably required so it can be used as the value type of an
/// `emath::History` (see `ChunkRequests::download_size_history`).
#[derive(Clone, Copy)]
pub struct ByteFloat(pub f64);
impl std::iter::Sum for ByteFloat {
    /// Sums a stream of byte counts into a single total.
    fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
        let total: f64 = iter.map(|bytes| bytes.0).sum();
        Self(total)
    }
}
impl std::ops::Mul<f32> for ByteFloat {
    type Output = Self;

    /// Scales the byte count by `rhs`.
    fn mul(self, rhs: f32) -> Self::Output {
        let factor = f64::from(rhs);
        Self(self.0 * factor)
    }
}
impl std::ops::Div<f32> for ByteFloat {
    type Output = Self;

    /// Divides the byte count by `rhs`.
    fn div(self, rhs: f32) -> Self::Output {
        let divisor = f64::from(rhs);
        Self(self.0 / divisor)
    }
}
/// Tracks in-flight chunk download requests and related download statistics.
pub struct ChunkRequests {
    /// All currently in-flight batch requests.
    requests: Vec<ChunkBatchRequest>,

    /// Recently received download sizes, used to estimate bandwidth.
    pub download_size_history: emath::History<ByteFloat>,

    /// How many in-flight requests were recently canceled
    /// (see `cancel_outdated_requests`).
    pub recently_canceled: emath::History<usize>,
}
impl Default for ChunkRequests {
    fn default() -> Self {
        // NOTE(review): `History::new` presumably takes a sample-count range
        // and a max age in seconds — confirm against the emath docs.
        let download_size_history = emath::History::new(0..50, 2.0);
        let recently_canceled = emath::History::new(0..100, 1.0);

        Self {
            requests: Vec::new(),
            download_size_history,
            recently_canceled,
        }
    }
}
// Compile-time guarantee that `ChunkRequests` can be shared between threads
// (the only interior mutability is the `Mutex` around each promise).
static_assertions::assert_impl_all!(ChunkRequests: Sync);
#[cfg(feature = "testing")]
impl Clone for ChunkRequests {
    /// Test-only "clone": in-flight promises are not cloneable, so the clone
    /// starts with no pending requests, and the histories keep their
    /// configuration but are emptied of samples.
    fn clone(&self) -> Self {
        let mut download_size_history = self.download_size_history.clone();
        download_size_history.clear();

        let mut recently_canceled = self.recently_canceled.clone();
        recently_canceled.clear();

        Self {
            requests: Vec::new(),
            download_size_history,
            recently_canceled,
        }
    }
}
impl ChunkRequests {
    /// Is at least one chunk fetch still in flight?
    pub fn has_pending(&self) -> bool {
        !self.requests.is_empty()
    }

    /// Total on-wire size of all pending requests, in bytes.
    pub fn num_on_wire_bytes_pending(&self) -> u64 {
        self.requests
            .iter()
            // Bug fix: this used to sum `size_bytes_uncompressed`, which
            // contradicted both the method name and the on-wire sizes that
            // `receive_finished` records into the bandwidth history.
            .map(|b| b.info.size_bytes_on_wire)
            .sum()
    }

    /// Estimated download bandwidth in bytes per second,
    /// if the history has enough data to report one.
    pub fn bandwidth(&self) -> Option<f64> {
        self.download_size_history.bandwidth().map(|b| b.0)
    }

    /// How fresh the bandwidth estimate is, in `[0, 1]`:
    /// 1 means data was just received; 0 means the newest sample is at least
    /// as old as the history's max age, or there are no samples at all.
    pub fn bandwidth_data_freshness(&self, egui_now_time: f64) -> f64 {
        self.download_size_history
            .iter()
            .last() // Newest sample.
            .map(|(t, _)| {
                let age = egui_now_time - t;
                (1.0 - age / self.download_size_history.max_age() as f64).at_least(0.0)
            })
            .unwrap_or(0.0)
    }

    /// Polls all in-flight requests, returning the chunks of those that finished.
    ///
    /// Finished and failed requests are removed from the pending list;
    /// successful ones also record their on-wire size in the bandwidth history.
    #[must_use = "Returns newly received chunks"]
    pub fn receive_finished(&mut self, egui_now_time: f64) -> Vec<Chunk> {
        re_tracing::profile_function!();

        let mut all_chunks = Vec::new();

        let history = &mut self.download_size_history;
        history.flush(egui_now_time);

        self.requests.retain_mut(|batch| {
            let mut promise_opt = batch.promise.lock();
            if let Some(promise) = promise_opt.take() {
                match promise.try_take() {
                    Ok(Ok(chunks)) => {
                        all_chunks.extend(chunks);
                        history.add(
                            egui_now_time,
                            ByteFloat(batch.info.size_bytes_on_wire as f64),
                        );
                        false // Done — remove from the pending list.
                    }
                    // Failed — remove. The error payload is unit, so presumably
                    // the error itself is reported elsewhere.
                    Ok(Err(())) => false,
                    Err(promise) => {
                        // Still in flight: put the promise back and keep waiting.
                        *promise_opt = Some(promise);
                        true
                    }
                }
            } else {
                false // No promise to wait for — drop the request.
            }
        });

        all_chunks
    }

    /// Registers a new in-flight batch request.
    pub fn add(&mut self, batch: ChunkBatchRequest) {
        self.requests.push(batch);
    }

    /// Metadata of every request that is still in flight.
    pub fn pending_requests(&self) -> Vec<Arc<RequestInfo>> {
        self.requests
            .iter()
            .map(|batch| Arc::clone(&batch.info))
            .collect()
    }

    /// Drops in-flight requests whose root chunks are no longer wanted.
    ///
    /// A request survives if it shares at least one root chunk with
    /// `desired_root_chunks`; otherwise it is canceled and all of its root
    /// chunk ids are returned so the caller can mark them as unloaded.
    #[must_use = "Returns root chunks whose download got cancelled. Mark them as unloaded!"]
    pub fn cancel_outdated_requests(
        &mut self,
        egui_now_time: f64,
        desired_root_chunks: &ahash::HashSet<ChunkId>,
    ) -> Vec<ChunkId> {
        re_tracing::profile_function!();

        let batches = &mut self.requests;
        let mut canceled_root_chunks = vec![];
        let num_batches_before = batches.len();

        batches.retain(|batch| {
            if batch.info.root_chunk_ids.is_disjoint(desired_root_chunks) {
                // Nothing in this batch is still wanted — cancel it.
                canceled_root_chunks.extend(batch.info.root_chunk_ids.iter().copied());
                false
            } else {
                true
            }
        });

        let num_canceled = num_batches_before - batches.len();

        self.recently_canceled.flush(egui_now_time);
        if num_canceled > 0 {
            re_log::trace!("Canceled {num_canceled} in-flight chunk fetches");
            self.recently_canceled.add(egui_now_time, num_canceled);
        }

        canceled_root_chunks
    }
}