Skip to main content

reifydb_cdc/compact/
actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{sync::Arc, time::Duration};
5
6use reifydb_core::{
7	common::CommitVersion,
8	interface::catalog::config::{ConfigKey, GetConfig},
9};
10use reifydb_runtime::actor::{
11	context::Context,
12	system::ActorConfig,
13	traits::{Actor, Directive},
14};
15use tracing::{debug, error, trace};
16
17use crate::{produce::watermark::CdcProducerWatermark, storage::sqlite::storage::SqliteCdcStorage};
18
19pub enum CompactMessage {
20	Tick,
21
22	CompactAll,
23}
24
25pub struct CompactActor {
26	config: Arc<dyn GetConfig>,
27	store: SqliteCdcStorage,
28	watermark: CdcProducerWatermark,
29}
30
31impl CompactActor {
32	pub fn new(config: Arc<dyn GetConfig>, store: SqliteCdcStorage, watermark: CdcProducerWatermark) -> Self {
33		Self {
34			config,
35			store,
36			watermark,
37		}
38	}
39
40	fn read_block_size(&self) -> usize {
41		self.config.get_config_uint8(ConfigKey::CdcCompactBlockSize) as usize
42	}
43
44	fn read_safety_lag(&self) -> u64 {
45		self.config.get_config_uint8(ConfigKey::CdcCompactSafetyLag)
46	}
47
48	fn read_max_blocks_per_tick(&self) -> usize {
49		self.config.get_config_uint8(ConfigKey::CdcCompactMaxBlocksPerTick) as usize
50	}
51
52	fn read_interval(&self) -> Duration {
53		self.config.get_config_duration(ConfigKey::CdcCompactInterval)
54	}
55
56	fn read_zstd_level(&self) -> u8 {
57		self.config.get_config_uint1(ConfigKey::CdcCompactZstdLevel)
58	}
59}
60
61impl Actor for CompactActor {
62	type State = ();
63	type Message = CompactMessage;
64
65	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
66		let interval = self.read_interval();
67		debug!("[CdcCompact] started: interval={:?}", interval);
68		ctx.schedule_once(interval, || CompactMessage::Tick);
69	}
70
71	fn handle(&self, _state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
72		if ctx.is_cancelled() {
73			debug!("[CdcCompact] stopped");
74			return Directive::Stop;
75		}
76		match msg {
77			CompactMessage::Tick => self.on_tick(ctx),
78			CompactMessage::CompactAll => self.on_compact_all(),
79		}
80		Directive::Continue
81	}
82
83	fn config(&self) -> ActorConfig {
84		ActorConfig::new().mailbox_capacity(8)
85	}
86}
87
88impl CompactActor {
89	#[inline]
90	fn on_tick(&self, ctx: &Context<CompactMessage>) {
91		let block_size = self.read_block_size();
92		let safety_lag = self.read_safety_lag();
93		let max_blocks = self.read_max_blocks_per_tick();
94		let zstd_level = self.read_zstd_level();
95		let watermark = self.watermark.get();
96
97		let produced = self.run_tick_loop(block_size, safety_lag, zstd_level, watermark, max_blocks);
98		if produced > 0 {
99			debug!("[CdcCompact] produced {produced} block(s) this tick");
100		}
101
102		ctx.schedule_once(self.read_interval(), || CompactMessage::Tick);
103	}
104
105	#[inline]
106	fn run_tick_loop(
107		&self,
108		block_size: usize,
109		safety_lag: u64,
110		zstd_level: u8,
111		watermark: CommitVersion,
112		max_blocks: usize,
113	) -> usize {
114		let mut produced = 0usize;
115		while produced < max_blocks {
116			match self.store.compact_oldest(block_size, safety_lag, zstd_level, watermark) {
117				Ok(Some(s)) => {
118					trace!(
119						"[CdcCompact] block: [{}..{}] entries={} bytes={}",
120						s.min_version.0, s.max_version.0, s.num_entries, s.compressed_bytes,
121					);
122					produced += 1;
123				}
124				Ok(None) => break,
125				Err(e) => {
126					error!("[CdcCompact] {e}");
127					break;
128				}
129			}
130		}
131		produced
132	}
133
134	#[inline]
135	fn on_compact_all(&self) {
136		let block_size = self.read_block_size();
137		let zstd_level = self.read_zstd_level();
138		let watermark = self.watermark.get();
139		match self.store.compact_all(block_size, zstd_level, watermark) {
140			Ok(s) => debug!("[CdcCompact] CompactAll produced {} block(s)", s.len()),
141			Err(e) => error!("[CdcCompact] CompactAll error: {e}"),
142		}
143	}
144}