Skip to main content

reifydb_store_multi/store/
worker.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, time::Duration};
5
6use reifydb_core::{
7	common::CommitVersion,
8	encoded::key::EncodedKey,
9	event::{
10		EventBus,
11		metric::{MultiCommittedEvent, MultiDrop},
12	},
13	interface::store::EntryKind,
14};
15use reifydb_runtime::{
16	actor::{
17		context::Context,
18		mailbox::ActorRef,
19		system::{ActorConfig, ActorSystem},
20		timers::TimerHandle,
21		traits::{Actor, Directive},
22	},
23	context::clock::{Clock, Instant},
24};
25use reifydb_type::util::cowvec::CowVec;
26use tracing::{Span, debug, error, instrument};
27
28use super::drop::find_keys_to_drop;
29use crate::{hot::storage::HotStorage, tier::TierStorage};
30
/// Tuning knobs for the background drop worker.
///
/// Drops are batched: a batch executes as soon as it reaches
/// [`DropWorkerConfig::batch_size`] requests, or when a partial batch has
/// been waiting longer than [`DropWorkerConfig::flush_interval`].
#[derive(Debug, Clone)]
pub struct DropWorkerConfig {
	/// Number of queued drop requests that triggers an immediate flush.
	pub batch_size: usize,
	/// Upper bound on how long a partially-filled batch may wait before
	/// it is flushed anyway.
	pub flush_interval: Duration,
}

impl Default for DropWorkerConfig {
	/// Defaults: batches of 100 requests, flushed at least every 50 ms.
	fn default() -> Self {
		DropWorkerConfig {
			batch_size: 100,
			flush_interval: Duration::from_millis(50),
		}
	}
}
48
49use reifydb_core::actors::drop::{DropMessage, DropRequest};
50
/// Actor that processes drop operations asynchronously.
pub struct DropActor {
	// Hot-tier storage that drops are resolved against and executed on.
	storage: HotStorage,
	// Bus on which a `MultiCommittedEvent` is published after each batch.
	event_bus: EventBus,
	// Batching parameters (batch size, flush interval).
	config: DropWorkerConfig,
	// Clock used to record the time of the last flush.
	clock: Clock,
}
58
/// State for the drop actor.
pub struct DropActorState {
	/// Pending requests waiting to be processed in the next batch.
	pending_requests: Vec<DropRequest>,
	/// Last time we flushed the batch; compared against the configured
	/// `flush_interval` on every tick.
	last_flush: Instant,
	/// Handle to the periodic tick timer, held only so it stays alive for
	/// the lifetime of the actor (never read).
	_timer_handle: Option<TimerHandle>,
	/// Number of flushes since startup; every 100th flush triggers a
	/// storage maintenance pass.
	flush_count: u64,
}
70
71impl DropActor {
72	pub fn new(config: DropWorkerConfig, storage: HotStorage, event_bus: EventBus, clock: Clock) -> Self {
73		Self {
74			storage,
75			event_bus,
76			config,
77			clock,
78		}
79	}
80
81	pub fn spawn(
82		system: &ActorSystem,
83		config: DropWorkerConfig,
84		storage: HotStorage,
85		event_bus: EventBus,
86		clock: Clock,
87	) -> ActorRef<DropMessage> {
88		let actor = Self::new(config, storage, event_bus, clock);
89		system.spawn("drop-worker", actor).actor_ref().clone()
90	}
91
92	/// Maybe flush if batch is full.
93	fn maybe_flush(&self, state: &mut DropActorState) {
94		if state.pending_requests.len() >= self.config.batch_size {
95			self.flush(state);
96		}
97	}
98
99	/// Flush all pending requests.
100	fn flush(&self, state: &mut DropActorState) {
101		if state.pending_requests.is_empty() {
102			return;
103		}
104
105		Self::process_batch(&self.storage, &mut state.pending_requests, &self.event_bus);
106		state.last_flush = self.clock.instant();
107
108		state.flush_count += 1;
109		if state.flush_count.is_multiple_of(100) {
110			self.storage.maintenance();
111		}
112	}
113
114	#[instrument(name = "drop::process_batch", level = "debug", skip_all, fields(num_requests = requests.len(), total_dropped))]
115	fn process_batch(storage: &HotStorage, requests: &mut Vec<DropRequest>, event_bus: &EventBus) {
116		// Collect all keys to drop, grouped by table: (key, version) pairs
117		let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>> = HashMap::new();
118		// Collect drop stats for metrics
119		let mut drops_with_stats = Vec::new();
120		let mut max_pending_version = CommitVersion(0);
121
122		for request in requests.drain(..) {
123			// Track highest version for event (prefer pending_version if set, otherwise use commit_version)
124			let version_for_event = request.pending_version.unwrap_or(request.commit_version);
125			if version_for_event > max_pending_version {
126				max_pending_version = version_for_event;
127			}
128
129			match find_keys_to_drop(storage, request.table, request.key.as_ref(), request.pending_version) {
130				Ok(entries_to_drop) => {
131					for entry in entries_to_drop {
132						// Collect stats for metrics
133						drops_with_stats.push(MultiDrop {
134							key: EncodedKey(request.key.clone()),
135							value_bytes: entry.value_bytes,
136						});
137
138						// Queue for physical deletion: (key, version) pair
139						batches.entry(request.table)
140							.or_default()
141							.push((entry.key, entry.version));
142					}
143				}
144				Err(e) => {
145					error!("Drop actor failed to find keys to drop: {}", e);
146				}
147			}
148		}
149
150		if !batches.is_empty()
151			&& let Err(e) = storage.drop(batches)
152		{
153			error!("Drop actor failed to execute drops: {}", e);
154		}
155
156		let total_dropped = drops_with_stats.len();
157		Span::current().record("total_dropped", total_dropped);
158
159		event_bus.emit(MultiCommittedEvent::new(vec![], vec![], drops_with_stats, max_pending_version));
160	}
161}
162
163impl Actor for DropActor {
164	type State = DropActorState;
165	type Message = DropMessage;
166
167	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
168		debug!("Drop actor started");
169
170		// Schedule periodic tick for flushing partial batches
171		let timer_handle = ctx.schedule_repeat(Duration::from_millis(10), DropMessage::Tick);
172
173		DropActorState {
174			pending_requests: Vec::with_capacity(self.config.batch_size),
175			last_flush: self.clock.instant(),
176			_timer_handle: Some(timer_handle),
177			flush_count: 0,
178		}
179	}
180
181	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
182		// Check for cancellation
183		if ctx.is_cancelled() {
184			// Flush remaining requests before stopping
185			self.flush(state);
186			return Directive::Stop;
187		}
188
189		match msg {
190			DropMessage::Request(request) => {
191				state.pending_requests.push(request);
192				self.maybe_flush(state);
193			}
194			DropMessage::Batch(requests) => {
195				state.pending_requests.extend(requests);
196				self.maybe_flush(state);
197			}
198			DropMessage::Tick => {
199				if !state.pending_requests.is_empty()
200					&& state.last_flush.elapsed() >= self.config.flush_interval
201				{
202					self.flush(state);
203				}
204			}
205			DropMessage::Shutdown => {
206				debug!("Drop actor received shutdown signal");
207				// Process any remaining requests before shutdown
208				self.flush(state);
209				return Directive::Stop;
210			}
211		}
212
213		Directive::Continue
214	}
215
216	fn post_stop(&self) {
217		debug!("Drop actor stopped");
218	}
219
220	fn config(&self) -> ActorConfig {
221		ActorConfig::new().mailbox_capacity(4096 * 16)
222	}
223}