use crate::engine::IntervalStream;
use crate::err::Error;
#[cfg(not(target_family = "wasm"))]
use core::future::Future;
use futures::StreamExt;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use surrealdb_core::{kvs::Datastore, options::EngineOptions};
use tokio_util::sync::CancellationToken;
#[cfg(not(target_family = "wasm"))]
use tokio::spawn;
#[cfg(target_family = "wasm")]
use wasm_bindgen_futures::spawn_local as spawn;
#[cfg(not(target_family = "wasm"))]
type Task = Pin<Box<dyn Future<Output = Result<(), tokio::task::JoinError>> + Send + 'static>>;
#[cfg(target_family = "wasm")]
type Task = Pin<Box<()>>;
pub struct Tasks(#[allow(dead_code)] Vec<Task>);
impl Tasks {
#[cfg(target_family = "wasm")]
pub async fn resolve(self) -> Result<(), Error> {
Ok(())
}
#[cfg(not(target_family = "wasm"))]
pub async fn resolve(self) -> Result<(), Error> {
for task in self.0.into_iter() {
let _ = task.await;
}
Ok(())
}
}
/// Spawns every engine background task and returns their handles.
///
/// Each task receives a clone of the datastore and the cancellation token;
/// cancelling `canceller` causes all of them to exit. Intervals are taken
/// from `opts`. The final spawn takes ownership of the original `Arc` and
/// token, avoiding two redundant clones (clippy `redundant_clone`).
pub fn init(dbs: Arc<Datastore>, canceller: CancellationToken, opts: &EngineOptions) -> Tasks {
    Tasks(vec![
        spawn_task_node_membership_refresh(dbs.clone(), canceller.clone(), opts),
        spawn_task_node_membership_check(dbs.clone(), canceller.clone(), opts),
        spawn_task_node_membership_cleanup(dbs.clone(), canceller.clone(), opts),
        spawn_task_changefeed_cleanup(dbs.clone(), canceller.clone(), opts),
        // Last consumer: move the originals instead of cloning again.
        spawn_task_index_compaction(dbs, canceller, opts),
    ])
}
/// Spawns the task that periodically refreshes this node's registration
/// information, at the interval configured in `opts`.
fn spawn_task_node_membership_refresh(
    dbs: Arc<Datastore>,
    canceller: CancellationToken,
    opts: &EngineOptions,
) -> Task {
    // Cadence comes from the engine configuration.
    let delay = opts.node_membership_refresh_interval;
    // Build the worker future first, then hand it to the runtime.
    let worker = async move {
        trace!("Updating node registration information every {delay:?}");
        let mut ticker = interval_ticker(delay).await;
        loop {
            tokio::select! {
                // Biased: always check for shutdown before handling a tick.
                biased;
                _ = canceller.cancelled() => break,
                Some(_) = ticker.next() => {
                    match dbs.node_membership_update().await {
                        Ok(_) => (),
                        Err(e) => {
                            error!("Error updating node registration information: {e}");
                        }
                    }
                }
            }
        }
        trace!("Background task exited: Updating node registration information");
    };
    Box::pin(spawn(worker))
}
/// Spawns the task that periodically expires and archives cluster nodes
/// which have become inactive, at the interval configured in `opts`.
fn spawn_task_node_membership_check(
    dbs: Arc<Datastore>,
    canceller: CancellationToken,
    opts: &EngineOptions,
) -> Task {
    // Cadence comes from the engine configuration.
    let delay = opts.node_membership_check_interval;
    // Build the worker future first, then hand it to the runtime.
    let worker = async move {
        trace!("Processing and archiving inactive nodes every {delay:?}");
        let mut ticker = interval_ticker(delay).await;
        loop {
            tokio::select! {
                // Biased: always check for shutdown before handling a tick.
                biased;
                _ = canceller.cancelled() => break,
                Some(_) = ticker.next() => {
                    match dbs.node_membership_expire().await {
                        Ok(_) => (),
                        Err(e) => {
                            error!("Error processing and archiving inactive nodes: {e}");
                        }
                    }
                }
            }
        }
        trace!("Background task exited: Processing and archiving inactive nodes");
    };
    Box::pin(spawn(worker))
}
/// Spawns the task that periodically removes previously archived nodes,
/// at the interval configured in `opts`.
fn spawn_task_node_membership_cleanup(
    dbs: Arc<Datastore>,
    canceller: CancellationToken,
    opts: &EngineOptions,
) -> Task {
    // Cadence comes from the engine configuration.
    let delay = opts.node_membership_cleanup_interval;
    // Build the worker future first, then hand it to the runtime.
    let worker = async move {
        trace!("Processing and cleaning archived nodes every {delay:?}");
        let mut ticker = interval_ticker(delay).await;
        loop {
            tokio::select! {
                // Biased: always check for shutdown before handling a tick.
                biased;
                _ = canceller.cancelled() => break,
                Some(_) = ticker.next() => {
                    match dbs.node_membership_remove().await {
                        Ok(_) => (),
                        Err(e) => {
                            error!("Error processing and cleaning archived nodes: {e}");
                        }
                    }
                }
            }
        }
        trace!("Background task exited: Processing and cleaning archived nodes");
    };
    Box::pin(spawn(worker))
}
/// Spawns the task that periodically garbage-collects changefeed data,
/// at the interval configured in `opts`.
fn spawn_task_changefeed_cleanup(
    dbs: Arc<Datastore>,
    canceller: CancellationToken,
    opts: &EngineOptions,
) -> Task {
    // Cadence comes from the engine configuration.
    let delay = opts.changefeed_gc_interval;
    // Build the worker future first, then hand it to the runtime.
    let worker = async move {
        trace!("Running changefeed garbage collection every {delay:?}");
        let mut ticker = interval_ticker(delay).await;
        loop {
            tokio::select! {
                // Biased: always check for shutdown before handling a tick.
                biased;
                _ = canceller.cancelled() => break,
                Some(_) = ticker.next() => {
                    match dbs.changefeed_process().await {
                        Ok(_) => (),
                        Err(e) => {
                            error!("Error running changefeed garbage collection: {e}");
                        }
                    }
                }
            }
        }
        trace!("Background task exited: Running changefeed garbage collection");
    };
    Box::pin(spawn(worker))
}
/// Spawns the task that periodically runs index compaction, at the
/// interval configured in `opts`. The interval is also forwarded to
/// `index_compaction` itself.
fn spawn_task_index_compaction(
    dbs: Arc<Datastore>,
    canceller: CancellationToken,
    opts: &EngineOptions,
) -> Task {
    // Cadence comes from the engine configuration.
    let interval = opts.index_compaction_interval;
    // Build the worker future first, then hand it to the runtime.
    let worker = async move {
        trace!("Running index compaction every {interval:?}");
        let mut ticker = interval_ticker(interval).await;
        loop {
            tokio::select! {
                // Biased: always check for shutdown before handling a tick.
                biased;
                _ = canceller.cancelled() => break,
                Some(_) = ticker.next() => {
                    match dbs.index_compaction(interval).await {
                        Ok(_) => (),
                        Err(e) => {
                            error!("Error running index compaction: {e}");
                        }
                    }
                }
            }
        }
        trace!("Background task exited: Running index compaction");
    };
    Box::pin(spawn(worker))
}
/// Builds a stream that yields once per `interval`, used to drive the
/// periodic background tasks above.
///
/// A tokio interval's first tick completes immediately; it is consumed here
/// before the stream is returned, so callers observe the first item only
/// after a full `interval` has elapsed. Missed ticks are delayed rather than
/// bursted (`MissedTickBehavior::Delay`).
async fn interval_ticker(interval: Duration) -> IntervalStream {
    // Select the platform-appropriate timer backend: tokio natively,
    // wasmtimer's tokio-compatible shim on Wasm.
    #[cfg(not(target_family = "wasm"))]
    use tokio::{time, time::MissedTickBehavior};
    #[cfg(target_family = "wasm")]
    use wasmtimer::{tokio as time, tokio::MissedTickBehavior};
    let mut interval = time::interval(interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    // Swallow the immediate first tick so the ticker fires after one period.
    interval.tick().await;
    IntervalStream::new(interval)
}
#[cfg(test)]
#[cfg(feature = "kv-mem")]
mod test {
    use crate::engine::tasks;
    use std::sync::Arc;
    use std::time::Duration;
    use surrealdb_core::{kvs::Datastore, options::EngineOptions};
    use tokio_util::sync::CancellationToken;

    /// Cancelling the token should let every spawned task resolve cleanly.
    #[test_log::test(tokio::test)]
    pub async fn tasks_complete() {
        let canceller = CancellationToken::new();
        let options = EngineOptions::default();
        let datastore = Arc::new(Datastore::new("memory").await.unwrap());
        let handles = tasks::init(datastore.clone(), canceller.clone(), &options);
        canceller.cancel();
        handles.resolve().await.unwrap();
    }

    /// As above, but bounded by a ten-second timeout so a hang fails fast
    /// instead of stalling the test run.
    #[test_log::test(tokio::test)]
    pub async fn tasks_complete_channel_closed() {
        let canceller = CancellationToken::new();
        let options = EngineOptions::default();
        let datastore = Arc::new(Datastore::new("memory").await.unwrap());
        let handles = tasks::init(datastore.clone(), canceller.clone(), &options);
        canceller.cancel();
        let outcome = tokio::time::timeout(Duration::from_secs(10), handles.resolve()).await;
        outcome
            .map_err(|e| format!("Timed out after {e}"))
            .unwrap()
            .map_err(|e| format!("Resolution failed: {e}"))
            .unwrap();
    }
}