Skip to main content

reifydb_sub_task/
task.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{error::Error, fmt, future::Future, pin::Pin, sync::Arc};
5
6use reifydb_core::interface::catalog::task::TaskId;
7
8use crate::{context::TaskContext, schedule::Schedule};
9
10/// A synchronous task function that runs on the compute pool.
11type SyncTaskFn = Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>;
12
13/// An asynchronous task function that runs on the tokio runtime.
14type AsyncTaskFn = Arc<
15	dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>> + Send + Sync,
16>;
17
18/// Defines the type of work a task performs
19#[derive(Clone)]
20pub enum TaskWork {
21	/// Synchronous blocking work (runs on compute pool)
22	Sync(SyncTaskFn),
23	/// Asynchronous work (runs on tokio runtime)
24	Async(AsyncTaskFn),
25}
26
27impl fmt::Debug for TaskWork {
28	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29		match self {
30			TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
31			TaskWork::Async(_) => write!(f, "TaskWork::Async"),
32		}
33	}
34}
35
36/// Defines where a task should be executed
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum TaskExecutor {
39	/// Execute on the compute pool (for CPU-bound work)
40	ComputePool,
41	/// Execute on the tokio runtime (for I/O-bound async work)
42	Tokio,
43}
44
45/// A scheduled task definition
46pub struct ScheduledTask {
47	/// Unique identifier for this task
48	pub id: TaskId,
49	/// Human-readable name
50	pub name: String,
51	/// When to execute this task
52	pub schedule: Schedule,
53	/// The work to perform
54	pub work: TaskWork,
55	/// Where to execute the work
56	pub executor: TaskExecutor,
57}
58
59impl fmt::Debug for ScheduledTask {
60	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61		f.debug_struct("ScheduledTask")
62			.field("id", &self.id)
63			.field("name", &self.name)
64			.field("schedule", &self.schedule)
65			.field("work", &self.work)
66			.field("executor", &self.executor)
67			.finish()
68	}
69}
70
71impl ScheduledTask {
72	/// Start building a new scheduled task
73	pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
74		ScheduledTaskBuilder::new(name)
75	}
76}
77
78/// Builder for creating scheduled tasks
79pub struct ScheduledTaskBuilder {
80	name: String,
81	schedule: Option<Schedule>,
82	work: Option<TaskWork>,
83	executor: Option<TaskExecutor>,
84}
85
86impl ScheduledTaskBuilder {
87	/// Create a new task builder
88	pub fn new(name: impl Into<String>) -> Self {
89		Self {
90			name: name.into(),
91			schedule: None,
92			work: None,
93			executor: None,
94		}
95	}
96
97	/// Set the schedule for this task
98	pub fn schedule(mut self, schedule: Schedule) -> Self {
99		self.schedule = Some(schedule);
100		self
101	}
102
103	/// Set synchronous work for this task
104	pub fn work_sync<F>(mut self, f: F) -> Self
105	where
106		F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
107	{
108		self.work = Some(TaskWork::Sync(Arc::new(f)));
109		self
110	}
111
112	/// Set asynchronous work for this task
113	pub fn work_async<F, Fut>(mut self, f: F) -> Self
114	where
115		F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
116		Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
117	{
118		self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
119		self
120	}
121
122	/// Set the executor for this task
123	pub fn executor(mut self, executor: TaskExecutor) -> Self {
124		self.executor = Some(executor);
125		self
126	}
127
128	/// Build the scheduled task
129	pub fn build(self) -> Result<ScheduledTask, String> {
130		let schedule = self.schedule.ok_or("schedule is required")?;
131		let work = self.work.ok_or("work is required")?;
132		let executor = self.executor.ok_or("executor is required")?;
133
134		// Validate the schedule
135		schedule.validate()?;
136
137		Ok(ScheduledTask {
138			id: TaskId::new(),
139			name: self.name,
140			schedule,
141			work,
142			executor,
143		})
144	}
145}