1use std::sync::Arc;
2
3use re_byte_size::SizeBytes;
4use re_types_core::ChunkId;
5
6use crate::Chunk;
7
8#[derive(Clone, Copy, Debug, PartialEq, Eq)]
10pub struct ChunkSplitConfig {
11 pub chunk_max_bytes: u64,
13
14 pub chunk_max_rows: u64,
19
20 pub chunk_max_rows_if_unsorted: u64,
25}
26
27impl Chunk {
28 pub fn split_chunk_if_needed(chunk: Arc<Self>, cfg: &ChunkSplitConfig) -> Vec<Arc<Self>> {
35 let ChunkSplitConfig {
36 chunk_max_bytes,
37 chunk_max_rows,
38 chunk_max_rows_if_unsorted,
39 } = *cfg;
40
41 let chunk_size_bytes = <Self as SizeBytes>::total_size_bytes(chunk.as_ref());
42 let chunk_num_rows = chunk.num_rows() as u64;
43
44 if chunk_num_rows <= 1 {
45 return vec![chunk];
47 }
48
49 let needs_split_bytes = chunk_max_bytes > 0 && chunk_size_bytes > chunk_max_bytes;
51 let needs_split_rows = chunk_max_rows > 0 && chunk_num_rows > chunk_max_rows;
52 let needs_split_unsorted = chunk_max_rows_if_unsorted > 0
53 && chunk_num_rows > chunk_max_rows_if_unsorted
54 && !chunk.is_time_sorted();
55
56 if !needs_split_bytes && !needs_split_rows && !needs_split_unsorted {
57 return vec![chunk];
58 }
59
60 re_tracing::profile_scope!("split_chunk");
61
62 let target_rows = if needs_split_unsorted {
64 chunk_max_rows_if_unsorted
65 } else if needs_split_rows {
66 chunk_max_rows
67 } else {
68 let bytes_per_row = chunk_size_bytes / chunk_num_rows.max(1);
70 chunk_max_bytes / bytes_per_row.max(1)
71 };
72
73 let target_rows = target_rows.max(1) as usize; let mut result = Vec::with_capacity(chunk.num_rows().div_ceil(target_rows));
76 let mut start_idx = 0;
77
78 while start_idx < chunk.num_rows() {
79 let remaining_rows = chunk.num_rows() - start_idx;
80 let chunk_size = remaining_rows.min(target_rows);
81
82 let split_chunk = chunk
83 .row_sliced_deep(start_idx, chunk_size)
84 .with_id(ChunkId::new());
85
86 result.push(Arc::new(split_chunk));
87
88 start_idx += chunk_size;
89 }
90
91 re_log::trace!(
92 entity_path = %chunk.entity_path(),
93 original_rows = chunk.num_rows(),
94 original_bytes = %re_format::format_bytes(chunk_size_bytes as _),
95 split_into = result.len(),
96 target_rows,
97 "split chunk"
98 );
99
100 result
101 }
102}