Skip to main content

reifydb_store_multi/ttl/
actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::HashMap;
5
6use reifydb_core::{
7	actors::ttl::RowTtlMessage as Message,
8	event::row::RowsExpiredEvent,
9	interface::catalog::{config::ConfigKey, shape::ShapeId},
10	row::{RowTtlAnchor, RowTtlCleanupMode},
11};
12use reifydb_runtime::actor::{
13	context::Context,
14	mailbox::ActorRef,
15	system::{ActorConfig, ActorSystem},
16	timers::TimerHandle,
17	traits::{Actor as ActorTrait, Directive},
18};
19use reifydb_type::value::datetime::DateTime;
20use tracing::{debug, info, trace, warn};
21
22use super::{ListRowTtls, ScanStats, scanner};
23use crate::{store::StandardMultiStore, tier::RangeCursor};
24
25/// Holds state for the chunked, stateful scanner.
26#[derive(Default)]
27pub struct ScannerState {
28	cursors: HashMap<ShapeId, RangeCursor>,
29}
30
31/// Internal state for the row TTL actor.
32pub struct ActorState {
33	_timer_handle: Option<TimerHandle>,
34	scanning: bool,
35	scanner: ScannerState,
36}
37
38/// Background actor that periodically scans shapes for expired rows
39/// and physically drops them based on TTL configuration.
40pub struct Actor<P: ListRowTtls> {
41	store: StandardMultiStore,
42	provider: P,
43}
44
45impl<P: ListRowTtls> Actor<P> {
46	pub fn new(store: StandardMultiStore, provider: P) -> Self {
47		Self {
48			store,
49			provider,
50		}
51	}
52
53	pub fn spawn(system: &ActorSystem, store: StandardMultiStore, provider: P) -> ActorRef<Message> {
54		let actor = Self::new(store, provider);
55		system.spawn("row-ttl", actor).actor_ref().clone()
56	}
57
58	fn run_scan(&self, state: &mut ActorState, now: DateTime) {
59		if state.scanning {
60			debug!("Row TTL scan already in progress, skipping tick");
61			return;
62		}
63
64		let Some(hot) = self.store.hot() else {
65			warn!("Row TTL scan skipped: hot tier is not configured");
66			return;
67		};
68
69		state.scanning = true;
70
71		let now_nanos = now.to_nanos();
72		trace!(now_nanos, "Starting row TTL scan");
73
74		let ttls = self.provider.list_row_ttls();
75		let config = self.provider.config();
76		let mut stats = ScanStats::default();
77
78		let batch_size = config.get_config_uint8(ConfigKey::RowTtlScanBatchSize) as usize;
79
80		for (shape_id, ttl_config) in &ttls {
81			trace!(?shape_id, ?ttl_config, "Evaluating TTL config for shape");
82			if ttl_config.cleanup_mode == RowTtlCleanupMode::Delete {
83				debug!(
84					?shape_id,
85					"Skipping shape with RowTtlCleanupMode::Delete (not supported in V1)"
86				);
87				stats.shapes_skipped += 1;
88				continue;
89			}
90
91			let mut cursor = state.scanner.cursors.remove(shape_id).unwrap_or_default();
92
93			let scan_result = match ttl_config.anchor {
94				RowTtlAnchor::Created => scanner::scan_shape_by_created_at(
95					hot,
96					*shape_id,
97					ttl_config,
98					now_nanos,
99					batch_size,
100					&mut cursor,
101				),
102				RowTtlAnchor::Updated => scanner::scan_shape_by_updated_at(
103					hot,
104					*shape_id,
105					ttl_config,
106					now_nanos,
107					batch_size,
108					&mut cursor,
109				),
110			};
111
112			match scan_result {
113				Ok((expired, result)) => {
114					debug!(
115						?shape_id,
116						expired_count = expired.len(),
117						?result,
118						"Shape scan iteration completed"
119					);
120					stats.shapes_scanned += 1;
121
122					if !expired.is_empty() {
123						stats.rows_expired += expired.len() as u64;
124						for row in &expired {
125							*stats.bytes_discovered.entry(row.shape_id).or_insert(0) +=
126								row.scanned_bytes;
127						}
128
129						match scanner::drop_expired_keys(hot, &expired, &mut stats) {
130							Ok(_) => {
131								let bytes_freed: u64 =
132									stats.bytes_reclaimed.values().sum();
133								debug!(
134									?shape_id,
135									bytes_freed,
136									"Freed storage from expired rows for shape"
137								);
138							}
139							Err(e) => {
140								warn!(?shape_id, error = %e, "Failed to drop expired keys");
141							}
142						}
143					}
144
145					match result {
146						scanner::ScanResult::Yielded => {
147							state.scanner.cursors.insert(*shape_id, cursor);
148						}
149						scanner::ScanResult::Exhausted => {
150							// Cursor is already removed, shape will restart from beginning
151							// next tick.
152						}
153					}
154				}
155				Err(e) => {
156					warn!(?shape_id, error = %e, "Failed to scan shape for expired rows");
157					// On error, we drop the cursor to restart scanning for this shape next tick.
158				}
159			}
160		}
161
162		if stats.rows_expired > 0 {
163			// Trigger maintenance to physically reclaim memory/disk space (especially for SQLite)
164			hot.maintenance();
165
166			info!(
167				shapes_scanned = stats.shapes_scanned,
168				shapes_skipped = stats.shapes_skipped,
169				rows_expired = stats.rows_expired,
170				versions_dropped = stats.versions_dropped,
171				bytes_discovered = ?stats.bytes_discovered.values().sum::<u64>(),
172				bytes_reclaimed = ?stats.bytes_reclaimed.values().sum::<u64>(),
173				"Row TTL scan completed"
174			);
175		} else {
176			debug!(
177				shapes_scanned = stats.shapes_scanned,
178				shapes_skipped = stats.shapes_skipped,
179				"Row TTL scan completed (no expired rows)"
180			);
181		}
182
183		self.store.event_bus.emit(RowsExpiredEvent::new(
184			stats.shapes_scanned,
185			stats.shapes_skipped,
186			stats.rows_expired,
187			stats.versions_dropped,
188			stats.bytes_discovered,
189			stats.bytes_reclaimed,
190		));
191
192		state.scanning = false;
193	}
194}
195
196impl<P: ListRowTtls> ActorTrait for Actor<P> {
197	type State = ActorState;
198	type Message = Message;
199
200	fn init(&self, ctx: &Context<Message>) -> ActorState {
201		debug!("Row TTL actor started");
202		let config = self.provider.config();
203		let scan_interval = config.get_config_duration(ConfigKey::RowTtlScanInterval);
204
205		let timer_handle = ctx.schedule_tick(scan_interval, |nanos| Message::Tick(DateTime::from_nanos(nanos)));
206		ActorState {
207			_timer_handle: Some(timer_handle),
208			scanning: false,
209			scanner: ScannerState::default(),
210		}
211	}
212
213	fn handle(&self, state: &mut ActorState, msg: Message, ctx: &Context<Message>) -> Directive {
214		if ctx.is_cancelled() {
215			return Directive::Stop;
216		}
217
218		match msg {
219			Message::Tick(now) => {
220				self.run_scan(state, now);
221			}
222			Message::Shutdown => {
223				debug!("Row TTL actor shutting down");
224				return Directive::Stop;
225			}
226		}
227
228		Directive::Continue
229	}
230
231	fn post_stop(&self) {
232		debug!("Row TTL actor stopped");
233	}
234
235	fn config(&self) -> ActorConfig {
236		ActorConfig::new().mailbox_capacity(64)
237	}
238}
239
240/// Spawn a row TTL actor that periodically scans and drops expired rows.
241///
242/// The provider is typically implemented by the engine layer wrapping
243/// the materialized catalog. Call this after both store and catalog
244/// are available.
245pub fn spawn_row_ttl_actor<P: ListRowTtls>(
246	store: StandardMultiStore,
247	system: ActorSystem,
248	provider: P,
249) -> ActorRef<Message> {
250	Actor::spawn(&system, store, provider)
251}