1use std::collections::HashMap;
5
6use reifydb_core::{
7 actors::ttl::RowTtlMessage as Message,
8 event::row::RowsExpiredEvent,
9 interface::catalog::{config::ConfigKey, shape::ShapeId},
10 row::{RowTtlAnchor, RowTtlCleanupMode},
11};
12use reifydb_runtime::actor::{
13 context::Context,
14 mailbox::ActorRef,
15 system::{ActorConfig, ActorSystem},
16 timers::TimerHandle,
17 traits::{Actor as ActorTrait, Directive},
18};
19use reifydb_type::value::datetime::DateTime;
20use tracing::{debug, info, trace, warn};
21
22use super::{ListRowTtls, ScanStats, scanner};
23use crate::{store::StandardMultiStore, tier::RangeCursor};
24
25#[derive(Default)]
27pub struct ScannerState {
28 cursors: HashMap<ShapeId, RangeCursor>,
29}
30
31pub struct ActorState {
33 _timer_handle: Option<TimerHandle>,
34 scanning: bool,
35 scanner: ScannerState,
36}
37
38pub struct Actor<P: ListRowTtls> {
41 store: StandardMultiStore,
42 provider: P,
43}
44
45impl<P: ListRowTtls> Actor<P> {
46 pub fn new(store: StandardMultiStore, provider: P) -> Self {
47 Self {
48 store,
49 provider,
50 }
51 }
52
53 pub fn spawn(system: &ActorSystem, store: StandardMultiStore, provider: P) -> ActorRef<Message> {
54 let actor = Self::new(store, provider);
55 system.spawn("row-ttl", actor).actor_ref().clone()
56 }
57
58 fn run_scan(&self, state: &mut ActorState, now: DateTime) {
59 if state.scanning {
60 debug!("Row TTL scan already in progress, skipping tick");
61 return;
62 }
63
64 let Some(hot) = self.store.hot() else {
65 warn!("Row TTL scan skipped: hot tier is not configured");
66 return;
67 };
68
69 state.scanning = true;
70
71 let now_nanos = now.to_nanos();
72 trace!(now_nanos, "Starting row TTL scan");
73
74 let ttls = self.provider.list_row_ttls();
75 let config = self.provider.config();
76 let mut stats = ScanStats::default();
77
78 let batch_size = config.get_config_uint8(ConfigKey::RowTtlScanBatchSize) as usize;
79
80 for (shape_id, ttl_config) in &ttls {
81 trace!(?shape_id, ?ttl_config, "Evaluating TTL config for shape");
82 if ttl_config.cleanup_mode == RowTtlCleanupMode::Delete {
83 debug!(
84 ?shape_id,
85 "Skipping shape with RowTtlCleanupMode::Delete (not supported in V1)"
86 );
87 stats.shapes_skipped += 1;
88 continue;
89 }
90
91 let mut cursor = state.scanner.cursors.remove(shape_id).unwrap_or_default();
92
93 let scan_result = match ttl_config.anchor {
94 RowTtlAnchor::Created => scanner::scan_shape_by_created_at(
95 hot,
96 *shape_id,
97 ttl_config,
98 now_nanos,
99 batch_size,
100 &mut cursor,
101 ),
102 RowTtlAnchor::Updated => scanner::scan_shape_by_updated_at(
103 hot,
104 *shape_id,
105 ttl_config,
106 now_nanos,
107 batch_size,
108 &mut cursor,
109 ),
110 };
111
112 match scan_result {
113 Ok((expired, result)) => {
114 debug!(
115 ?shape_id,
116 expired_count = expired.len(),
117 ?result,
118 "Shape scan iteration completed"
119 );
120 stats.shapes_scanned += 1;
121
122 if !expired.is_empty() {
123 stats.rows_expired += expired.len() as u64;
124 for row in &expired {
125 *stats.bytes_discovered.entry(row.shape_id).or_insert(0) +=
126 row.scanned_bytes;
127 }
128
129 match scanner::drop_expired_keys(hot, &expired, &mut stats) {
130 Ok(_) => {
131 let bytes_freed: u64 =
132 stats.bytes_reclaimed.values().sum();
133 debug!(
134 ?shape_id,
135 bytes_freed,
136 "Freed storage from expired rows for shape"
137 );
138 }
139 Err(e) => {
140 warn!(?shape_id, error = %e, "Failed to drop expired keys");
141 }
142 }
143 }
144
145 match result {
146 scanner::ScanResult::Yielded => {
147 state.scanner.cursors.insert(*shape_id, cursor);
148 }
149 scanner::ScanResult::Exhausted => {
150 }
153 }
154 }
155 Err(e) => {
156 warn!(?shape_id, error = %e, "Failed to scan shape for expired rows");
157 }
159 }
160 }
161
162 if stats.rows_expired > 0 {
163 hot.maintenance();
165
166 info!(
167 shapes_scanned = stats.shapes_scanned,
168 shapes_skipped = stats.shapes_skipped,
169 rows_expired = stats.rows_expired,
170 versions_dropped = stats.versions_dropped,
171 bytes_discovered = ?stats.bytes_discovered.values().sum::<u64>(),
172 bytes_reclaimed = ?stats.bytes_reclaimed.values().sum::<u64>(),
173 "Row TTL scan completed"
174 );
175 } else {
176 debug!(
177 shapes_scanned = stats.shapes_scanned,
178 shapes_skipped = stats.shapes_skipped,
179 "Row TTL scan completed (no expired rows)"
180 );
181 }
182
183 self.store.event_bus.emit(RowsExpiredEvent::new(
184 stats.shapes_scanned,
185 stats.shapes_skipped,
186 stats.rows_expired,
187 stats.versions_dropped,
188 stats.bytes_discovered,
189 stats.bytes_reclaimed,
190 ));
191
192 state.scanning = false;
193 }
194}
195
196impl<P: ListRowTtls> ActorTrait for Actor<P> {
197 type State = ActorState;
198 type Message = Message;
199
200 fn init(&self, ctx: &Context<Message>) -> ActorState {
201 debug!("Row TTL actor started");
202 let config = self.provider.config();
203 let scan_interval = config.get_config_duration(ConfigKey::RowTtlScanInterval);
204
205 let timer_handle = ctx.schedule_tick(scan_interval, |nanos| Message::Tick(DateTime::from_nanos(nanos)));
206 ActorState {
207 _timer_handle: Some(timer_handle),
208 scanning: false,
209 scanner: ScannerState::default(),
210 }
211 }
212
213 fn handle(&self, state: &mut ActorState, msg: Message, ctx: &Context<Message>) -> Directive {
214 if ctx.is_cancelled() {
215 return Directive::Stop;
216 }
217
218 match msg {
219 Message::Tick(now) => {
220 self.run_scan(state, now);
221 }
222 Message::Shutdown => {
223 debug!("Row TTL actor shutting down");
224 return Directive::Stop;
225 }
226 }
227
228 Directive::Continue
229 }
230
231 fn post_stop(&self) {
232 debug!("Row TTL actor stopped");
233 }
234
235 fn config(&self) -> ActorConfig {
236 ActorConfig::new().mailbox_capacity(64)
237 }
238}
239
240pub fn spawn_row_ttl_actor<P: ListRowTtls>(
246 store: StandardMultiStore,
247 system: ActorSystem,
248 provider: P,
249) -> ActorRef<Message> {
250 Actor::spawn(&system, store, provider)
251}