// reifydb_store_multi/store/worker.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

//! Background worker for deferred drop operations.
//!
//! This module provides an actor-based drop processing system that executes
//! version cleanup operations off the critical commit path.
//!
//! The actor model is platform-agnostic:
//! - **Native**: Runs on its own OS thread, processes messages from a channel
//! - **WASM**: Messages are processed inline (synchronously) when sent
13use std::{collections::HashMap, time::Duration};
14
15use reifydb_core::{
16	common::CommitVersion,
17	encoded::key::EncodedKey,
18	event::{
19		EventBus,
20		metric::{StorageDrop, StorageStatsRecordedEvent},
21	},
22};
23use reifydb_runtime::{
24	actor::{
25		context::Context,
26		mailbox::ActorRef,
27		system::{ActorConfig, ActorSystem},
28		timers::TimerHandle,
29		traits::{Actor, Directive},
30	},
31	clock::{Clock, Instant},
32};
33use reifydb_type::util::cowvec::CowVec;
34use tracing::{Span, debug, error, instrument};
35
36use super::drop::find_keys_to_drop;
37use crate::{
38	hot::storage::HotStorage,
39	tier::{EntryKind, TierStorage},
40};
41
/// Configuration for the drop worker.
#[derive(Debug, Clone)]
pub struct DropWorkerConfig {
	/// Number of queued drop requests that triggers an immediate flush.
	pub batch_size: usize,
	/// Upper bound on how long a partial batch may wait before being flushed.
	pub flush_interval: Duration,
}

