Skip to main content

reifydb_sub_task/
task.rs

1use std::{
2	error::Error,
3	fmt,
4	future::Future,
5	pin::Pin,
6	sync::{
7		Arc,
8		atomic::{AtomicU64, Ordering},
9	},
10};
11
12use crate::{context::TaskContext, schedule::Schedule};
13
/// Identifier that tells one scheduled task apart from another.
///
/// Wraps a `u64`; the value is cheap to copy, compare and hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
	/// Hands out the next identifier from a process-wide counter.
	///
	/// The counter starts at 1 and is advanced atomically, so two calls never
	/// observe the same value (short of the `u64` counter wrapping around).
	pub fn new() -> Self {
		static NEXT_ID: AtomicU64 = AtomicU64::new(1);
		// Only uniqueness matters here, not ordering relative to other memory
		// operations, hence the relaxed ordering.
		let raw = NEXT_ID.fetch_add(1, Ordering::Relaxed);
		TaskId(raw)
	}
}

impl Default for TaskId {
	/// Same as [`TaskId::new`]: every default value is a freshly allocated id.
	fn default() -> Self {
		TaskId::new()
	}
}

impl fmt::Display for TaskId {
	/// Renders the identifier as `task-<n>`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let Self(raw) = *self;
		write!(f, "task-{}", raw)
	}
}
37
38/// Defines the type of work a task performs
39#[derive(Clone)]
40pub enum TaskWork {
41	/// Synchronous blocking work (runs on compute pool)
42	Sync(Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>),
43	/// Asynchronous work (runs on tokio runtime)
44	Async(
45		Arc<
46			dyn Fn(
47					TaskContext,
48				) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>>
49				+ Send
50				+ Sync,
51		>,
52	),
53}
54
55impl fmt::Debug for TaskWork {
56	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57		match self {
58			TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
59			TaskWork::Async(_) => write!(f, "TaskWork::Async"),
60		}
61	}
62}
63
/// Selects which execution backend a task's work is dispatched to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TaskExecutor {
	/// The compute pool, intended for CPU-bound work.
	ComputePool,
	/// The tokio runtime, intended for I/O-bound async work.
	Tokio,
}
72
73/// A scheduled task definition
74pub struct ScheduledTask {
75	/// Unique identifier for this task
76	pub id: TaskId,
77	/// Human-readable name
78	pub name: String,
79	/// When to execute this task
80	pub schedule: Schedule,
81	/// The work to perform
82	pub work: TaskWork,
83	/// Where to execute the work
84	pub executor: TaskExecutor,
85}
86
87impl fmt::Debug for ScheduledTask {
88	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89		f.debug_struct("ScheduledTask")
90			.field("id", &self.id)
91			.field("name", &self.name)
92			.field("schedule", &self.schedule)
93			.field("work", &self.work)
94			.field("executor", &self.executor)
95			.finish()
96	}
97}
98
99impl ScheduledTask {
100	/// Start building a new scheduled task
101	pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
102		ScheduledTaskBuilder::new(name)
103	}
104}
105
106/// Builder for creating scheduled tasks
107pub struct ScheduledTaskBuilder {
108	name: String,
109	schedule: Option<Schedule>,
110	work: Option<TaskWork>,
111	executor: Option<TaskExecutor>,
112}
113
114impl ScheduledTaskBuilder {
115	/// Create a new task builder
116	pub fn new(name: impl Into<String>) -> Self {
117		Self {
118			name: name.into(),
119			schedule: None,
120			work: None,
121			executor: None,
122		}
123	}
124
125	/// Set the schedule for this task
126	pub fn schedule(mut self, schedule: Schedule) -> Self {
127		self.schedule = Some(schedule);
128		self
129	}
130
131	/// Set synchronous work for this task
132	pub fn work_sync<F>(mut self, f: F) -> Self
133	where
134		F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
135	{
136		self.work = Some(TaskWork::Sync(Arc::new(f)));
137		self
138	}
139
140	/// Set asynchronous work for this task
141	pub fn work_async<F, Fut>(mut self, f: F) -> Self
142	where
143		F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
144		Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
145	{
146		self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
147		self
148	}
149
150	/// Set the executor for this task
151	pub fn executor(mut self, executor: TaskExecutor) -> Self {
152		self.executor = Some(executor);
153		self
154	}
155
156	/// Build the scheduled task
157	pub fn build(self) -> Result<ScheduledTask, String> {
158		let schedule = self.schedule.ok_or("schedule is required")?;
159		let work = self.work.ok_or("work is required")?;
160		let executor = self.executor.ok_or("executor is required")?;
161
162		// Validate the schedule
163		schedule.validate()?;
164
165		Ok(ScheduledTask {
166			id: TaskId::new(),
167			name: self.name,
168			schedule,
169			work,
170			executor,
171		})
172	}
173}