1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
use std::sync::Arc;
use re_sdk_types::components::VideoCodec;
use crate::{ChunkStore, ChunkStoreConfig, ChunkStoreError};
/// Callback to detect whether a video sample is the start of a GoP (keyframe).
///
/// Invoked with the raw bytes of one sample and the sample's [`VideoCodec`]. It should return
/// `Ok(true)` when the sample is a keyframe, `Ok(false)` when it is not, and an error when
/// detection fails.
///
/// The callback is wrapped in an [`Arc`] and bounded by `Send + Sync`. That makes it cheap to
/// clone together with [`CompactionOptions`] and shareable between threads.
pub type IsStartOfGop = Arc<dyn Fn(&[u8], VideoCodec) -> anyhow::Result<bool> + Send + Sync>;
/// Knobs for [`ChunkStore::compacted`] and [`ChunkStore::finalize_compaction`].
#[derive(Clone)]
pub struct CompactionOptions {
    /// Chunk-size thresholds used both when merging chunks together and when splitting them.
    pub config: ChunkStoreConfig,

    /// Upper bound on the number of additional compaction passes.
    ///
    /// Compaction is iterative. Each pass re-inserts every chunk so that small neighbors get
    /// another chance to merge. The loop ends early once a pass no longer lowers the chunk count.
    ///
    /// `None` means 50 passes.
    pub num_extra_passes: Option<usize>,

    /// Keyframe detector that enables GoP-aligned rebatching of video stream chunks.
    ///
    /// When set, video stream chunks are regrouped so each chunk aligns to GoP
    /// (Group of Pictures) boundaries. The callback receives a sample's bytes and its codec. It
    /// must return `true` when that sample is a keyframe. Wrapping `re_video::is_start_of_gop`
    /// in a closure is the intended usage.
    ///
    /// `None` skips video rebatching entirely.
    ///
    /// **Note:** a GoP is never split across chunks. A single GoP larger than
    /// [`ChunkStoreConfig::chunk_max_bytes`] therefore becomes one oversized chunk, ignoring
    /// that ceiling. Streams with sparse keyframes (e.g. 10+ seconds between I-frames) can
    /// consequently yield chunks that are many megabytes in size.
    pub is_start_of_gop: Option<IsStartOfGop>,

