reifydb_cdc/compact/
actor.rs1use std::{sync::Arc, time::Duration};
5
6use reifydb_core::{
7 common::CommitVersion,
8 interface::catalog::config::{ConfigKey, GetConfig},
9};
10use reifydb_runtime::actor::{
11 context::Context,
12 system::ActorConfig,
13 traits::{Actor, Directive},
14};
15use tracing::{debug, error, trace};
16
17use crate::{produce::watermark::CdcProducerWatermark, storage::sqlite::storage::SqliteCdcStorage};
18
/// Messages understood by [`CompactActor`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactMessage {
    /// Timer message: the actor schedules it for itself when it starts and again at the
    /// end of every tick, and answers it with one bounded compaction pass.
    Tick,
    /// Request to run the store's `compact_all` once. Nothing in this file sends it;
    /// presumably it is posted by external callers (confirm against call sites).
    CompactAll,
}
27
28pub struct CompactActor {
29 config: Arc<dyn GetConfig>,
30 store: SqliteCdcStorage,
31 watermark: CdcProducerWatermark,
32}
33
34impl CompactActor {
35 pub fn new(config: Arc<dyn GetConfig>, store: SqliteCdcStorage, watermark: CdcProducerWatermark) -> Self {
36 Self {
37 config,
38 store,
39 watermark,
40 }
41 }
42
43 fn read_block_size(&self) -> usize {
44 self.config.get_config_uint8(ConfigKey::CdcCompactBlockSize) as usize
45 }
46
47 fn read_safety_lag(&self) -> u64 {
48 self.config.get_config_uint8(ConfigKey::CdcCompactSafetyLag)
49 }
50
51 fn read_max_blocks_per_tick(&self) -> usize {
52 self.config.get_config_uint8(ConfigKey::CdcCompactMaxBlocksPerTick) as usize
53 }
54
55 fn read_interval(&self) -> Duration {
56 self.config.get_config_duration(ConfigKey::CdcCompactInterval)
57 }
58
59 fn read_zstd_level(&self) -> u8 {
60 self.config.get_config_uint1(ConfigKey::CdcCompactZstdLevel)
61 }
62}
63
64impl Actor for CompactActor {
65 type State = ();
66 type Message = CompactMessage;
67
68 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
69 let interval = self.read_interval();
70 debug!("[CdcCompact] started: interval={:?}", interval);
71 ctx.schedule_once(interval, || CompactMessage::Tick);
72 }
73
74 fn handle(&self, _state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
75 if ctx.is_cancelled() {
76 debug!("[CdcCompact] stopped");
77 return Directive::Stop;
78 }
79 match msg {
80 CompactMessage::Tick => self.on_tick(ctx),
81 CompactMessage::CompactAll => self.on_compact_all(),
82 }
83 Directive::Continue
84 }
85
86 fn config(&self) -> ActorConfig {
87 ActorConfig::new().mailbox_capacity(8)
88 }
89}
90
91impl CompactActor {
92 #[inline]
93 fn on_tick(&self, ctx: &Context<CompactMessage>) {
94 let block_size = self.read_block_size();
95 let safety_lag = self.read_safety_lag();
96 let max_blocks = self.read_max_blocks_per_tick();
97 let zstd_level = self.read_zstd_level();
98 let watermark = self.watermark.get();
99
100 let produced = self.run_tick_loop(block_size, safety_lag, zstd_level, watermark, max_blocks);
101 if produced > 0 {
102 debug!("[CdcCompact] produced {produced} block(s) this tick");
103 }
104
105 ctx.schedule_once(self.read_interval(), || CompactMessage::Tick);
106 }
107
108 #[inline]
109 fn run_tick_loop(
110 &self,
111 block_size: usize,
112 safety_lag: u64,
113 zstd_level: u8,
114 watermark: CommitVersion,
115 max_blocks: usize,
116 ) -> usize {
117 let mut produced = 0usize;
118 while produced < max_blocks {
119 match self.store.compact_oldest(block_size, safety_lag, zstd_level, watermark) {
120 Ok(Some(s)) => {
121 trace!(
122 "[CdcCompact] block: [{}..{}] entries={} bytes={}",
123 s.min_version.0, s.max_version.0, s.num_entries, s.compressed_bytes,
124 );
125 produced += 1;
126 }
127 Ok(None) => break,
128 Err(e) => {
129 error!("[CdcCompact] {e}");
130 break;
131 }
132 }
133 }
134 produced
135 }
136
137 #[inline]
138 fn on_compact_all(&self) {
139 let block_size = self.read_block_size();
140 let zstd_level = self.read_zstd_level();
141 let watermark = self.watermark.get();
142 match self.store.compact_all(block_size, zstd_level, watermark) {
143 Ok(s) => debug!("[CdcCompact] CompactAll produced {} block(s)", s.len()),
144 Err(e) => error!("[CdcCompact] CompactAll error: {e}"),
145 }
146 }
147}