Skip to main content

re_chunk/
split.rs

1use std::sync::Arc;
2
3use re_byte_size::SizeBytes;
4use re_types_core::ChunkId;
5
6use crate::Chunk;
7
/// Thresholds used by [`Chunk::split_chunk_if_needed`] to decide when a chunk must be split.
///
/// Setting a threshold to `0` disables the corresponding check.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChunkSplitConfig {
    /// Split chunks whose total heap size exceeds this many bytes.
    pub chunk_max_bytes: u64,

    /// Split chunks with more rows than this.
    ///
    /// This specifically applies to time-sorted chunks.
    /// See also [`Self::chunk_max_rows_if_unsorted`].
    pub chunk_max_rows: u64,

    /// Split chunks with more rows than this.
    ///
    /// This specifically applies to _non_ time-sorted chunks.
    /// See also [`Self::chunk_max_rows`].
    pub chunk_max_rows_if_unsorted: u64,
}
26
27impl Chunk {
28    /// Naively splits a chunk if it exceeds the configured thresholds.
29    ///
30    /// The resulting pieces may still be larger than [`ChunkSplitConfig::chunk_max_bytes`].
31    ///
32    /// The Chunk is *deeply* sliced, as opposed to shallowly. Refer to [`Chunk::row_sliced_deep`]
33    /// to learn more about that and why it matters.
34    pub fn split_chunk_if_needed(chunk: Arc<Self>, cfg: &ChunkSplitConfig) -> Vec<Arc<Self>> {
35        let ChunkSplitConfig {
36            chunk_max_bytes,
37            chunk_max_rows,
38            chunk_max_rows_if_unsorted,
39        } = *cfg;
40
41        let chunk_size_bytes = <Self as SizeBytes>::total_size_bytes(chunk.as_ref());
42        let chunk_num_rows = chunk.num_rows() as u64;
43
44        if chunk_num_rows <= 1 {
45            // Can't split even if we wanted to.
46            return vec![chunk];
47        }
48
49        // Check if we need to split based on size or row count
50        let needs_split_bytes = chunk_max_bytes > 0 && chunk_size_bytes > chunk_max_bytes;
51        let needs_split_rows = chunk_max_rows > 0 && chunk_num_rows > chunk_max_rows;
52        let needs_split_unsorted = chunk_max_rows_if_unsorted > 0
53            && chunk_num_rows > chunk_max_rows_if_unsorted
54            && !chunk.is_time_sorted();
55
56        if !needs_split_bytes && !needs_split_rows && !needs_split_unsorted {
57            return vec![chunk];
58        }
59
60        re_tracing::profile_scope!("split_chunk");
61
62        // Determine the target number of rows per split chunk
63        let target_rows = if needs_split_unsorted {
64            chunk_max_rows_if_unsorted
65        } else if needs_split_rows {
66            chunk_max_rows
67        } else {
68            // For byte-based splitting, estimate rows per split chunk based on current density
69            let bytes_per_row = chunk_size_bytes / chunk_num_rows.max(1);
70            chunk_max_bytes / bytes_per_row.max(1)
71        };
72
73        let target_rows = target_rows.max(1) as usize; // Ensure at least 1 row per chunk
74
75        let mut result = Vec::with_capacity(chunk.num_rows().div_ceil(target_rows));
76        let mut start_idx = 0;
77
78        while start_idx < chunk.num_rows() {
79            let remaining_rows = chunk.num_rows() - start_idx;
80            let chunk_size = remaining_rows.min(target_rows);
81
82            let split_chunk = chunk
83                .row_sliced_deep(start_idx, chunk_size)
84                .with_id(ChunkId::new());
85
86            result.push(Arc::new(split_chunk));
87
88            start_idx += chunk_size;
89        }
90
91        re_log::trace!(
92            entity_path = %chunk.entity_path(),
93            original_rows = chunk.num_rows(),
94            original_bytes = %re_format::format_bytes(chunk_size_bytes as _),
95            split_into = result.len(),
96            target_rows,
97            "split chunk"
98        );
99
100        result
101    }
102}