1use std::{collections::BTreeSet, sync::Arc};
2
3use ahash::HashMap;
4use arrow::array::Array as _;
5use itertools::Itertools as _;
6
7use re_byte_size::SizeBytes;
8use re_chunk::{Chunk, EntityPath, RowId};
9
10use crate::{
11 store::ChunkIdSetPerTime, ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreDiff,
12 ChunkStoreError, ChunkStoreEvent, ChunkStoreResult, ColumnMetadataState,
13};
14
15#[allow(unused_imports)]
17use crate::ChunkId;
18
19impl ChunkStore {
22 pub fn insert_chunk(&mut self, chunk: &Arc<Chunk>) -> ChunkStoreResult<Vec<ChunkStoreEvent>> {
31 if self.chunks_per_chunk_id.contains_key(&chunk.id()) {
32 re_log::debug_once!(
34 "Chunk #{} was inserted more than once (this has no effect)",
35 chunk.id()
36 );
37 return Ok(Vec::new());
38 }
39
40 if !chunk.is_sorted() {
41 return Err(ChunkStoreError::UnsortedChunk);
42 }
43
44 let Some(row_id_range) = chunk.row_id_range() else {
45 return Ok(Vec::new());
46 };
47
48 re_tracing::profile_function!();
49
50 self.insert_id += 1;
51
52 let mut chunk = Arc::clone(chunk);
53
54 if self.id.kind == re_log_types::StoreKind::Blueprint {
65 let patched = chunk.patched_for_blueprint_021_compat();
66 chunk = Arc::new(patched);
67 }
68
69 #[cfg(debug_assertions)]
70 for (component_name, per_desc) in chunk.components().iter() {
71 assert!(
72 per_desc.len() <= 1,
73 "[DEBUG ONLY] Insert Chunk with multiple values for component named `{component_name}`: this is currently UB\n{chunk}",
74 );
75 }
76
77 let non_compacted_chunk = Arc::clone(&chunk); let (chunk, diffs) = if chunk.is_static() {
80 re_tracing::profile_scope!("static");
82
83 let row_id_range_per_component = chunk.row_id_range_per_component();
84
85 let mut overwritten_chunk_ids = HashMap::default();
86
87 for (component_desc, list_array) in chunk.components().iter_flattened() {
88 let is_empty = list_array
89 .nulls()
90 .is_some_and(|validity| validity.is_empty());
91 if is_empty {
92 continue;
93 }
94
95 let Some((_row_id_min_for_component, row_id_max_for_component)) =
96 row_id_range_per_component
97 .get(&component_desc.component_name)
98 .and_then(|per_desc| per_desc.get(component_desc))
99 else {
100 continue;
101 };
102
103 self.static_chunk_ids_per_entity
104 .entry(chunk.entity_path().clone())
105 .or_default()
106 .entry(component_desc.component_name)
107 .and_modify(|cur_chunk_id| {
108 let cur_row_id_max_for_component = self
112 .chunks_per_chunk_id
113 .get(cur_chunk_id)
114 .map_or(RowId::ZERO, |chunk| {
115 chunk
116 .row_id_range_per_component()
117 .get(&component_desc.component_name)
118 .and_then(|per_desc| per_desc.get(component_desc))
119 .map_or(RowId::ZERO, |(_, row_id_max)| *row_id_max)
120 });
121
122 if *row_id_max_for_component > cur_row_id_max_for_component {
123 let cur_row_id_min_for_chunk = self
132 .chunks_per_chunk_id
133 .get(cur_chunk_id)
134 .and_then(|chunk| {
135 chunk.row_id_range().map(|(row_id_min, _)| row_id_min)
136 });
137
138 debug_assert!(
139 cur_row_id_min_for_chunk.is_some(),
140 "This condition cannot fail, we just want to avoid unwrapping",
141 );
142 if let Some(cur_row_id_min_for_chunk) = cur_row_id_min_for_chunk {
143 overwritten_chunk_ids
144 .insert(*cur_chunk_id, cur_row_id_min_for_chunk);
145 }
146
147 *cur_chunk_id = chunk.id();
148 }
149 })
150 .or_insert_with(|| chunk.id());
151 }
152
153 self.static_chunks_stats += ChunkStoreChunkStats::from_chunk(&chunk);
154
155 let mut diffs = vec![ChunkStoreDiff::addition(
156 non_compacted_chunk, None, )];
159
160 debug_assert!(
163 self.static_chunk_ids_per_entity
164 .contains_key(chunk.entity_path()),
165 "This condition cannot fail, we just want to avoid unwrapping",
166 );
167 if let Some(per_component) = self.static_chunk_ids_per_entity.get(chunk.entity_path()) {
168 re_tracing::profile_scope!("static dangling checks");
169
170 for (chunk_id, chunk_row_id_min) in overwritten_chunk_ids {
183 let has_been_fully_overwritten = !per_component
184 .values()
185 .any(|cur_chunk_id| *cur_chunk_id == chunk_id);
186
187 if has_been_fully_overwritten {
188 let chunk_id_removed =
192 self.chunk_ids_per_min_row_id.remove(&chunk_row_id_min);
193 debug_assert!(chunk_id_removed.is_some());
194
195 let chunk_removed = self.chunks_per_chunk_id.remove(&chunk_id);
196 debug_assert!(chunk_removed.is_some());
197
198 if let Some(chunk_removed) = chunk_removed {
199 self.static_chunks_stats -=
200 ChunkStoreChunkStats::from_chunk(&chunk_removed);
201 diffs.push(ChunkStoreDiff::deletion(chunk_removed));
202 }
203 }
204 }
205 }
206
207 (Arc::clone(&chunk), diffs)
208 } else {
209 re_tracing::profile_scope!("temporal");
211
212 let (elected_chunk, chunk_or_compacted) = {
213 re_tracing::profile_scope!("election");
214
215 let elected_chunk = self.find_and_elect_compaction_candidate(&chunk);
216
217 let chunk_or_compacted = if let Some(elected_chunk) = &elected_chunk {
218 let chunk_rowid_min = chunk.row_id_range().map(|(min, _)| min);
219 let elected_rowid_min = elected_chunk.row_id_range().map(|(min, _)| min);
220
221 let mut compacted = if elected_rowid_min < chunk_rowid_min {
222 re_tracing::profile_scope!("concat");
223 elected_chunk.concatenated(&chunk)?
224 } else {
225 re_tracing::profile_scope!("concat");
226 chunk.concatenated(elected_chunk)?
227 };
228
229 {
230 re_tracing::profile_scope!("sort");
231 compacted.sort_if_unsorted();
232 }
233
234 re_log::trace!(
235 "compacted {} ({} rows) and {} ({} rows) together, resulting in {} ({} rows)",
236 chunk.id(),
237 re_format::format_uint(chunk.num_rows()),
238 elected_chunk.id(),
239 re_format::format_uint(elected_chunk.num_rows()),
240 compacted.id(),
241 re_format::format_uint(compacted.num_rows()),
242 );
243
244 Arc::new(compacted)
245 } else {
246 Arc::clone(&chunk)
247 };
248
249 (elected_chunk, chunk_or_compacted)
250 };
251
252 {
253 re_tracing::profile_scope!("insertion (w/ component)");
254
255 let temporal_chunk_ids_per_timeline = self
256 .temporal_chunk_ids_per_entity_per_component
257 .entry(chunk_or_compacted.entity_path().clone())
258 .or_default();
259
260 for (timeline, time_range_per_component) in
265 chunk_or_compacted.time_range_per_component()
266 {
267 let temporal_chunk_ids_per_component =
268 temporal_chunk_ids_per_timeline.entry(timeline).or_default();
269
270 for (component_name, per_desc) in time_range_per_component {
271 for (_component_desc, time_range) in per_desc {
272 let temporal_chunk_ids_per_time = temporal_chunk_ids_per_component
273 .entry(component_name)
274 .or_default();
275
276 temporal_chunk_ids_per_time.max_interval_length = u64::max(
278 temporal_chunk_ids_per_time.max_interval_length,
279 time_range.abs_length(),
280 );
281
282 temporal_chunk_ids_per_time
283 .per_start_time
284 .entry(time_range.min())
285 .or_default()
286 .insert(chunk_or_compacted.id());
287 temporal_chunk_ids_per_time
288 .per_end_time
289 .entry(time_range.max())
290 .or_default()
291 .insert(chunk_or_compacted.id());
292 }
293 }
294 }
295 }
296
297 {
298 re_tracing::profile_scope!("insertion (w/o component)");
299
300 let temporal_chunk_ids_per_timeline = self
301 .temporal_chunk_ids_per_entity
302 .entry(chunk_or_compacted.entity_path().clone())
303 .or_default();
304
305 for (timeline, time_column) in chunk_or_compacted.timelines() {
306 let temporal_chunk_ids_per_time = temporal_chunk_ids_per_timeline
307 .entry(*timeline)
308 .or_default();
309
310 let time_range = time_column.time_range();
311
312 temporal_chunk_ids_per_time.max_interval_length = u64::max(
314 temporal_chunk_ids_per_time.max_interval_length,
315 time_range.abs_length(),
316 );
317
318 temporal_chunk_ids_per_time
319 .per_start_time
320 .entry(time_range.min())
321 .or_default()
322 .insert(chunk_or_compacted.id());
323 temporal_chunk_ids_per_time
324 .per_end_time
325 .entry(time_range.max())
326 .or_default()
327 .insert(chunk_or_compacted.id());
328 }
329 }
330
331 self.temporal_chunks_stats += ChunkStoreChunkStats::from_chunk(&chunk_or_compacted);
332
333 let mut diff = ChunkStoreDiff::addition(
334 Arc::clone(&non_compacted_chunk), None, );
345 if let Some(elected_chunk) = &elected_chunk {
346 let srcs = std::iter::once((non_compacted_chunk.id(), non_compacted_chunk))
348 .chain(
349 self.remove_chunk(elected_chunk.id())
350 .into_iter()
351 .filter(|diff| diff.kind == crate::ChunkStoreDiffKind::Deletion)
352 .map(|diff| (diff.chunk.id(), diff.chunk)),
353 )
354 .collect();
355
356 diff.compacted = Some(crate::ChunkCompactionReport {
357 srcs,
358 new_chunk: chunk_or_compacted.clone(),
359 });
360 }
361
362 (chunk_or_compacted, vec![diff])
363 };
364
365 self.chunks_per_chunk_id.insert(chunk.id(), chunk.clone());
366 self.chunk_ids_per_min_row_id
367 .entry(row_id_range.0)
368 .or_default()
369 .push(chunk.id());
370
371 for (name, columns) in chunk.timelines() {
372 let new_typ = columns.timeline().typ();
373 if let Some(old_typ) = self.time_type_registry.insert(*name, new_typ) {
374 if old_typ != new_typ {
375 re_log::warn_once!(
376 "Timeline '{name}' changed type from {old_typ:?} to {new_typ:?}. \
377 Rerun does not support using different types for the same timeline.",
378 );
379 }
380 }
381 }
382
383 for (component_descr, list_array) in chunk.components().iter_flattened() {
384 if let Some(old_typ) = self
385 .type_registry
386 .insert(component_descr.component_name, list_array.value_type())
387 {
388 if old_typ != list_array.value_type() {
389 re_log::warn_once!(
390 "Component column '{}' changed type from {old_typ:?} to {:?}",
391 component_descr.component_name,
392 list_array.value_type()
393 );
394 }
395 }
396
397 let column_metadata_state = self
398 .per_column_metadata
399 .entry(chunk.entity_path().clone())
400 .or_default()
401 .entry(component_descr.component_name)
402 .or_default()
403 .entry(component_descr.clone())
404 .or_insert(ColumnMetadataState {
405 is_semantically_empty: true,
406 });
407 {
408 let is_semantically_empty =
409 re_arrow_util::is_list_array_semantically_empty(list_array);
410
411 column_metadata_state.is_semantically_empty &= is_semantically_empty;
412 }
413 }
414
415 let events = if self.config.enable_changelog {
416 let events: Vec<_> = diffs
417 .into_iter()
418 .map(|diff| ChunkStoreEvent {
419 store_id: self.id.clone(),
420 store_generation: self.generation(),
421 event_id: self
422 .event_id
423 .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
424 diff,
425 })
426 .collect();
427
428 Self::on_events(&events);
429
430 events
431 } else {
432 Vec::new()
433 };
434
435 Ok(events)
436 }
437
438 fn find_and_elect_compaction_candidate(&self, chunk: &Arc<Chunk>) -> Option<Arc<Chunk>> {
449 re_tracing::profile_function!();
450
451 let mut candidates_below_threshold: HashMap<ChunkId, bool> = HashMap::default();
452 let mut check_if_chunk_below_threshold =
453 |store: &Self, candidate_chunk_id: ChunkId| -> bool {
454 let ChunkStoreConfig {
455 enable_changelog: _,
456 chunk_max_bytes,
457 chunk_max_rows,
458 chunk_max_rows_if_unsorted,
459 } = store.config;
460
461 *candidates_below_threshold
462 .entry(candidate_chunk_id)
463 .or_insert_with(|| {
464 store
465 .chunks_per_chunk_id
466 .get(&candidate_chunk_id)
467 .is_some_and(|candidate| {
468 if !chunk.concatenable(candidate) {
469 return false;
470 }
471
472 let total_bytes = <Chunk as SizeBytes>::total_size_bytes(chunk)
473 + <Chunk as SizeBytes>::total_size_bytes(candidate);
474 let is_below_bytes_threshold = total_bytes <= chunk_max_bytes;
475
476 let total_rows = (chunk.num_rows() + candidate.num_rows()) as u64;
477 let is_below_rows_threshold = if candidate.is_time_sorted() {
478 total_rows <= chunk_max_rows
479 } else {
480 total_rows <= chunk_max_rows_if_unsorted
481 };
482
483 is_below_bytes_threshold && is_below_rows_threshold
484 })
485 })
486 };
487
488 let mut candidates: HashMap<ChunkId, u64> = HashMap::default();
489
490 let temporal_chunk_ids_per_timeline = self
491 .temporal_chunk_ids_per_entity_per_component
492 .get(chunk.entity_path())?;
493
494 for (timeline, time_range_per_component) in chunk.time_range_per_component() {
495 let Some(temporal_chunk_ids_per_component) =
496 temporal_chunk_ids_per_timeline.get(&timeline)
497 else {
498 continue;
499 };
500
501 for (component_name, per_desc) in time_range_per_component {
502 for (_component_desc, time_range) in per_desc {
503 let Some(temporal_chunk_ids_per_time) =
504 temporal_chunk_ids_per_component.get(&component_name)
505 else {
506 continue;
507 };
508
509 {
510 if let Some((_data_time, chunk_id_set)) = temporal_chunk_ids_per_time
512 .per_start_time
513 .range(..time_range.min())
514 .next_back()
515 {
516 for &chunk_id in chunk_id_set {
517 if check_if_chunk_below_threshold(self, chunk_id) {
518 *candidates.entry(chunk_id).or_default() += 1;
519 }
520 }
521 }
522
523 if let Some((_data_time, chunk_id_set)) = temporal_chunk_ids_per_time
525 .per_start_time
526 .range(time_range.max().inc()..)
527 .next()
528 {
529 for &chunk_id in chunk_id_set {
530 if check_if_chunk_below_threshold(self, chunk_id) {
531 *candidates.entry(chunk_id).or_default() += 1;
532 }
533 }
534 }
535
536 let chunk_id_set = temporal_chunk_ids_per_time
537 .per_start_time
538 .get(&time_range.min());
539
540 for chunk_id in chunk_id_set.iter().flat_map(|set| set.iter().copied()) {
542 if check_if_chunk_below_threshold(self, chunk_id) {
543 *candidates.entry(chunk_id).or_default() += 2;
544 }
545 }
546 }
547 }
548 }
549 }
550
551 debug_assert!(!candidates.contains_key(&chunk.id()));
552
553 let mut candidates = candidates.into_iter().collect_vec();
554 candidates.sort_by_key(|(_chunk_id, points)| *points);
555 candidates.reverse();
556
557 candidates
558 .into_iter()
559 .find_map(|(chunk_id, _points)| self.chunks_per_chunk_id.get(&chunk_id).map(Arc::clone))
560 }
561
562 pub fn drop_entity_path(&mut self, entity_path: &EntityPath) -> Vec<ChunkStoreEvent> {
568 re_tracing::profile_function!(entity_path.to_string());
569
570 self.gc_id += 1; let generation = self.generation();
573
574 let Self {
575 id,
576 info: _,
577 config: _,
578 time_type_registry: _,
579 type_registry: _,
580 per_column_metadata,
581 chunks_per_chunk_id,
582 chunk_ids_per_min_row_id,
583 temporal_chunk_ids_per_entity_per_component,
584 temporal_chunk_ids_per_entity,
585 temporal_chunks_stats,
586 static_chunk_ids_per_entity,
587 static_chunks_stats,
588 insert_id: _,
589 gc_id: _,
590 event_id,
591 } = self;
592
593 per_column_metadata.remove(entity_path);
594
595 let dropped_static_chunks = {
596 let dropped_static_chunk_ids: BTreeSet<_> = static_chunk_ids_per_entity
597 .remove(entity_path)
598 .unwrap_or_default()
599 .into_values()
600 .collect();
601
602 chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
603 chunk_ids.retain(|chunk_id| !dropped_static_chunk_ids.contains(chunk_id));
604 !chunk_ids.is_empty()
605 });
606
607 dropped_static_chunk_ids.into_iter()
608 };
609
610 let dropped_temporal_chunks = {
611 temporal_chunk_ids_per_entity_per_component.remove(entity_path);
612
613 let dropped_temporal_chunk_ids: BTreeSet<_> = temporal_chunk_ids_per_entity
614 .remove(entity_path)
615 .unwrap_or_default()
616 .into_values()
617 .flat_map(|temporal_chunk_ids_per_time| {
618 let ChunkIdSetPerTime {
619 max_interval_length: _,
620 per_start_time,
621 per_end_time: _, } = temporal_chunk_ids_per_time;
623
624 per_start_time
625 .into_values()
626 .flat_map(|chunk_ids| chunk_ids.into_iter())
627 })
628 .collect();
629
630 chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
631 chunk_ids.retain(|chunk_id| !dropped_temporal_chunk_ids.contains(chunk_id));
632 !chunk_ids.is_empty()
633 });
634
635 dropped_temporal_chunk_ids.into_iter()
636 };
637
638 let dropped_static_chunks = dropped_static_chunks
639 .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
640 .inspect(|chunk| {
641 *static_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
642 })
643 .collect_vec();
645
646 let dropped_temporal_chunks = dropped_temporal_chunks
647 .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
648 .inspect(|chunk| {
649 *temporal_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
650 });
651
652 if self.config.enable_changelog {
653 let events: Vec<_> = dropped_static_chunks
654 .into_iter()
655 .chain(dropped_temporal_chunks)
656 .map(ChunkStoreDiff::deletion)
657 .map(|diff| ChunkStoreEvent {
658 store_id: id.clone(),
659 store_generation: generation.clone(),
660 event_id: event_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
661 diff,
662 })
663 .collect();
664
665 Self::on_events(&events);
666
667 events
668 } else {
669 Vec::new()
670 }
671 }
672}
673
674#[cfg(test)]
675mod tests {
676 use re_chunk::{TimePoint, Timeline};
677 use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
678 use similar_asserts::assert_eq;
679
680 use crate::ChunkStoreDiffKind;
681
682 use super::*;
683
684 #[test]
689 fn compaction_simple() -> anyhow::Result<()> {
690 re_log::setup_logging();
691
692 let mut store = ChunkStore::new(
693 re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
694 Default::default(),
695 );
696
697 let entity_path = EntityPath::from("this/that");
698
699 let row_id1 = RowId::new();
700 let row_id2 = RowId::new();
701 let row_id3 = RowId::new();
702 let row_id4 = RowId::new();
703 let row_id5 = RowId::new();
704 let row_id6 = RowId::new();
705 let row_id7 = RowId::new();
706 let row_id8 = RowId::new();
707 let row_id9 = RowId::new();
708 let row_id10 = RowId::new();
709
710 let timepoint1 = [(Timeline::new_sequence("frame"), 1)];
711 let timepoint2 = [(Timeline::new_sequence("frame"), 3)];
712 let timepoint3 = [(Timeline::new_sequence("frame"), 5)];
713 let timepoint4 = [(Timeline::new_sequence("frame"), 7)];
714 let timepoint5 = [(Timeline::new_sequence("frame"), 9)];
715
716 let points1 = &[MyPoint::new(1.0, 1.0)];
717 let points2 = &[MyPoint::new(2.0, 2.0)];
718 let points3 = &[MyPoint::new(3.0, 3.0)];
719 let points4 = &[MyPoint::new(4.0, 4.0)];
720 let points5 = &[MyPoint::new(5.0, 5.0)];
721
722 let chunk1 = Chunk::builder(entity_path.clone())
723 .with_component_batches(row_id1, timepoint1, [points1 as _])
724 .with_component_batches(row_id2, timepoint2, [points2 as _])
725 .with_component_batches(row_id3, timepoint3, [points3 as _])
726 .build()?;
727 let chunk2 = Chunk::builder(entity_path.clone())
728 .with_component_batches(row_id4, timepoint4, [points4 as _])
729 .with_component_batches(row_id5, timepoint5, [points5 as _])
730 .build()?;
731 let chunk3 = Chunk::builder(entity_path.clone())
732 .with_component_batches(row_id6, timepoint1, [points1 as _])
733 .with_component_batches(row_id7, timepoint2, [points2 as _])
734 .with_component_batches(row_id8, timepoint3, [points3 as _])
735 .build()?;
736 let chunk4 = Chunk::builder(entity_path.clone())
737 .with_component_batches(row_id9, timepoint4, [points4 as _])
738 .with_component_batches(row_id10, timepoint5, [points5 as _])
739 .build()?;
740
741 let chunk1 = Arc::new(chunk1);
742 let chunk2 = Arc::new(chunk2);
743 let chunk3 = Arc::new(chunk3);
744 let chunk4 = Arc::new(chunk4);
745
746 eprintln!("---\n{store}\ninserting {}", chunk1.id());
747
748 store.insert_chunk(&chunk1)?;
749
750 eprintln!("---\n{store}\ninserting {}", chunk2.id());
751
752 store.insert_chunk(&chunk2)?;
753
754 eprintln!("---\n{store}\ninserting {}", chunk3.id());
755
756 store.insert_chunk(&chunk3)?;
757
758 eprintln!("---\n{store}\ninserting {}", chunk4.id());
759
760 store.insert_chunk(&chunk4)?;
761
762 eprintln!("---\n{store}");
763
764 let got = store
765 .chunks_per_chunk_id
766 .first_key_value()
767 .map(|(_id, chunk)| chunk)
768 .unwrap();
769
770 let expected = Chunk::builder_with_id(got.id(), entity_path.clone())
771 .with_component_batches(row_id1, timepoint1, [points1 as _])
772 .with_component_batches(row_id2, timepoint2, [points2 as _])
773 .with_component_batches(row_id3, timepoint3, [points3 as _])
774 .with_component_batches(row_id4, timepoint4, [points4 as _])
775 .with_component_batches(row_id5, timepoint5, [points5 as _])
776 .with_component_batches(row_id6, timepoint1, [points1 as _])
777 .with_component_batches(row_id7, timepoint2, [points2 as _])
778 .with_component_batches(row_id8, timepoint3, [points3 as _])
779 .with_component_batches(row_id9, timepoint4, [points4 as _])
780 .with_component_batches(row_id10, timepoint5, [points5 as _])
781 .build()?;
782
783 assert_eq!(1, store.chunks_per_chunk_id.len());
784 assert_eq!(
785 expected,
786 **got,
787 "{}",
788 similar_asserts::SimpleDiff::from_str(
789 &format!("{expected}"),
790 &format!("{got}"),
791 "expected",
792 "got",
793 ),
794 );
795
796 Ok(())
797 }
798
799 #[test]
800 fn static_overwrites() -> anyhow::Result<()> {
801 re_log::setup_logging();
802
803 let mut store = ChunkStore::new(
804 re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
805 Default::default(),
806 );
807
808 let entity_path = EntityPath::from("this/that");
809
810 let row_id1_1 = RowId::new();
811 let row_id2_1 = RowId::new();
812 let row_id2_2 = RowId::new();
813
814 let timepoint_static = TimePoint::default();
815
816 let points1 = &[MyPoint::new(1.0, 1.0)];
817 let colors1 = &[MyColor::from_rgb(1, 1, 1)];
818 let labels1 = &[MyLabel("111".to_owned())];
819
820 let points2 = &[MyPoint::new(2.0, 2.0)];
821 let colors2 = &[MyColor::from_rgb(2, 2, 2)];
822 let labels2 = &[MyLabel("222".to_owned())];
823
824 let chunk1 = Chunk::builder(entity_path.clone())
825 .with_component_batches(
826 row_id1_1,
827 timepoint_static.clone(),
828 [points1 as _, colors1 as _, labels1 as _],
829 )
830 .build()?;
831 let chunk2 = Chunk::builder(entity_path.clone())
832 .with_component_batches(
833 row_id2_1,
834 timepoint_static.clone(),
835 [points2 as _, colors2 as _],
836 )
837 .build()?;
838 let chunk3 = Chunk::builder(entity_path.clone())
839 .with_component_batches(row_id2_2, timepoint_static, [labels2 as _])
840 .build()?;
841
842 let chunk1 = Arc::new(chunk1);
843 let chunk2 = Arc::new(chunk2);
844 let chunk3 = Arc::new(chunk3);
845
846 let events = store.insert_chunk(&chunk1)?;
847 assert!(
848 events.len() == 1
849 && events[0].chunk.id() == chunk1.id()
850 && events[0].kind == ChunkStoreDiffKind::Addition,
851 "the first write should result in the addition of chunk1 and nothing else"
852 );
853
854 let events = store.insert_chunk(&chunk2)?;
855 assert!(
856 events.len() == 1
857 && events[0].chunk.id() == chunk2.id()
858 && events[0].kind == ChunkStoreDiffKind::Addition,
859 "the second write should result in the addition of chunk2 and nothing else"
860 );
861
862 let stats_before = store.stats();
863 {
864 let ChunkStoreChunkStats {
865 num_chunks,
866 total_size_bytes: _,
867 num_rows,
868 num_events,
869 } = stats_before.static_chunks;
870 assert_eq!(2, num_chunks);
871 assert_eq!(2, num_rows);
872 assert_eq!(5, num_events);
873 }
874
875 let events = store.insert_chunk(&chunk3)?;
876 assert!(
877 events.len() == 2
878 && events[0].chunk.id() == chunk3.id()
879 && events[0].kind == ChunkStoreDiffKind::Addition
880 && events[1].chunk.id() == chunk1.id()
881 && events[1].kind == ChunkStoreDiffKind::Deletion,
882 "the final write should result in the addition of chunk3 _and_ the deletion of the now fully overwritten chunk1"
883 );
884
885 let stats_after = store.stats();
886 {
887 let ChunkStoreChunkStats {
888 num_chunks,
889 total_size_bytes: _,
890 num_rows,
891 num_events,
892 } = stats_after.static_chunks;
893 assert_eq!(2, num_chunks);
894 assert_eq!(2, num_rows);
895 assert_eq!(3, num_events);
896 }
897
898 Ok(())
899 }
900}