Skip to main content

reifydb_cdc/compact/
actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{sync::Arc, time::Duration};
5
6use reifydb_core::{
7	common::CommitVersion,
8	interface::catalog::config::{ConfigKey, GetConfig},
9};
10use reifydb_runtime::actor::{
11	context::Context,
12	system::ActorConfig,
13	traits::{Actor, Directive},
14};
15use tracing::{debug, error, trace};
16
17use crate::{produce::watermark::CdcProducerWatermark, storage::sqlite::storage::SqliteCdcStorage};
18
/// Messages understood by [`CompactActor`].
pub enum CompactMessage {
	/// Timer-driven compaction pass. Packs at most
	/// `CdcCompactMaxBlocksPerTick` blocks while honouring
	/// `CdcCompactSafetyLag`, then schedules the next `Tick`.
	Tick,
	/// One-shot request to compact everything that can be compacted:
	/// the safety lag is not applied and the last block may be partial.
	/// Intended as an admin/test trigger; no reply is sent.
	CompactAll,
}
27
28pub struct CompactActor {
29	config: Arc<dyn GetConfig>,
30	store: SqliteCdcStorage,
31	watermark: CdcProducerWatermark,
32}
33
34impl CompactActor {
35	pub fn new(config: Arc<dyn GetConfig>, store: SqliteCdcStorage, watermark: CdcProducerWatermark) -> Self {
36		Self {
37			config,
38			store,
39			watermark,
40		}
41	}
42
43	fn read_block_size(&self) -> usize {
44		self.config.get_config_uint8(ConfigKey::CdcCompactBlockSize) as usize
45	}
46
47	fn read_safety_lag(&self) -> u64 {
48		self.config.get_config_uint8(ConfigKey::CdcCompactSafetyLag)
49	}
50
51	fn read_max_blocks_per_tick(&self) -> usize {
52		self.config.get_config_uint8(ConfigKey::CdcCompactMaxBlocksPerTick) as usize
53	}
54
55	fn read_interval(&self) -> Duration {
56		self.config.get_config_duration(ConfigKey::CdcCompactInterval)
57	}
58
59	fn read_zstd_level(&self) -> u8 {
60		self.config.get_config_uint1(ConfigKey::CdcCompactZstdLevel)
61	}
62}
63
64impl Actor for CompactActor {
65	type State = ();
66	type Message = CompactMessage;
67
68	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
69		let interval = self.read_interval();
70		debug!("[CdcCompact] started: interval={:?}", interval);
71		ctx.schedule_once(interval, || CompactMessage::Tick);
72	}
73
74	fn handle(&self, _state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
75		if ctx.is_cancelled() {
76			debug!("[CdcCompact] stopped");
77			return Directive::Stop;
78		}
79		match msg {
80			CompactMessage::Tick => self.on_tick(ctx),
81			CompactMessage::CompactAll => self.on_compact_all(),
82		}
83		Directive::Continue
84	}
85
86	fn config(&self) -> ActorConfig {
87		ActorConfig::new().mailbox_capacity(8)
88	}
89}
90
91impl CompactActor {
92	#[inline]
93	fn on_tick(&self, ctx: &Context<CompactMessage>) {
94		let block_size = self.read_block_size();
95		let safety_lag = self.read_safety_lag();
96		let max_blocks = self.read_max_blocks_per_tick();
97		let zstd_level = self.read_zstd_level();
98		let watermark = self.watermark.get();
99
100		let produced = self.run_tick_loop(block_size, safety_lag, zstd_level, watermark, max_blocks);
101		if produced > 0 {
102			debug!("[CdcCompact] produced {produced} block(s) this tick");
103		}
104
105		ctx.schedule_once(self.read_interval(), || CompactMessage::Tick);
106	}
107
108	#[inline]
109	fn run_tick_loop(
110		&self,
111		block_size: usize,
112		safety_lag: u64,
113		zstd_level: u8,
114		watermark: CommitVersion,
115		max_blocks: usize,
116	) -> usize {
117		let mut produced = 0usize;
118		while produced < max_blocks {
119			match self.store.compact_oldest(block_size, safety_lag, zstd_level, watermark) {
120				Ok(Some(s)) => {
121					trace!(
122						"[CdcCompact] block: [{}..{}] entries={} bytes={}",
123						s.min_version.0, s.max_version.0, s.num_entries, s.compressed_bytes,
124					);
125					produced += 1;
126				}
127				Ok(None) => break,
128				Err(e) => {
129					error!("[CdcCompact] {e}");
130					break;
131				}
132			}
133		}
134		produced
135	}
136
137	#[inline]
138	fn on_compact_all(&self) {
139		let block_size = self.read_block_size();
140		let zstd_level = self.read_zstd_level();
141		let watermark = self.watermark.get();
142		match self.store.compact_all(block_size, zstd_level, watermark) {
143			Ok(s) => debug!("[CdcCompact] CompactAll produced {} block(s)", s.len()),
144			Err(e) => error!("[CdcCompact] CompactAll error: {e}"),
145		}
146	}
147}