Skip to main content

reifydb_sub_task/
task.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	error::Error,
6	fmt,
7	future::Future,
8	pin::Pin,
9	sync::{
10		Arc,
11		atomic::{AtomicU64, Ordering},
12	},
13};
14
15use crate::{context::TaskContext, schedule::Schedule};
16
/// Identifier handed out to each scheduled task.
///
/// Values come from a counter shared by the whole process, so two calls to
/// [`TaskId::new`] never yield the same id within one run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
	/// Allocate the next unused task id.
	///
	/// The first id handed out is `1`; each call advances the shared counter by one.
	pub fn new() -> Self {
		static NEXT_ID: AtomicU64 = AtomicU64::new(1);
		// The atomic read-modify-write alone guarantees distinct values; no other
		// memory is published through this counter, hence the relaxed ordering.
		let raw = NEXT_ID.fetch_add(1, Ordering::Relaxed);
		TaskId(raw)
	}
}

impl Default for TaskId {
	/// Same as [`TaskId::new`]: every default value is a freshly allocated id.
	fn default() -> Self {
		TaskId::new()
	}
}

impl fmt::Display for TaskId {
	/// Renders the id as `task-<number>`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let TaskId(raw) = self;
		write!(f, "task-{}", raw)
	}
}
40
41/// Defines the type of work a task performs
42#[derive(Clone)]
43pub enum TaskWork {
44	/// Synchronous blocking work (runs on compute pool)
45	Sync(Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>),
46	/// Asynchronous work (runs on tokio runtime)
47	Async(
48		Arc<
49			dyn Fn(
50					TaskContext,
51				) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>>
52				+ Send
53				+ Sync,
54		>,
55	),
56}
57
58impl fmt::Debug for TaskWork {
59	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60		match self {
61			TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
62			TaskWork::Async(_) => write!(f, "TaskWork::Async"),
63		}
64	}
65}
66
/// Selects the execution environment a task is dispatched to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TaskExecutor {
	/// The compute pool, meant for CPU-bound work.
	ComputePool,
	/// The tokio runtime, meant for I/O-bound async work.
	Tokio,
}
75
76/// A scheduled task definition
77pub struct ScheduledTask {
78	/// Unique identifier for this task
79	pub id: TaskId,
80	/// Human-readable name
81	pub name: String,
82	/// When to execute this task
83	pub schedule: Schedule,
84	/// The work to perform
85	pub work: TaskWork,
86	/// Where to execute the work
87	pub executor: TaskExecutor,
88}
89
90impl fmt::Debug for ScheduledTask {
91	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92		f.debug_struct("ScheduledTask")
93			.field("id", &self.id)
94			.field("name", &self.name)
95			.field("schedule", &self.schedule)
96			.field("work", &self.work)
97			.field("executor", &self.executor)
98			.finish()
99	}
100}
101
102impl ScheduledTask {
103	/// Start building a new scheduled task
104	pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
105		ScheduledTaskBuilder::new(name)
106	}
107}
108
109/// Builder for creating scheduled tasks
110pub struct ScheduledTaskBuilder {
111	name: String,
112	schedule: Option<Schedule>,
113	work: Option<TaskWork>,
114	executor: Option<TaskExecutor>,
115}
116
117impl ScheduledTaskBuilder {
118	/// Create a new task builder
119	pub fn new(name: impl Into<String>) -> Self {
120		Self {
121			name: name.into(),
122			schedule: None,
123			work: None,
124			executor: None,
125		}
126	}
127
128	/// Set the schedule for this task
129	pub fn schedule(mut self, schedule: Schedule) -> Self {
130		self.schedule = Some(schedule);
131		self
132	}
133
134	/// Set synchronous work for this task
135	pub fn work_sync<F>(mut self, f: F) -> Self
136	where
137		F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
138	{
139		self.work = Some(TaskWork::Sync(Arc::new(f)));
140		self
141	}
142
143	/// Set asynchronous work for this task
144	pub fn work_async<F, Fut>(mut self, f: F) -> Self
145	where
146		F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
147		Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
148	{
149		self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
150		self
151	}
152
153	/// Set the executor for this task
154	pub fn executor(mut self, executor: TaskExecutor) -> Self {
155		self.executor = Some(executor);
156		self
157	}
158
159	/// Build the scheduled task
160	pub fn build(self) -> Result<ScheduledTask, String> {
161		let schedule = self.schedule.ok_or("schedule is required")?;
162		let work = self.work.ok_or("work is required")?;
163		let executor = self.executor.ok_or("executor is required")?;
164
165		// Validate the schedule
166		schedule.validate()?;
167
168		Ok(ScheduledTask {
169			id: TaskId::new(),
170			name: self.name,
171			schedule,
172			work,
173			executor,
174		})
175	}
176}