Skip to main content

reifydb_sub_task/
handle.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::catalog::task::TaskId;
5use tokio::sync::mpsc;
6
7use crate::{
8	coordinator::TaskCoordinatorMessage,
9	registry::{TaskInfo, TaskRegistry},
10	task::ScheduledTask,
11};
12
/// Handle for interacting with the task scheduler at runtime.
///
/// Mutating requests (register / unregister) are forwarded as
/// [`TaskCoordinatorMessage`]s over `coordinator_tx`, while read-only queries
/// (`list_tasks`, `get_task_info`) are answered from `registry`.
#[derive(Clone)]
pub struct TaskHandle {
	/// Registry consulted by the read-only query methods.
	registry: TaskRegistry,
	/// Sending half of the channel used to deliver requests to the coordinator.
	coordinator_tx: mpsc::Sender<TaskCoordinatorMessage>,
}
19
20impl TaskHandle {
21	pub(crate) fn new(registry: TaskRegistry, coordinator_tx: mpsc::Sender<TaskCoordinatorMessage>) -> Self {
22		Self {
23			registry,
24			coordinator_tx,
25		}
26	}
27
28	pub async fn register_task(&self, task: ScheduledTask) -> Result<TaskId, String> {
29		let task_id = task.id;
30
31		self.coordinator_tx
32			.send(TaskCoordinatorMessage::Register(task))
33			.await
34			.map_err(|e| format!("Failed to register task: {}", e))?;
35
36		Ok(task_id)
37	}
38
39	pub async fn unregister_task(&self, task_id: TaskId) -> Result<(), String> {
40		self.coordinator_tx
41			.send(TaskCoordinatorMessage::Unregister(task_id))
42			.await
43			.map_err(|e| format!("Failed to unregister task: {}", e))?;
44
45		Ok(())
46	}
47
48	pub fn list_tasks(&self) -> Vec<TaskInfo> {
49		self.registry.iter().map(|entry| TaskInfo::from_entry(*entry.key(), entry.value())).collect()
50	}
51
52	pub fn get_task_info(&self, task_id: TaskId) -> Option<TaskInfo> {
53		self.registry.get(&task_id).map(|entry| TaskInfo::from_entry(task_id, entry.value()))
54	}
55}