    /// Maximum allowed byte-size ratio between archetype groups that share a chunk.
    ///
    /// When set, chunks are split so that "thick" columns (images, videos, blobs) do not share
    /// a chunk with "thin" ones (scalars, transforms, text). All components of one archetype
    /// always stay together. Use values `>= 1`; with `1.0`, every archetype lands in its own
    /// chunk.
    ///
    /// `None` leaves chunks unsplit.
    pub split_size_ratio: Option<f64>,
}
impl ChunkStore {
/// Return a new, compacted version of this store.
///
/// Compaction merges small neighboring chunks that share the same entity path, timelines, and
/// datatypes, up to the thresholds in the config. Large chunks may be split.
///
/// If `is_start_of_gop` is provided, video stream chunks are rebatched to align
/// with GoP boundaries after compaction, and sparse `is_keyframe` marker chunks
/// are emitted.
///
/// If `split_size_ratio` is provided, chunks are split on entry so no two
/// archetype groups sharing a chunk differ in byte size by more than that factor.
///
/// The returned store has compaction **disabled** ([`ChunkStoreConfig::ALL_DISABLED`]).
// TODO(RR-4328): we should improve this by exploiting the chunk index, hopefully making it
// memory-bounded
pub fn compacted(&self, options: &CompactionOptions) -> Result<Self, ChunkStoreError> {
re_tracing::profile_function!();
// Initial pass: re-insert all chunks into a compaction-enabled store.
let mut store = Self::new(self.id().clone(), options.config.clone());
for chunk in self.iter_physical_chunks() {
store.insert_chunk(chunk)?;
}
store.finalize_compaction(options)
}
/// Finalize a compaction-enabled store: run up to
/// [`CompactionOptions::num_extra_passes`] additional compaction passes
/// (stopping early when the chunk count stops decreasing), optionally rebatch
/// video chunks along GoP boundaries, then disable compaction on the returned
/// store ([`ChunkStoreConfig::ALL_DISABLED`] config).
///
/// Consumes `self`. Assumes `self` was built with a compaction-enabled
/// config (otherwise each pass is a no-op).
pub fn finalize_compaction(
mut self,
options: &CompactionOptions,
) -> Result<Self, ChunkStoreError> {
re_tracing::profile_function!();
if self.config.chunk_max_bytes == 0
&& self.config.chunk_max_rows == 0
&& self.config.chunk_max_rows_if_unsorted == 0
{
re_log::debug_warn!(
"Finalizing compaction on a store that does not have compaction enabled. \
Extra compaction passes will have no effects."
);
}
let CompactionOptions {
config,
num_extra_passes,
is_start_of_gop,
split_size_ratio,
} = options;
let num_extra_passes = num_extra_passes.unwrap_or(50);
// If `split_size_ratio` is set, re-insert each chunk split into one piece per
// size tier. Compaction's merge-candidate search is per-component, so a thin
// chunk can only merge with another thin chunk — it won't pull a thick
// sibling back in through the shared small component.
if let Some(&ratio) = split_size_ratio.as_ref() {
let chunks: Vec<_> = self.iter_physical_chunks().cloned().collect();
let mut new_store = Self::new(self.id().clone(), config.clone());
for chunk in &chunks {
if let Some(splits) = crate::split_thick_thin::split_chunk(chunk, ratio) {
for split in splits {
new_store.insert_chunk(&Arc::new(split))?;
}
} else {
new_store.insert_chunk(chunk)?;
}
}
self = new_store;
}
for pass in 0..num_extra_passes {
let now = web_time::Instant::now();
let num_before = self.num_physical_chunks();
let chunks: Vec<_> = self.iter_physical_chunks().cloned().collect();
let mut new_store = Self::new(self.id().clone(), config.clone());
for chunk in &chunks {
new_store.insert_chunk(chunk)?;
}
let num_after = new_store.num_physical_chunks();
self = new_store;
re_log::info!(
pass,
num_before,
num_after,
time = ?now.elapsed(),
"compaction pass completed",
);
if num_after >= num_before {
re_log::info!(pass, "converged, stopping early");
break;
}
}
// Rebatch video stream chunks along GoP boundaries.
if let Some(is_start_of_gop) = is_start_of_gop {
let now = web_time::Instant::now();
match crate::rebatch_videos::rebatch_video_chunks_to_gops(
&self,
config,
is_start_of_gop.as_ref(),
) {
Ok(new_store) => {
self = new_store;
re_log::info!(time = ?now.elapsed(), "video GoP rebatching completed");
}
Err(err) => {
re_log::warn!(%err, "video GoP rebatching failed");
}
}
}
// Post-condition: returned store is inert.
self.config = ChunkStoreConfig::ALL_DISABLED;
Ok(self)
}
}
#[cfg(test)]
mod tests {
    use re_chunk::{Chunk, RowId};
    use re_log_types::{EntityPath, StoreId, StoreKind, Timeline, example_components::MyPoint};
    use re_sdk_types::components::Blob;
    use re_types_core::{ArchetypeName, ComponentDescriptor};

    use super::*;

    /// Set of distinct archetypes referenced by the component columns of `chunk`.
    fn archetypes_of(chunk: &Chunk) -> std::collections::BTreeSet<Option<ArchetypeName>> {
        chunk
            .components()
            .values()
            .map(|column| column.descriptor.archetype)
            .collect()
    }

