Skip to main content

reifydb_store_multi/ttl/
actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::HashMap;
5
6use reifydb_core::{
7	event::row::RowsExpiredEvent,
8	interface::catalog::{config::ConfigKey, shape::ShapeId},
9	row::{RowTtlAnchor, RowTtlCleanupMode},
10};
11use reifydb_runtime::actor::{
12	context::Context,
13	mailbox::ActorRef,
14	system::{ActorConfig, ActorSystem},
15	timers::TimerHandle,
16	traits::{Actor as ActorTrait, Directive},
17};
18use reifydb_type::value::datetime::DateTime;
19use tracing::{debug, info, trace, warn};
20
21use super::{ListRowTtls, ScanStats, scanner};
22use crate::{store::StandardMultiStore, tier::RangeCursor};
23
24/// Messages handled by the row TTL actor.
25#[derive(Debug, Clone)]
26pub enum Message {
27	/// Periodic tick triggers a full scan cycle.
28	Tick(DateTime),
29	/// Shutdown gracefully.
30	Shutdown,
31}
32
33/// Holds state for the chunked, stateful scanner.
34#[derive(Default)]
35pub struct ScannerState {
36	cursors: HashMap<ShapeId, RangeCursor>,
37}
38
39/// Internal state for the row TTL actor.
40pub struct ActorState {
41	_timer_handle: Option<TimerHandle>,
42	scanning: bool,
43	scanner: ScannerState,
44}
45
46/// Background actor that periodically scans shapes for expired rows
47/// and physically drops them based on TTL configuration.
48pub struct Actor<P: ListRowTtls> {
49	store: StandardMultiStore,
50	provider: P,
51}
52
53impl<P: ListRowTtls> Actor<P> {
54	pub fn new(store: StandardMultiStore, provider: P) -> Self {
55		Self {
56			store,
57			provider,
58		}
59	}
60
61	pub fn spawn(system: &ActorSystem, store: StandardMultiStore, provider: P) -> ActorRef<Message> {
62		let actor = Self::new(store, provider);
63		system.spawn("row-ttl", actor).actor_ref().clone()
64	}
65
66	fn run_scan(&self, state: &mut ActorState, now: DateTime) {
67		if state.scanning {
68			debug!("Row TTL scan already in progress, skipping tick");
69			return;
70		}
71
72		let Some(hot) = self.store.hot() else {
73			warn!("Row TTL scan skipped: hot tier is not configured");
74			return;
75		};
76
77		state.scanning = true;
78
79		let now_nanos = now.to_nanos();
80		trace!(now_nanos, "Starting row TTL scan");
81
82		let ttls = self.provider.list_row_ttls();
83		let config = self.provider.config();
84		let mut stats = ScanStats::default();
85
86		let batch_size = config.get_config_uint8(ConfigKey::RowTtlScanBatchSize) as usize;
87
88		for (shape_id, ttl_config) in &ttls {
89			trace!(?shape_id, ?ttl_config, "Evaluating TTL config for shape");
90			if ttl_config.cleanup_mode == RowTtlCleanupMode::Delete {
91				debug!(
92					?shape_id,
93					"Skipping shape with RowTtlCleanupMode::Delete (not supported in V1)"
94				);
95				stats.shapes_skipped += 1;
96				continue;
97			}
98
99			let mut cursor = state.scanner.cursors.remove(shape_id).unwrap_or_default();
100
101			let scan_result = match ttl_config.anchor {
102				RowTtlAnchor::Created => scanner::scan_shape_by_created_at(
103					hot,
104					*shape_id,
105					ttl_config,
106					now_nanos,
107					batch_size,
108					&mut cursor,
109				),
110				RowTtlAnchor::Updated => scanner::scan_shape_by_updated_at(
111					hot,
112					*shape_id,
113					ttl_config,
114					now_nanos,
115					batch_size,
116					&mut cursor,
117				),
118			};
119
120			match scan_result {
121				Ok((expired, result)) => {
122					debug!(
123						?shape_id,
124						expired_count = expired.len(),
125						?result,
126						"Shape scan iteration completed"
127					);
128					stats.shapes_scanned += 1;
129
130					if !expired.is_empty() {
131						stats.rows_expired += expired.len() as u64;
132						for row in &expired {
133							*stats.bytes_discovered.entry(row.shape_id).or_insert(0) +=
134								row.scanned_bytes;
135						}
136
137						match scanner::drop_expired_keys(hot, &expired, &mut stats) {
138							Ok(_) => {
139								let bytes_freed: u64 =
140									stats.bytes_reclaimed.values().sum();
141								debug!(
142									?shape_id,
143									bytes_freed,
144									"Freed storage from expired rows for shape"
145								);
146							}
147							Err(e) => {
148								warn!(?shape_id, error = %e, "Failed to drop expired keys");
149							}
150						}
151					}
152
153					match result {
154						scanner::ScanResult::Yielded => {
155							state.scanner.cursors.insert(*shape_id, cursor);
156						}
157						scanner::ScanResult::Exhausted => {
158							// Cursor is already removed, shape will restart from beginning
159							// next tick.
160						}
161					}
162				}
163				Err(e) => {
164					warn!(?shape_id, error = %e, "Failed to scan shape for expired rows");
165					// On error, we drop the cursor to restart scanning for this shape next tick.
166				}
167			}
168		}
169
170		if stats.rows_expired > 0 {
171			// Trigger maintenance to physically reclaim memory/disk space (especially for SQLite)
172			hot.maintenance();
173
174			info!(
175				shapes_scanned = stats.shapes_scanned,
176				shapes_skipped = stats.shapes_skipped,
177				rows_expired = stats.rows_expired,
178				versions_dropped = stats.versions_dropped,
179				bytes_discovered = ?stats.bytes_discovered.values().sum::<u64>(),
180				bytes_reclaimed = ?stats.bytes_reclaimed.values().sum::<u64>(),
181				"Row TTL scan completed"
182			);
183		} else {
184			debug!(
185				shapes_scanned = stats.shapes_scanned,
186				shapes_skipped = stats.shapes_skipped,
187				"Row TTL scan completed (no expired rows)"
188			);
189		}
190
191		self.store.event_bus.emit(RowsExpiredEvent::new(
192			stats.shapes_scanned,
193			stats.shapes_skipped,
194			stats.rows_expired,
195			stats.versions_dropped,
196			stats.bytes_discovered,
197			stats.bytes_reclaimed,
198		));
199
200		state.scanning = false;
201	}
202}
203
204impl<P: ListRowTtls> ActorTrait for Actor<P> {
205	type State = ActorState;
206	type Message = Message;
207
208	fn init(&self, ctx: &Context<Message>) -> ActorState {
209		debug!("Row TTL actor started");
210		let config = self.provider.config();
211		let scan_interval = config.get_config_duration(ConfigKey::RowTtlScanInterval);
212
213		let timer_handle = ctx.schedule_tick(scan_interval, |nanos| Message::Tick(DateTime::from_nanos(nanos)));
214		ActorState {
215			_timer_handle: Some(timer_handle),
216			scanning: false,
217			scanner: ScannerState::default(),
218		}
219	}
220
221	fn handle(&self, state: &mut ActorState, msg: Message, ctx: &Context<Message>) -> Directive {
222		if ctx.is_cancelled() {
223			return Directive::Stop;
224		}
225
226		match msg {
227			Message::Tick(now) => {
228				self.run_scan(state, now);
229			}
230			Message::Shutdown => {
231				debug!("Row TTL actor shutting down");
232				return Directive::Stop;
233			}
234		}
235
236		Directive::Continue
237	}
238
239	fn post_stop(&self) {
240		debug!("Row TTL actor stopped");
241	}
242
243	fn config(&self) -> ActorConfig {
244		ActorConfig::new().mailbox_capacity(64)
245	}
246}
247
248/// Spawn a row TTL actor that periodically scans and drops expired rows.
249///
250/// The provider is typically implemented by the engine layer wrapping
251/// the materialized catalog. Call this after both store and catalog
252/// are available.
253pub fn spawn_row_ttl_actor<P: ListRowTtls>(
254	store: StandardMultiStore,
255	system: ActorSystem,
256	provider: P,
257) -> ActorRef<Message> {
258	Actor::spawn(&system, store, provider)
259}