reifydb_cdc/compact/
actor.rs1use std::{sync::Arc, time::Duration};
5
6use reifydb_core::{
7 common::CommitVersion,
8 interface::catalog::config::{ConfigKey, GetConfig},
9};
10use reifydb_runtime::actor::{
11 context::Context,
12 system::ActorConfig,
13 traits::{Actor, Directive},
14};
15use tracing::{debug, error, trace};
16
17use crate::{produce::watermark::CdcProducerWatermark, storage::sqlite::storage::SqliteCdcStorage};
18
/// Messages accepted by [`CompactActor`].
///
/// The derives are additive: `Debug` lets messages appear in logs and assertion
/// output, and the value-semantics traits are free for a fieldless enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactMessage {
    /// Periodic trigger. The actor runs one bounded compaction pass and then
    /// schedules the next `Tick` for itself.
    Tick,

    /// On-demand trigger. The actor invokes the store's `compact_all` once.
    CompactAll,
}
24
25pub struct CompactActor {
26 config: Arc<dyn GetConfig>,
27 store: SqliteCdcStorage,
28 watermark: CdcProducerWatermark,
29}
30
31impl CompactActor {
32 pub fn new(config: Arc<dyn GetConfig>, store: SqliteCdcStorage, watermark: CdcProducerWatermark) -> Self {
33 Self {
34 config,
35 store,
36 watermark,
37 }
38 }
39
40 fn read_block_size(&self) -> usize {
41 self.config.get_config_uint8(ConfigKey::CdcCompactBlockSize) as usize
42 }
43
44 fn read_safety_lag(&self) -> u64 {
45 self.config.get_config_uint8(ConfigKey::CdcCompactSafetyLag)
46 }
47
48 fn read_max_blocks_per_tick(&self) -> usize {
49 self.config.get_config_uint8(ConfigKey::CdcCompactMaxBlocksPerTick) as usize
50 }
51
52 fn read_interval(&self) -> Duration {
53 self.config.get_config_duration(ConfigKey::CdcCompactInterval)
54 }
55
56 fn read_zstd_level(&self) -> u8 {
57 self.config.get_config_uint1(ConfigKey::CdcCompactZstdLevel)
58 }
59}
60
61impl Actor for CompactActor {
62 type State = ();
63 type Message = CompactMessage;
64
65 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
66 let interval = self.read_interval();
67 debug!("[CdcCompact] started: interval={:?}", interval);
68 ctx.schedule_once(interval, || CompactMessage::Tick);
69 }
70
71 fn handle(&self, _state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
72 if ctx.is_cancelled() {
73 debug!("[CdcCompact] stopped");
74 return Directive::Stop;
75 }
76 match msg {
77 CompactMessage::Tick => self.on_tick(ctx),
78 CompactMessage::CompactAll => self.on_compact_all(),
79 }
80 Directive::Continue
81 }
82
83 fn config(&self) -> ActorConfig {
84 ActorConfig::new().mailbox_capacity(8)
85 }
86}
87
88impl CompactActor {
89 #[inline]
90 fn on_tick(&self, ctx: &Context<CompactMessage>) {
91 let block_size = self.read_block_size();
92 let safety_lag = self.read_safety_lag();
93 let max_blocks = self.read_max_blocks_per_tick();
94 let zstd_level = self.read_zstd_level();
95 let watermark = self.watermark.get();
96
97 let produced = self.run_tick_loop(block_size, safety_lag, zstd_level, watermark, max_blocks);
98 if produced > 0 {
99 debug!("[CdcCompact] produced {produced} block(s) this tick");
100 }
101
102 ctx.schedule_once(self.read_interval(), || CompactMessage::Tick);
103 }
104
105 #[inline]
106 fn run_tick_loop(
107 &self,
108 block_size: usize,
109 safety_lag: u64,
110 zstd_level: u8,
111 watermark: CommitVersion,
112 max_blocks: usize,
113 ) -> usize {
114 let mut produced = 0usize;
115 while produced < max_blocks {
116 match self.store.compact_oldest(block_size, safety_lag, zstd_level, watermark) {
117 Ok(Some(s)) => {
118 trace!(
119 "[CdcCompact] block: [{}..{}] entries={} bytes={}",
120 s.min_version.0, s.max_version.0, s.num_entries, s.compressed_bytes,
121 );
122 produced += 1;
123 }
124 Ok(None) => break,
125 Err(e) => {
126 error!("[CdcCompact] {e}");
127 break;
128 }
129 }
130 }
131 produced
132 }
133
134 #[inline]
135 fn on_compact_all(&self) {
136 let block_size = self.read_block_size();
137 let zstd_level = self.read_zstd_level();
138 let watermark = self.watermark.get();
139 match self.store.compact_all(block_size, zstd_level, watermark) {
140 Ok(s) => debug!("[CdcCompact] CompactAll produced {} block(s)", s.len()),
141 Err(e) => error!("[CdcCompact] CompactAll error: {e}"),
142 }
143 }
144}