use crate::err::Error;
use crate::kvs::Datastore;
use crate::kvs::{LockType::*, TransactionType::*};
use crate::vs::VersionStamp;
impl Datastore {
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub(crate) async fn changefeed_versionstamp(
&self,
ts: u64,
) -> Result<Option<VersionStamp>, Error> {
let mut vs: Option<VersionStamp> = None;
let txn = self.transaction(Write, Optimistic).await?;
let nss = catch!(txn, txn.all_ns().await);
for ns in nss.iter() {
let ns = &ns.name;
let dbs = catch!(txn, txn.all_db(ns).await);
for db in dbs.iter() {
let db = &db.name;
vs = Some(txn.lock().await.set_timestamp_for_versionstamp(ts, ns, db).await?);
}
}
catch!(txn, txn.commit().await);
Ok(vs)
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub(crate) async fn changefeed_cleanup(&self, ts: u64) -> Result<(), Error> {
let txn = self.transaction(Write, Optimistic).await?;
catch!(txn, crate::cf::gc_all_at(&txn, ts).await);
catch!(txn, txn.commit().await);
Ok(())
}
}