reifydb_engine/
watermark.rs1use reifydb_cdc::consume::watermark::compute_watermark;
5use reifydb_core::common::CommitVersion;
6use reifydb_store_multi::gc::{EvictionWatermark, historical::QueryWatermark};
7use reifydb_transaction::transaction::Transaction;
8use reifydb_value::value::identity::IdentityId;
9
10use crate::engine::StandardEngine;
11
12impl QueryWatermark for StandardEngine {
13 fn effective_gc_cutoff(&self) -> CommitVersion {
14 let qdu = self.query_done_until();
15 let lease_min = self.multi().leases().min_active().unwrap_or(CommitVersion(u64::MAX));
16 qdu.min(lease_min)
17 }
18}
19
20impl EvictionWatermark for StandardEngine {
21 fn watermark(&self) -> CommitVersion {
22 self.effective_gc_cutoff().min(self.consumer_watermark())
23 }
24}
25
26impl StandardEngine {
27 fn consumer_watermark(&self) -> CommitVersion {
28 let mut txn = match self.begin_query(IdentityId::system()) {
29 Ok(txn) => txn,
30 Err(_) => return CommitVersion(0),
31 };
32 match compute_watermark(&mut Transaction::Query(&mut txn)) {
33 Ok(Some(v)) => v,
34 Ok(None) => CommitVersion(u64::MAX),
35 Err(_) => CommitVersion(0),
36 }
37 }
38}