Skip to main content

reifydb_engine/
watermark.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use reifydb_cdc::consume::watermark::compute_watermark;
5use reifydb_core::common::CommitVersion;
6use reifydb_store_multi::gc::{EvictionWatermark, historical::QueryWatermark};
7use reifydb_transaction::transaction::Transaction;
8use reifydb_value::value::identity::IdentityId;
9
10use crate::engine::StandardEngine;
11
12impl QueryWatermark for StandardEngine {
13	fn effective_gc_cutoff(&self) -> CommitVersion {
14		let qdu = self.query_done_until();
15		let lease_min = self.multi().leases().min_active().unwrap_or(CommitVersion(u64::MAX));
16		qdu.min(lease_min)
17	}
18}
19
20impl EvictionWatermark for StandardEngine {
21	fn watermark(&self) -> CommitVersion {
22		self.effective_gc_cutoff().min(self.consumer_watermark())
23	}
24}
25
26impl StandardEngine {
27	fn consumer_watermark(&self) -> CommitVersion {
28		let mut txn = match self.begin_query(IdentityId::system()) {
29			Ok(txn) => txn,
30			Err(_) => return CommitVersion(0),
31		};
32		match compute_watermark(&mut Transaction::Query(&mut txn)) {
33			Ok(Some(v)) => v,
34			Ok(None) => CommitVersion(u64::MAX),
35			Err(_) => CommitVersion(0),
36		}
37	}
38}