impl Default for DropWorkerConfig {
	/// Defaults: flush after 100 queued requests or 50ms, whichever comes first.
	fn default() -> Self {
		DropWorkerConfig {
			flush_interval: Duration::from_millis(50),
			batch_size: 100,
		}
	}
}
59
/// A request to drop old versions of a key.
///
/// Produced at commit time and executed asynchronously by [`DropActor`];
/// `up_to_version` and `keep_last_versions` are both optional and are passed
/// through unchanged to `find_keys_to_drop`.
#[derive(Debug, Clone)]
pub struct DropRequest {
	/// The table containing the key.
	pub table: EntryKind,
	/// The logical key (without version suffix).
	pub key: CowVec<u8>,
	/// Drop versions below this threshold (if Some).
	pub up_to_version: Option<CommitVersion>,
	/// Keep this many most recent versions (if Some).
	pub keep_last_versions: Option<usize>,
	/// The commit version that created this drop request.
	pub commit_version: CommitVersion,
	/// A version being written in the same batch (to avoid race); when set,
	/// it also takes precedence over `commit_version` for stats reporting.
	pub pending_version: Option<CommitVersion>,
}
76
/// Messages for the drop actor.
///
/// `Request`/`Batch` enqueue work; `Tick` is the timer-driven prompt to flush
/// a partial batch; `Shutdown` drains pending work and stops the actor.
#[derive(Clone)]
pub enum DropMessage {
	/// A single drop request to process.
	Request(DropRequest),
	/// A batch of drop requests to process.
	Batch(Vec<DropRequest>),
	/// Periodic tick for flushing batches.
	Tick,
	/// Shutdown the actor.
	Shutdown,
}
89
/// Actor that processes drop operations asynchronously.
///
/// Accumulates [`DropRequest`]s in its state and executes them in batches
/// against hot storage, keeping version cleanup off the commit path.
pub struct DropActor {
	// Hot-tier storage the drops are resolved against and executed on.
	storage: HotStorage,
	// Bus used to publish storage-drop statistics after each batch.
	event_bus: EventBus,
	// Batching parameters (batch size and flush interval).
	config: DropWorkerConfig,
	// Clock used to timestamp flushes for the flush-interval check.
	clock: Clock,
}
97
/// State for the drop actor.
///
/// Created in [`Actor::init`] and mutated only from the actor's own
/// message-handling code.
pub struct DropActorState {
	/// Pending requests waiting to be processed.
	pending_requests: Vec<DropRequest>,
	/// Last time we flushed the batch.
	last_flush: Instant,
	/// Handle to the periodic timer (for cleanup).
	/// NOTE(review): presumably dropping the handle cancels the repeating
	/// tick — confirm against `TimerHandle`'s semantics.
	_timer_handle: Option<TimerHandle>,
	/// Number of flushes since startup, used to schedule periodic maintenance.
	flush_count: u64,
}
109
110impl DropActor {
111	pub fn new(config: DropWorkerConfig, storage: HotStorage, event_bus: EventBus, clock: Clock) -> Self {
112		Self {
113			storage,
114			event_bus,
115			config,
116			clock,
117		}
118	}
119
120	pub fn spawn(
121		system: &ActorSystem,
122		config: DropWorkerConfig,
123		storage: HotStorage,
124		event_bus: EventBus,
125		clock: Clock,
126	) -> ActorRef<DropMessage> {
127		let actor = Self::new(config, storage, event_bus, clock);
128		system.spawn("drop-worker", actor).actor_ref().clone()
129	}
130
131	/// Maybe flush if batch is full.
132	fn maybe_flush(&self, state: &mut DropActorState) {
133		if state.pending_requests.len() >= self.config.batch_size {
134			self.flush(state);
135		}
136	}
137
138	/// Flush all pending requests.
139	fn flush(&self, state: &mut DropActorState) {
140		if state.pending_requests.is_empty() {
141			return;
142		}
143
144		Self::process_batch(&self.storage, &mut state.pending_requests, &self.event_bus);
145		state.last_flush = self.clock.instant();
146
147		state.flush_count += 1;
148		if state.flush_count % 100 == 0 {
149			self.storage.maintenance();
150		}
151	}
152
153	#[instrument(name = "drop::process_batch", level = "debug", skip_all, fields(num_requests = requests.len(), total_dropped))]
154	fn process_batch(storage: &HotStorage, requests: &mut Vec<DropRequest>, event_bus: &EventBus) {
155		// Collect all keys to drop, grouped by table: (key, version) pairs
156		let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>> = HashMap::new();
157		// Collect drop stats for metrics
158		let mut drops_with_stats = Vec::new();
159		let mut max_pending_version = CommitVersion(0);
160
161		for request in requests.drain(..) {
162			// Track highest version for event (prefer pending_version if set, otherwise use commit_version)
163			let version_for_event = request.pending_version.unwrap_or(request.commit_version);
164			if version_for_event > max_pending_version {
165				max_pending_version = version_for_event;
166			}
167
168			match find_keys_to_drop(
169				storage,
170				request.table,
171				request.key.as_ref(),
172				request.up_to_version,
173				request.keep_last_versions,
174				request.pending_version,
175			) {
176				Ok(entries_to_drop) => {
177					for entry in entries_to_drop {
178						// Collect stats for metrics
179						drops_with_stats.push(StorageDrop {
180							key: EncodedKey(request.key.clone()),
181							value_bytes: entry.value_bytes,
182						});
183
184						// Queue for physical deletion: (key, version) pair
185						batches.entry(request.table)
186							.or_default()
187							.push((entry.key, entry.version));
188					}
189				}
190				Err(e) => {
191					error!("Drop actor failed to find keys to drop: {}", e);
192				}
193			}
194		}
195
196		if !batches.is_empty() {
197			if let Err(e) = storage.drop(batches) {
198				error!("Drop actor failed to execute drops: {}", e);
199			}
200		}
201
202		let total_dropped = drops_with_stats.len();
203		Span::current().record("total_dropped", total_dropped);
204
205		event_bus.emit(StorageStatsRecordedEvent::new(vec![], vec![], drops_with_stats, max_pending_version));
206	}
207}
208
209impl Actor for DropActor {
210	type State = DropActorState;
211	type Message = DropMessage;
212
213	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
214		debug!("Drop actor started");
215
216		// Schedule periodic tick for flushing partial batches
217		let timer_handle = ctx.schedule_repeat(Duration::from_millis(10), DropMessage::Tick);
218
219		DropActorState {
220			pending_requests: Vec::with_capacity(self.config.batch_size),
221			last_flush: self.clock.instant(),
222			_timer_handle: Some(timer_handle),
223			flush_count: 0,
224		}
225	}
226
227	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
228		// Check for cancellation
229		if ctx.is_cancelled() {
230			// Flush remaining requests before stopping
231			self.flush(state);
232			return Directive::Stop;
233		}
234
235		match msg {
236			DropMessage::Request(request) => {
237				state.pending_requests.push(request);
238				self.maybe_flush(state);
239			}
240			DropMessage::Batch(requests) => {
241				state.pending_requests.extend(requests);
242				self.maybe_flush(state);
243			}
244			DropMessage::Tick => {
245				if !state.pending_requests.is_empty()
246					&& state.last_flush.elapsed() >= self.config.flush_interval
247				{
248					self.flush(state);
249				}
250			}
251			DropMessage::Shutdown => {
252				debug!("Drop actor received shutdown signal");
253				// Process any remaining requests before shutdown
254				self.flush(state);
255				return Directive::Stop;
256			}
257		}
258
259		Directive::Continue
260	}
261
262	fn post_stop(&self) {
263		debug!("Drop actor stopped");
264	}
265
266	fn config(&self) -> ActorConfig {
267		ActorConfig::new().mailbox_capacity(4096 * 16)
268	}
269}