Skip to main content

reifydb_sub_task/
task.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use std::{error::Error, fmt, future::Future, pin::Pin, sync::Arc};
5
6use reifydb_core::interface::catalog::task::TaskId;
7
8use crate::{context::TaskContext, schedule::Schedule};
9
10type SyncTaskFn = Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>;
11
12type AsyncTaskFn = Arc<
13	dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>> + Send + Sync,
14>;
15
16#[derive(Clone)]
17pub enum TaskWork {
18	Sync(SyncTaskFn),
19
20	Async(AsyncTaskFn),
21}
22
23impl fmt::Debug for TaskWork {
24	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25		match self {
26			TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
27			TaskWork::Async(_) => write!(f, "TaskWork::Async"),
28		}
29	}
30}
31
/// Executor choice recorded on a [`ScheduledTask`].
///
/// NOTE(review): how each variant is dispatched is decided outside this file;
/// the per-variant notes below are inferred from the names — confirm against the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskExecutor {
	/// Presumably a dedicated compute thread pool.
	ComputePool,
	/// Presumably the Tokio async runtime.
	Tokio,
}
38
39pub struct ScheduledTask {
40	pub id: TaskId,
41
42	pub name: String,
43
44	pub schedule: Schedule,
45
46	pub work: TaskWork,
47
48	pub executor: TaskExecutor,
49}
50
51impl fmt::Debug for ScheduledTask {
52	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53		f.debug_struct("ScheduledTask")
54			.field("id", &self.id)
55			.field("name", &self.name)
56			.field("schedule", &self.schedule)
57			.field("work", &self.work)
58			.field("executor", &self.executor)
59			.finish()
60	}
61}
62
63impl ScheduledTask {
64	pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
65		ScheduledTaskBuilder::new(name)
66	}
67}
68
69pub struct ScheduledTaskBuilder {
70	name: String,
71	schedule: Option<Schedule>,
72	work: Option<TaskWork>,
73	executor: Option<TaskExecutor>,
74}
75
76impl ScheduledTaskBuilder {
77	pub fn new(name: impl Into<String>) -> Self {
78		Self {
79			name: name.into(),
80			schedule: None,
81			work: None,
82			executor: None,
83		}
84	}
85
86	pub fn schedule(mut self, schedule: Schedule) -> Self {
87		self.schedule = Some(schedule);
88		self
89	}
90
91	pub fn work_sync<F>(mut self, f: F) -> Self
92	where
93		F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
94	{
95		self.work = Some(TaskWork::Sync(Arc::new(f)));
96		self
97	}
98
99	pub fn work_async<F, Fut>(mut self, f: F) -> Self
100	where
101		F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
102		Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
103	{
104		self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
105		self
106	}
107
108	pub fn executor(mut self, executor: TaskExecutor) -> Self {
109		self.executor = Some(executor);
110		self
111	}
112
113	pub fn build(self) -> Result<ScheduledTask, String> {
114		let schedule = self.schedule.ok_or("schedule is required")?;
115		let work = self.work.ok_or("work is required")?;
116		let executor = self.executor.ok_or("executor is required")?;
117
118		schedule.validate()?;
119
120		Ok(ScheduledTask {
121			id: TaskId::new(),
122			name: self.name,
123			schedule,
124			work,
125			executor,
126		})
127	}
128}