Skip to main content

qubit_executor/executor/
schedule_executor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::Arc,
12    thread,
13    time::Instant,
14};
15
16use qubit_function::Callable;
17
18use crate::{
19    TrackedTask,
20    hook::{
21        TaskHook,
22        notify_rejected_optional,
23    },
24    service::SubmissionError,
25    task::{
26        spi::TaskEndpointPair,
27        task_admission_gate::TaskAdmissionGate,
28    },
29};
30
31use super::{
32    Executor,
33    thread_spawn_config::ThreadSpawnConfig,
34};
35
36/// Executor that starts each task at a specified monotonic instant.
37///
38/// `ScheduleExecutor` creates the returned [`TrackedTask`] immediately, starts
39/// a helper OS thread, waits on that helper thread until the configured
40/// [`Instant`], and then runs the task on the helper thread. If the configured
41/// instant is not in the future, the helper thread runs the task immediately.
42#[derive(Clone)]
43pub struct ScheduleExecutor {
44    /// Monotonic instant at which each submitted task starts.
45    instant: Instant,
46    /// Hook notified about accepted task lifecycle events.
47    hook: Option<Arc<dyn TaskHook>>,
48    /// Optional stack size for each helper thread.
49    stack_size: Option<usize>,
50}
51
52impl ScheduleExecutor {
53    /// Creates an executor that starts tasks at the supplied monotonic instant.
54    ///
55    /// # Parameters
56    ///
57    /// * `instant` - Monotonic instant at which each submitted task starts.
58    ///
59    /// # Returns
60    ///
61    /// A schedule executor using `instant`.
62    #[inline]
63    pub fn at(instant: Instant) -> Self {
64        Self {
65            instant,
66            hook: None,
67            stack_size: None,
68        }
69    }
70
71    /// Returns a copy of this executor using the supplied task hook.
72    ///
73    /// # Parameters
74    ///
75    /// * `hook` - Hook notified about accepted task lifecycle events.
76    ///
77    /// # Returns
78    ///
79    /// This executor configured with `hook`.
80    #[inline]
81    pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
82        self.hook = Some(hook);
83        self
84    }
85
86    /// Returns a copy of this executor using the supplied helper thread stack size.
87    ///
88    /// # Parameters
89    ///
90    /// * `stack_size` - Stack size in bytes for each helper thread.
91    ///
92    /// # Returns
93    ///
94    /// This executor configured with `stack_size`.
95    #[inline]
96    pub fn with_stack_size(mut self, stack_size: usize) -> Self {
97        self.stack_size = Some(stack_size);
98        self
99    }
100
101    /// Returns the configured scheduled instant.
102    ///
103    /// # Returns
104    ///
105    /// The monotonic instant at which each task starts.
106    #[inline]
107    pub const fn instant(&self) -> Instant {
108        self.instant
109    }
110}
111
112impl Executor for ScheduleExecutor {
113    /// Starts a helper thread that waits until the scheduled instant and then
114    /// runs the callable.
115    fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
116    where
117        C: Callable<R, E> + Send + 'static,
118        R: Send + 'static,
119        E: Send + 'static,
120    {
121        let (handle, slot) =
122            TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
123        let instant = self.instant;
124        let gate = TaskAdmissionGate::new(self.hook.is_some());
125        let worker_gate = gate.clone();
126        let hook = self.hook.clone();
127        ThreadSpawnConfig::new(self.stack_size)
128            .spawn(move || {
129                worker_gate.wait();
130                let now = Instant::now();
131                if instant > now {
132                    thread::sleep(instant.duration_since(now));
133                }
134                slot.run(task);
135            })
136            .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
137        handle.accept();
138        gate.open();
139        Ok(handle)
140    }
141}