    #[test]
    fn finalize_compaction_resets_config_to_all_disabled() {
        let options = CompactionOptions {
            config: ChunkStoreConfig::CHANGELOG_DISABLED,
            num_extra_passes: Some(0),
            is_start_of_gop: None,
            split_size_ratio: None,
        };
        let empty_store = ChunkStore::new(
            StoreId::random(StoreKind::Recording, "test"),
            ChunkStoreConfig::CHANGELOG_DISABLED,
        );

        let finalized = empty_store
            .finalize_compaction(&options)
            .expect("zero passes should succeed");

        assert_eq!(finalized.config, ChunkStoreConfig::ALL_DISABLED);
    }

    /// Builds a one-row chunk on `entity` at sequence time `frame`. The row holds a zeroed blob
    /// of `blob_bytes` bytes under `my.Video` and a single point under `my.Points`.
    fn mixed_chunk(entity: &EntityPath, frame: i64, blob_bytes: usize) -> Arc<Chunk> {
        let video_blob = ComponentDescriptor {
            archetype: Some(ArchetypeName::from("my.Video")),
            component: "Video:blob".into(),
            component_type: None,
        };
        let points_pos = ComponentDescriptor {
            archetype: Some(ArchetypeName::from("my.Points")),
            component: "Points:pos".into(),
            component_type: None,
        };

        let coord = frame as f32;
        let blob = Blob::from(vec![0u8; blob_bytes]);
        let point = &[MyPoint::new(coord, coord)];

        Chunk::builder(entity.clone())
            .with_component_batches(
                RowId::new(),
                [(Timeline::new_sequence("frame"), frame)],
                [
                    (video_blob, &[blob] as &dyn re_types_core::ComponentBatch),
                    (points_pos, point as &dyn re_types_core::ComponentBatch),
                ],
            )
            .build()
            .map(Arc::new)
            .unwrap()
    }

    #[test]
    fn compacted_splits_thick_from_thin() -> anyhow::Result<()> {
        re_log::setup_logging();

        let entity = EntityPath::from("camera");
        let blob_bytes = 128 * 1024; // dwarfs the point payload

        let mut store = ChunkStore::new(
            StoreId::random(StoreKind::Recording, "test_app"),
            ChunkStoreConfig::ALL_DISABLED,
        );
        for frame in 0..4 {
            store.insert_chunk(&mixed_chunk(&entity, frame, blob_bytes))?;
        }

        let compacted = store.compacted(&CompactionOptions {
            config: ChunkStoreConfig::DEFAULT,
            num_extra_passes: Some(3),
            is_start_of_gop: None,
            split_size_ratio: Some(10.0),
        })?;

        // Every output chunk must carry exactly one archetype...
        for chunk in compacted.iter_physical_chunks() {
            let archetypes = archetypes_of(chunk);
            assert_eq!(
                archetypes.len(),
                1,
                "chunk mixes archetypes after thick/thin split: {archetypes:?}",
            );
        }

        // ...and both archetypes must still be present somewhere in the store.
        let archetypes_seen: std::collections::BTreeSet<_> = compacted
            .iter_physical_chunks()
            .flat_map(|chunk| archetypes_of(chunk))
            .collect();
        assert_eq!(archetypes_seen.len(), 2);

        Ok(())
    }

    #[test]
    fn compacted_leaves_mixed_chunk_alone_without_ratio() -> anyhow::Result<()> {
        re_log::setup_logging();

        let entity = EntityPath::from("camera");

        let mut store = ChunkStore::new(
            StoreId::random(StoreKind::Recording, "test_app"),
            ChunkStoreConfig::ALL_DISABLED,
        );
        for frame in 0..4 {
            store.insert_chunk(&mixed_chunk(&entity, frame, 128 * 1024))?;
        }

        let compacted = store.compacted(&CompactionOptions {
            config: ChunkStoreConfig::DEFAULT,
            num_extra_passes: Some(3),
            is_start_of_gop: None,
            split_size_ratio: None,
        })?;

        // With no ratio configured, thick and thin columns are allowed to share a chunk.
        let any_mixed = compacted
            .iter_physical_chunks()
            .any(|chunk| archetypes_of(chunk).len() > 1);
        assert!(
            any_mixed,
            "expected at least one chunk to still mix archetypes"
        );

        Ok(())
    }
}