1use std::collections::HashMap;
5
6use reifydb_core::{
7 event::row::RowsExpiredEvent,
8 interface::catalog::{config::ConfigKey, shape::ShapeId},
9 row::{RowTtlAnchor, RowTtlCleanupMode},
10};
11use reifydb_runtime::actor::{
12 context::Context,
13 mailbox::ActorRef,
14 system::{ActorConfig, ActorSystem},
15 timers::TimerHandle,
16 traits::{Actor as ActorTrait, Directive},
17};
18use reifydb_type::value::datetime::DateTime;
19use tracing::{debug, info, trace, warn};
20
21use super::{ListRowTtls, ScanStats, scanner};
22use crate::{store::StandardMultiStore, tier::RangeCursor};
23
24#[derive(Debug, Clone)]
26pub enum Message {
27 Tick(DateTime),
29 Shutdown,
31}
32
33#[derive(Default)]
35pub struct ScannerState {
36 cursors: HashMap<ShapeId, RangeCursor>,
37}
38
39pub struct ActorState {
41 _timer_handle: Option<TimerHandle>,
42 scanning: bool,
43 scanner: ScannerState,
44}
45
46pub struct Actor<P: ListRowTtls> {
49 store: StandardMultiStore,
50 provider: P,
51}
52
53impl<P: ListRowTtls> Actor<P> {
54 pub fn new(store: StandardMultiStore, provider: P) -> Self {
55 Self {
56 store,
57 provider,
58 }
59 }
60
61 pub fn spawn(system: &ActorSystem, store: StandardMultiStore, provider: P) -> ActorRef<Message> {
62 let actor = Self::new(store, provider);
63 system.spawn("row-ttl", actor).actor_ref().clone()
64 }
65
66 fn run_scan(&self, state: &mut ActorState, now: DateTime) {
67 if state.scanning {
68 debug!("Row TTL scan already in progress, skipping tick");
69 return;
70 }
71
72 let Some(hot) = self.store.hot() else {
73 warn!("Row TTL scan skipped: hot tier is not configured");
74 return;
75 };
76
77 state.scanning = true;
78
79 let now_nanos = now.to_nanos();
80 trace!(now_nanos, "Starting row TTL scan");
81
82 let ttls = self.provider.list_row_ttls();
83 let config = self.provider.config();
84 let mut stats = ScanStats::default();
85
86 let batch_size = config.get_config_uint8(ConfigKey::RowTtlScanBatchSize) as usize;
87
88 for (shape_id, ttl_config) in &ttls {
89 trace!(?shape_id, ?ttl_config, "Evaluating TTL config for shape");
90 if ttl_config.cleanup_mode == RowTtlCleanupMode::Delete {
91 debug!(
92 ?shape_id,
93 "Skipping shape with RowTtlCleanupMode::Delete (not supported in V1)"
94 );
95 stats.shapes_skipped += 1;
96 continue;
97 }
98
99 let mut cursor = state.scanner.cursors.remove(shape_id).unwrap_or_default();
100
101 let scan_result = match ttl_config.anchor {
102 RowTtlAnchor::Created => scanner::scan_shape_by_created_at(
103 hot,
104 *shape_id,
105 ttl_config,
106 now_nanos,
107 batch_size,
108 &mut cursor,
109 ),
110 RowTtlAnchor::Updated => scanner::scan_shape_by_updated_at(
111 hot,
112 *shape_id,
113 ttl_config,
114 now_nanos,
115 batch_size,
116 &mut cursor,
117 ),
118 };
119
120 match scan_result {
121 Ok((expired, result)) => {
122 debug!(
123 ?shape_id,
124 expired_count = expired.len(),
125 ?result,
126 "Shape scan iteration completed"
127 );
128 stats.shapes_scanned += 1;
129
130 if !expired.is_empty() {
131 stats.rows_expired += expired.len() as u64;
132 for row in &expired {
133 *stats.bytes_discovered.entry(row.shape_id).or_insert(0) +=
134 row.scanned_bytes;
135 }
136
137 match scanner::drop_expired_keys(hot, &expired, &mut stats) {
138 Ok(_) => {
139 let bytes_freed: u64 =
140 stats.bytes_reclaimed.values().sum();
141 debug!(
142 ?shape_id,
143 bytes_freed,
144 "Freed storage from expired rows for shape"
145 );
146 }
147 Err(e) => {
148 warn!(?shape_id, error = %e, "Failed to drop expired keys");
149 }
150 }
151 }
152
153 match result {
154 scanner::ScanResult::Yielded => {
155 state.scanner.cursors.insert(*shape_id, cursor);
156 }
157 scanner::ScanResult::Exhausted => {
158 }
161 }
162 }
163 Err(e) => {
164 warn!(?shape_id, error = %e, "Failed to scan shape for expired rows");
165 }
167 }
168 }
169
170 if stats.rows_expired > 0 {
171 hot.maintenance();
173
174 info!(
175 shapes_scanned = stats.shapes_scanned,
176 shapes_skipped = stats.shapes_skipped,
177 rows_expired = stats.rows_expired,
178 versions_dropped = stats.versions_dropped,
179 bytes_discovered = ?stats.bytes_discovered.values().sum::<u64>(),
180 bytes_reclaimed = ?stats.bytes_reclaimed.values().sum::<u64>(),
181 "Row TTL scan completed"
182 );
183 } else {
184 debug!(
185 shapes_scanned = stats.shapes_scanned,
186 shapes_skipped = stats.shapes_skipped,
187 "Row TTL scan completed (no expired rows)"
188 );
189 }
190
191 self.store.event_bus.emit(RowsExpiredEvent::new(
192 stats.shapes_scanned,
193 stats.shapes_skipped,
194 stats.rows_expired,
195 stats.versions_dropped,
196 stats.bytes_discovered,
197 stats.bytes_reclaimed,
198 ));
199
200 state.scanning = false;
201 }
202}
203
204impl<P: ListRowTtls> ActorTrait for Actor<P> {
205 type State = ActorState;
206 type Message = Message;
207
208 fn init(&self, ctx: &Context<Message>) -> ActorState {
209 debug!("Row TTL actor started");
210 let config = self.provider.config();
211 let scan_interval = config.get_config_duration(ConfigKey::RowTtlScanInterval);
212
213 let timer_handle = ctx.schedule_tick(scan_interval, |nanos| Message::Tick(DateTime::from_nanos(nanos)));
214 ActorState {
215 _timer_handle: Some(timer_handle),
216 scanning: false,
217 scanner: ScannerState::default(),
218 }
219 }
220
221 fn handle(&self, state: &mut ActorState, msg: Message, ctx: &Context<Message>) -> Directive {
222 if ctx.is_cancelled() {
223 return Directive::Stop;
224 }
225
226 match msg {
227 Message::Tick(now) => {
228 self.run_scan(state, now);
229 }
230 Message::Shutdown => {
231 debug!("Row TTL actor shutting down");
232 return Directive::Stop;
233 }
234 }
235
236 Directive::Continue
237 }
238
239 fn post_stop(&self) {
240 debug!("Row TTL actor stopped");
241 }
242
243 fn config(&self) -> ActorConfig {
244 ActorConfig::new().mailbox_capacity(64)
245 }
246}
247
248pub fn spawn_row_ttl_actor<P: ListRowTtls>(
254 store: StandardMultiStore,
255 system: ActorSystem,
256 provider: P,
257) -> ActorRef<Message> {
258 Actor::spawn(&system, store, provider)
259}