reifydb-sub-task 0.6.0

Task execution subsystem for ReifyDB
Documentation
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2026 ReifyDB

use std::{error::Error, fmt, future::Future, pin::Pin, sync::Arc};

use reifydb_core::interface::catalog::task::TaskId;

use crate::{context::TaskContext, schedule::Schedule};

type SyncTaskFn = Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>;

type AsyncTaskFn = Arc<
	dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>> + Send + Sync,
>;

#[derive(Clone)]
pub enum TaskWork {
	Sync(SyncTaskFn),

	Async(AsyncTaskFn),
}

impl fmt::Debug for TaskWork {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
			TaskWork::Async(_) => write!(f, "TaskWork::Async"),
		}
	}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskExecutor {
	ComputePool,

	Tokio,
}

pub struct ScheduledTask {
	pub id: TaskId,

	pub name: String,

	pub schedule: Schedule,

	pub work: TaskWork,

	pub executor: TaskExecutor,
}

impl fmt::Debug for ScheduledTask {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.debug_struct("ScheduledTask")
			.field("id", &self.id)
			.field("name", &self.name)
			.field("schedule", &self.schedule)
			.field("work", &self.work)
			.field("executor", &self.executor)
			.finish()
	}
}

impl ScheduledTask {
	pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
		ScheduledTaskBuilder::new(name)
	}
}

pub struct ScheduledTaskBuilder {
	name: String,
	schedule: Option<Schedule>,
	work: Option<TaskWork>,
	executor: Option<TaskExecutor>,
}

impl ScheduledTaskBuilder {
	pub fn new(name: impl Into<String>) -> Self {
		Self {
			name: name.into(),
			schedule: None,
			work: None,
			executor: None,
		}
	}

	pub fn schedule(mut self, schedule: Schedule) -> Self {
		self.schedule = Some(schedule);
		self
	}

	pub fn work_sync<F>(mut self, f: F) -> Self
	where
		F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
	{
		self.work = Some(TaskWork::Sync(Arc::new(f)));
		self
	}

	pub fn work_async<F, Fut>(mut self, f: F) -> Self
	where
		F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
		Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
	{
		self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
		self
	}

	pub fn executor(mut self, executor: TaskExecutor) -> Self {
		self.executor = Some(executor);
		self
	}

	pub fn build(self) -> Result<ScheduledTask, String> {
		let schedule = self.schedule.ok_or("schedule is required")?;
		let work = self.work.ok_or("work is required")?;
		let executor = self.executor.ok_or("executor is required")?;

		schedule.validate()?;

		Ok(ScheduledTask {
			id: TaskId::new(),
			name: self.name,
			schedule,
			work,
			executor,
		})
	}
}