use futures::{StreamExt as _, TryStream, TryStreamExt as _, future::ready, stream};
use itertools::Itertools as _;
use std::{
collections::HashSet,
num::{NonZeroU16, NonZeroUsize},
ops::Add,
sync::{Arc, Mutex},
};
use tokio::task;
use tracing::trace;
use crate::{
asset_manager::AssetManager,
format::{
ChunkId, ChunkLength, ChunkOffset, SnapshotId,
manifest::{ChunkPayload, Manifest, VirtualChunkLocation},
snapshot::ManifestFileInfo,
},
ops::pointed_snapshots,
repository::{RepositoryError, RepositoryErrorKind, RepositoryResult},
stream_utils::{StreamLimiter, try_unique_stream},
};
use icechunk_types::{ICResultExt as _, error::ICResultCtxExt as _};
/// Byte totals for the chunks referenced by a repository, broken down by
/// where the chunk data lives.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ChunkStorageStats {
    /// Bytes stored natively in the repository's own object store.
    pub native_bytes: u64,
    /// Bytes referenced at external (virtual) locations.
    pub virtual_bytes: u64,
    /// Bytes stored inline inside manifests.
    pub inlined_bytes: u64,
}

impl ChunkStorageStats {
    /// Builds a stats value from the three byte counters.
    pub fn new(native_bytes: u64, virtual_bytes: u64, inlined_bytes: u64) -> Self {
        Self { native_bytes, virtual_bytes, inlined_bytes }
    }

    /// Bytes the repository itself stores (native + inlined), excluding
    /// virtual references.
    pub fn non_virtual_bytes(&self) -> u64 {
        self.native_bytes.saturating_add(self.inlined_bytes)
    }

    /// Grand total across all three categories, saturating instead of
    /// overflowing.
    pub fn total_bytes(&self) -> u64 {
        self.non_virtual_bytes().saturating_add(self.virtual_bytes)
    }
}

impl Add for ChunkStorageStats {
    type Output = Self;

