reifydb_store_multi/store/
worker.rs1use std::{collections::HashMap, time::Duration};
5
6use reifydb_core::{
7 common::CommitVersion,
8 encoded::key::EncodedKey,
9 event::{
10 EventBus,
11 metric::{MultiCommittedEvent, MultiDrop},
12 },
13 interface::store::EntryKind,
14};
15use reifydb_runtime::{
16 actor::{
17 context::Context,
18 mailbox::ActorRef,
19 system::{ActorConfig, ActorSystem},
20 timers::TimerHandle,
21 traits::{Actor, Directive},
22 },
23 context::clock::{Clock, Instant},
24};
25use reifydb_type::util::cowvec::CowVec;
26use tracing::{Span, debug, error, instrument};
27
28use super::drop::find_keys_to_drop;
29use crate::{hot::storage::HotStorage, tier::TierStorage};
30
31#[derive(Debug, Clone)]
33pub struct DropWorkerConfig {
34 pub batch_size: usize,
36 pub flush_interval: Duration,
38}
39
40impl Default for DropWorkerConfig {
41 fn default() -> Self {
42 Self {
43 batch_size: 100,
44 flush_interval: Duration::from_millis(50),
45 }
46 }
47}
48
49use reifydb_core::actors::drop::{DropMessage, DropRequest};
50
51pub struct DropActor {
53 storage: HotStorage,
54 event_bus: EventBus,
55 config: DropWorkerConfig,
56 clock: Clock,
57}
58
59pub struct DropActorState {
61 pending_requests: Vec<DropRequest>,
63 last_flush: Instant,
65 _timer_handle: Option<TimerHandle>,
67 flush_count: u64,
69}
70
71impl DropActor {
72 pub fn new(config: DropWorkerConfig, storage: HotStorage, event_bus: EventBus, clock: Clock) -> Self {
73 Self {
74 storage,
75 event_bus,
76 config,
77 clock,
78 }
79 }
80
81 pub fn spawn(
82 system: &ActorSystem,
83 config: DropWorkerConfig,
84 storage: HotStorage,
85 event_bus: EventBus,
86 clock: Clock,
87 ) -> ActorRef<DropMessage> {
88 let actor = Self::new(config, storage, event_bus, clock);
89 system.spawn("drop-worker", actor).actor_ref().clone()
90 }
91
92 fn maybe_flush(&self, state: &mut DropActorState) {
94 if state.pending_requests.len() >= self.config.batch_size {
95 self.flush(state);
96 }
97 }
98
99 fn flush(&self, state: &mut DropActorState) {
101 if state.pending_requests.is_empty() {
102 return;
103 }
104
105 Self::process_batch(&self.storage, &mut state.pending_requests, &self.event_bus);
106 state.last_flush = self.clock.instant();
107
108 state.flush_count += 1;
109 if state.flush_count.is_multiple_of(100) {
110 self.storage.maintenance();
111 }
112 }
113
114 #[instrument(name = "drop::process_batch", level = "debug", skip_all, fields(num_requests = requests.len(), total_dropped))]
115 fn process_batch(storage: &HotStorage, requests: &mut Vec<DropRequest>, event_bus: &EventBus) {
116 let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>> = HashMap::new();
118 let mut drops_with_stats = Vec::new();
120 let mut max_pending_version = CommitVersion(0);
121
122 for request in requests.drain(..) {
123 let version_for_event = request.pending_version.unwrap_or(request.commit_version);
125 if version_for_event > max_pending_version {
126 max_pending_version = version_for_event;
127 }
128
129 match find_keys_to_drop(storage, request.table, request.key.as_ref(), request.pending_version) {
130 Ok(entries_to_drop) => {
131 for entry in entries_to_drop {
132 drops_with_stats.push(MultiDrop {
134 key: EncodedKey(request.key.clone()),
135 value_bytes: entry.value_bytes,
136 });
137
138 batches.entry(request.table)
140 .or_default()
141 .push((entry.key, entry.version));
142 }
143 }
144 Err(e) => {
145 error!("Drop actor failed to find keys to drop: {}", e);
146 }
147 }
148 }
149
150 if !batches.is_empty()
151 && let Err(e) = storage.drop(batches)
152 {
153 error!("Drop actor failed to execute drops: {}", e);
154 }
155
156 let total_dropped = drops_with_stats.len();
157 Span::current().record("total_dropped", total_dropped);
158
159 event_bus.emit(MultiCommittedEvent::new(vec![], vec![], drops_with_stats, max_pending_version));
160 }
161}
162
163impl Actor for DropActor {
164 type State = DropActorState;
165 type Message = DropMessage;
166
167 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
168 debug!("Drop actor started");
169
170 let timer_handle = ctx.schedule_repeat(Duration::from_millis(10), DropMessage::Tick);
172
173 DropActorState {
174 pending_requests: Vec::with_capacity(self.config.batch_size),
175 last_flush: self.clock.instant(),
176 _timer_handle: Some(timer_handle),
177 flush_count: 0,
178 }
179 }
180
181 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
182 if ctx.is_cancelled() {
184 self.flush(state);
186 return Directive::Stop;
187 }
188
189 match msg {
190 DropMessage::Request(request) => {
191 state.pending_requests.push(request);
192 self.maybe_flush(state);
193 }
194 DropMessage::Batch(requests) => {
195 state.pending_requests.extend(requests);
196 self.maybe_flush(state);
197 }
198 DropMessage::Tick => {
199 if !state.pending_requests.is_empty()
200 && state.last_flush.elapsed() >= self.config.flush_interval
201 {
202 self.flush(state);
203 }
204 }
205 DropMessage::Shutdown => {
206 debug!("Drop actor received shutdown signal");
207 self.flush(state);
209 return Directive::Stop;
210 }
211 }
212
213 Directive::Continue
214 }
215
216 fn post_stop(&self) {
217 debug!("Drop actor stopped");
218 }
219
220 fn config(&self) -> ActorConfig {
221 ActorConfig::new().mailbox_capacity(4096 * 16)
222 }
223}