reifydb_store_multi/store/
worker.rs1use std::{collections::HashMap, time::Duration};
14
15use reifydb_core::{
16 common::CommitVersion,
17 encoded::key::EncodedKey,
18 event::{
19 EventBus,
20 metric::{StorageDrop, StorageStatsRecordedEvent},
21 },
22};
23use reifydb_runtime::{
24 actor::{
25 context::Context,
26 mailbox::ActorRef,
27 system::{ActorConfig, ActorSystem},
28 timers::TimerHandle,
29 traits::{Actor, Directive},
30 },
31 clock::{Clock, Instant},
32};
33use reifydb_type::util::cowvec::CowVec;
34use tracing::{Span, debug, error, instrument};
35
36use super::drop::find_keys_to_drop;
37use crate::{
38 hot::storage::HotStorage,
39 tier::{EntryKind, TierStorage},
40};
41
/// Tuning parameters for the drop worker actor.
#[derive(Debug, Clone)]
pub struct DropWorkerConfig {
    /// Number of buffered requests at which the actor flushes right away
    /// instead of waiting for the next timer tick.
    pub batch_size: usize,
    /// Minimum time that must have passed since the last flush before a
    /// timer tick flushes whatever requests are buffered.
    pub flush_interval: Duration,
}
50
51impl Default for DropWorkerConfig {
52 fn default() -> Self {
53 Self {
54 batch_size: 100,
55 flush_interval: Duration::from_millis(50),
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct DropRequest {
63 pub table: EntryKind,
65 pub key: CowVec<u8>,
67 pub up_to_version: Option<CommitVersion>,
69 pub keep_last_versions: Option<usize>,
71 pub commit_version: CommitVersion,
73 pub pending_version: Option<CommitVersion>,
75}
76
77#[derive(Clone)]
79pub enum DropMessage {
80 Request(DropRequest),
82 Batch(Vec<DropRequest>),
84 Tick,
86 Shutdown,
88}
89
90pub struct DropActor {
92 storage: HotStorage,
93 event_bus: EventBus,
94 config: DropWorkerConfig,
95 clock: Clock,
96}
97
98pub struct DropActorState {
100 pending_requests: Vec<DropRequest>,
102 last_flush: Instant,
104 _timer_handle: Option<TimerHandle>,
106 flush_count: u64,
108}
109
110impl DropActor {
111 pub fn new(config: DropWorkerConfig, storage: HotStorage, event_bus: EventBus, clock: Clock) -> Self {
112 Self {
113 storage,
114 event_bus,
115 config,
116 clock,
117 }
118 }
119
120 pub fn spawn(
121 system: &ActorSystem,
122 config: DropWorkerConfig,
123 storage: HotStorage,
124 event_bus: EventBus,
125 clock: Clock,
126 ) -> ActorRef<DropMessage> {
127 let actor = Self::new(config, storage, event_bus, clock);
128 system.spawn("drop-worker", actor).actor_ref().clone()
129 }
130
131 fn maybe_flush(&self, state: &mut DropActorState) {
133 if state.pending_requests.len() >= self.config.batch_size {
134 self.flush(state);
135 }
136 }
137
138 fn flush(&self, state: &mut DropActorState) {
140 if state.pending_requests.is_empty() {
141 return;
142 }
143
144 Self::process_batch(&self.storage, &mut state.pending_requests, &self.event_bus);
145 state.last_flush = self.clock.instant();
146
147 state.flush_count += 1;
148 if state.flush_count % 100 == 0 {
149 self.storage.maintenance();
150 }
151 }
152
153 #[instrument(name = "drop::process_batch", level = "debug", skip_all, fields(num_requests = requests.len(), total_dropped))]
154 fn process_batch(storage: &HotStorage, requests: &mut Vec<DropRequest>, event_bus: &EventBus) {
155 let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>> = HashMap::new();
157 let mut drops_with_stats = Vec::new();
159 let mut max_pending_version = CommitVersion(0);
160
161 for request in requests.drain(..) {
162 let version_for_event = request.pending_version.unwrap_or(request.commit_version);
164 if version_for_event > max_pending_version {
165 max_pending_version = version_for_event;
166 }
167
168 match find_keys_to_drop(
169 storage,
170 request.table,
171 request.key.as_ref(),
172 request.up_to_version,
173 request.keep_last_versions,
174 request.pending_version,
175 ) {
176 Ok(entries_to_drop) => {
177 for entry in entries_to_drop {
178 drops_with_stats.push(StorageDrop {
180 key: EncodedKey(request.key.clone()),
181 value_bytes: entry.value_bytes,
182 });
183
184 batches.entry(request.table)
186 .or_default()
187 .push((entry.key, entry.version));
188 }
189 }
190 Err(e) => {
191 error!("Drop actor failed to find keys to drop: {}", e);
192 }
193 }
194 }
195
196 if !batches.is_empty() {
197 if let Err(e) = storage.drop(batches) {
198 error!("Drop actor failed to execute drops: {}", e);
199 }
200 }
201
202 let total_dropped = drops_with_stats.len();
203 Span::current().record("total_dropped", total_dropped);
204
205 event_bus.emit(StorageStatsRecordedEvent::new(vec![], vec![], drops_with_stats, max_pending_version));
206 }
207}
208
209impl Actor for DropActor {
210 type State = DropActorState;
211 type Message = DropMessage;
212
213 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
214 debug!("Drop actor started");
215
216 let timer_handle = ctx.schedule_repeat(Duration::from_millis(10), DropMessage::Tick);
218
219 DropActorState {
220 pending_requests: Vec::with_capacity(self.config.batch_size),
221 last_flush: self.clock.instant(),
222 _timer_handle: Some(timer_handle),
223 flush_count: 0,
224 }
225 }
226
227 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
228 if ctx.is_cancelled() {
230 self.flush(state);
232 return Directive::Stop;
233 }
234
235 match msg {
236 DropMessage::Request(request) => {
237 state.pending_requests.push(request);
238 self.maybe_flush(state);
239 }
240 DropMessage::Batch(requests) => {
241 state.pending_requests.extend(requests);
242 self.maybe_flush(state);
243 }
244 DropMessage::Tick => {
245 if !state.pending_requests.is_empty()
246 && state.last_flush.elapsed() >= self.config.flush_interval
247 {
248 self.flush(state);
249 }
250 }
251 DropMessage::Shutdown => {
252 debug!("Drop actor received shutdown signal");
253 self.flush(state);
255 return Directive::Stop;
256 }
257 }
258
259 Directive::Continue
260 }
261
262 fn post_stop(&self) {
263 debug!("Drop actor stopped");
264 }
265
266 fn config(&self) -> ActorConfig {
267 ActorConfig::new().mailbox_capacity(4096 * 16)
268 }
269}