    /// Field-wise saturating sum of two stats values.
    fn add(self, other: Self) -> Self {
        Self::new(
            self.native_bytes.saturating_add(other.native_bytes),
            self.virtual_bytes.saturating_add(other.virtual_bytes),
            self.inlined_bytes.saturating_add(other.inlined_bytes),
        )
    }
}
/// Inserts `key` into the shared `seen` set and, only when the key was not
/// already present, increments `size_counter` by `size_increment`.
///
/// Used to count each distinct chunk exactly once even though the same chunk
/// may be referenced from many manifests processed concurrently.
///
/// # Errors
/// Returns `RepositoryErrorKind::Other` if the mutex is poisoned (a worker
/// thread panicked while holding it).
fn insert_and_increment_size_if_new<T: Eq + std::hash::Hash>(
    seen: &Arc<Mutex<HashSet<T>>>,
    key: T,
    size_increment: u64,
    size_counter: &mut u64,
) -> RepositoryResult<()> {
    let is_new = seen
        .lock()
        .map_err(|e| {
            RepositoryErrorKind::Other(format!(
                "Thread panic during manifest_chunk_storage: {e}"
            ))
        })
        .capture()?
        .insert(key);
    if is_new {
        // Saturating add keeps this counter consistent with the rest of the
        // accounting in this module (ChunkStorageStats saturates everywhere)
        // and avoids an overflow panic in debug builds.
        *size_counter = size_counter.saturating_add(size_increment);
    }
    Ok(())
}
/// Computes the storage footprint of a single manifest, split into native,
/// virtual, and inlined chunk bytes.
///
/// Native and virtual chunks may be referenced by many manifests, so the
/// shared `seen_*` sets ensure each distinct chunk is counted only once
/// across the whole traversal. Inline payloads carry no identity and are
/// counted every time they appear.
///
/// # Errors
/// Fails if a `seen_*` mutex is poisoned or the payload iterator cannot be
/// created. Errors yielded *by* the iterator are logged and skipped.
#[expect(clippy::type_complexity)]
fn calculate_manifest_storage(
    manifest: &Arc<Manifest>,
    seen_native_chunks: &Arc<Mutex<HashSet<ChunkId>>>,
    seen_virtual_chunks: &Arc<
        Mutex<HashSet<(VirtualChunkLocation, ChunkOffset, ChunkLength)>>,
    >,
) -> RepositoryResult<ChunkStorageStats> {
    trace!(manifest_id = %manifest.id(), "Processing manifest");
    let mut native_bytes: u64 = 0;
    let mut virtual_bytes: u64 = 0;
    let mut inlined_bytes: u64 = 0;
    for payload in manifest.chunk_payloads().inject()? {
        match payload {
            Ok(ChunkPayload::Ref(chunk_ref)) => {
                // Count a native chunk only the first time its id is seen.
                insert_and_increment_size_if_new(
                    seen_native_chunks,
                    chunk_ref.id,
                    chunk_ref.length,
                    &mut native_bytes,
                )?;
            }
            Ok(ChunkPayload::Virtual(virtual_ref)) => {
                // Virtual chunks have no id of their own; identify them by
                // (location, offset, length).
                let virtual_chunk_identifier = (
                    virtual_ref.location.clone(),
                    virtual_ref.offset,
                    virtual_ref.length,
                );
                insert_and_increment_size_if_new(
                    seen_virtual_chunks,
                    virtual_chunk_identifier,
                    virtual_ref.length,
                    &mut virtual_bytes,
                )?;
            }
            Ok(ChunkPayload::Inline(bytes)) => {
                // Saturating add for consistency with the other counters and
                // to avoid an overflow panic in debug builds.
                inlined_bytes = inlined_bytes.saturating_add(bytes.len() as u64);
            }
            // Any other payload variant contributes nothing to the totals.
            Ok(_) => {}
            Err(err) => {
                // Best effort: log bad payload entries and keep scanning the
                // rest of the manifest instead of aborting the whole job.
                tracing::error!(
                    error = %err,
                    "Error in chunk payload iterator"
                );
            }
        }
    }
    trace!(manifest_id = %manifest.id(), "Manifest done");
    let stats = ChunkStorageStats::new(native_bytes, virtual_bytes, inlined_bytes);
    Ok(stats)
}
/// Returns a stream of `ManifestFileInfo`s for every manifest reachable from
/// the repository's pointed snapshots plus `extra_roots`, de-duplicated by
/// manifest id.
///
/// At most `max_snapshots_in_memory` snapshots are held/resolved concurrently.
async fn unique_manifest_infos<'a>(
    asset_manager: Arc<AssetManager>,
    extra_roots: &'a HashSet<SnapshotId>,
    max_snapshots_in_memory: NonZeroU16,
) -> RepositoryResult<impl TryStream<Ok = ManifestFileInfo, Error = RepositoryError> + 'a>
{
    // Resolve snapshots concurrently, bounded by max_snapshots_in_memory.
    let all_snaps = pointed_snapshots(asset_manager, extra_roots)
        .await?
        .map(ready)
        .buffer_unordered(max_snapshots_in_memory.get() as usize);
    // For each snapshot, collect its manifest file infos eagerly, then
    // flatten the per-snapshot lists into a single stream of results.
    let all_manifest_infos = all_snaps
        .map(|snap| {
            let files: Vec<_> = snap?.manifest_files().try_collect().inject()?;
            Ok(stream::iter(files.into_iter().map(Ok)))
        })
        .try_flatten();
    // The same manifest can be referenced by many snapshots; emit each
    // manifest id only once.
    let res = try_unique_stream(|mi| mi.id.clone(), all_manifest_infos);
    Ok(res)
}
/// Computes total chunk storage for the whole repository by scanning every
/// unique manifest reachable from its pointed snapshots.
///
/// Concurrency and memory are bounded by the three limit parameters:
/// - `max_snapshots_in_memory`: snapshots resolved concurrently;
/// - `max_compressed_manifest_mem_bytes`: total bytes of manifests allowed
///   in flight at once (enforced by `StreamLimiter`);
/// - `max_concurrent_manifest_fetches`: concurrent manifest downloads.
///
/// Chunks shared between manifests are counted once (see
/// `calculate_manifest_storage`).
pub async fn repo_chunks_storage(
    asset_manager: Arc<AssetManager>,
    max_snapshots_in_memory: NonZeroU16,
    max_compressed_manifest_mem_bytes: NonZeroUsize,
    max_concurrent_manifest_fetches: NonZeroU16,
) -> RepositoryResult<ChunkStorageStats> {
    // No extra roots: only snapshots reachable from refs are scanned.
    let extra_roots = Default::default();
    let manifest_infos = unique_manifest_infos(
        Arc::clone(&asset_manager),
        &extra_roots,
        max_snapshots_in_memory,
    )
    .await?;
    // Limit how many manifest bytes may be in flight at once; each item
    // "charges" its compressed size on entry and is credited back below.
    let limiter = &Arc::new(StreamLimiter::new(
        "repo_chunks_storage".to_string(),
        max_compressed_manifest_mem_bytes.get(),
    ));
    let rate_limited_manifests =
        limiter.limit_stream(manifest_infos, |minfo| minfo.size_bytes as usize);
    // Shared dedup sets so chunks referenced by multiple manifests are
    // counted only once across all concurrent workers.
    let seen_native_chunks = &Arc::new(Mutex::new(HashSet::new()));
    let seen_virtual_chunks = &Arc::new(Mutex::new(HashSet::new()));
    let asset_manager = &asset_manager;
    let compute_stream = rate_limited_manifests
        .map_ok(|m| async move {
            let manifest =
                Arc::clone(asset_manager).fetch_manifest(&m.id, m.size_bytes).await?;
            Ok((manifest, m))
        })
        .try_buffer_unordered(max_concurrent_manifest_fetches.get() as usize)
        .and_then(|(manifest, minfo)| async move {
            let seen_native_chunks = Arc::clone(seen_native_chunks);
            let seen_virtual_chunks = Arc::clone(seen_virtual_chunks);
            // CPU-bound manifest scan runs off the async executor.
            let stats = task::spawn_blocking(move || {
                calculate_manifest_storage(
                    &manifest,
                    &seen_native_chunks,
                    &seen_virtual_chunks,
                )
            })
            .await
            .capture()??;
            // minfo is threaded through so the limiter can credit the
            // same size_bytes back when this item leaves the stream.
            Ok((stats, minfo))
        });
    let (_, res) = limiter
        .unlimit_stream(compute_stream, |(_, minfo)| minfo.size_bytes as usize)
        .try_fold(
            (0u64, ChunkStorageStats::default()),
            |(processed, total_stats), (partial, _)| {
                ready(Ok((processed + 1, total_stats + partial)))
            },
        )
        .await?;
    // Every byte charged to the limiter must have been credited back.
    debug_assert_eq!(limiter.current_usage(), 0);
    Ok(res)
}