qubit_executor/executor/schedule_executor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::Arc,
12 thread,
13 time::Instant,
14};
15
16use qubit_function::Callable;
17
18use crate::{
19 TrackedTask,
20 hook::{
21 TaskHook,
22 notify_rejected_optional,
23 },
24 service::SubmissionError,
25 task::{
26 spi::TaskEndpointPair,
27 task_admission_gate::TaskAdmissionGate,
28 },
29};
30
31use super::{
32 Executor,
33 thread_spawn_config::ThreadSpawnConfig,
34};
35
36/// Executor that starts each task at a specified monotonic instant.
37///
38/// `ScheduleExecutor` creates the returned [`TrackedTask`] immediately, starts
39/// a helper OS thread, waits on that helper thread until the configured
40/// [`Instant`], and then runs the task on the helper thread. If the configured
41/// instant is not in the future, the helper thread runs the task immediately.
42#[derive(Clone)]
43pub struct ScheduleExecutor {
44 /// Monotonic instant at which each submitted task starts.
45 instant: Instant,
46 /// Hook notified about accepted task lifecycle events.
47 hook: Option<Arc<dyn TaskHook>>,
48 /// Optional stack size for each helper thread.
49 stack_size: Option<usize>,
50}
51
52impl ScheduleExecutor {
53 /// Creates an executor that starts tasks at the supplied monotonic instant.
54 ///
55 /// # Parameters
56 ///
57 /// * `instant` - Monotonic instant at which each submitted task starts.
58 ///
59 /// # Returns
60 ///
61 /// A schedule executor using `instant`.
62 #[inline]
63 pub fn at(instant: Instant) -> Self {
64 Self {
65 instant,
66 hook: None,
67 stack_size: None,
68 }
69 }
70
71 /// Returns a copy of this executor using the supplied task hook.
72 ///
73 /// # Parameters
74 ///
75 /// * `hook` - Hook notified about accepted task lifecycle events.
76 ///
77 /// # Returns
78 ///
79 /// This executor configured with `hook`.
80 #[inline]
81 pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
82 self.hook = Some(hook);
83 self
84 }
85
86 /// Returns a copy of this executor using the supplied helper thread stack size.
87 ///
88 /// # Parameters
89 ///
90 /// * `stack_size` - Stack size in bytes for each helper thread.
91 ///
92 /// # Returns
93 ///
94 /// This executor configured with `stack_size`.
95 #[inline]
96 pub fn with_stack_size(mut self, stack_size: usize) -> Self {
97 self.stack_size = Some(stack_size);
98 self
99 }
100
101 /// Returns the configured scheduled instant.
102 ///
103 /// # Returns
104 ///
105 /// The monotonic instant at which each task starts.
106 #[inline]
107 pub const fn instant(&self) -> Instant {
108 self.instant
109 }
110}
111
112impl Executor for ScheduleExecutor {
113 /// Starts a helper thread that waits until the scheduled instant and then
114 /// runs the callable.
115 fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
116 where
117 C: Callable<R, E> + Send + 'static,
118 R: Send + 'static,
119 E: Send + 'static,
120 {
121 let (handle, slot) =
122 TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
123 let instant = self.instant;
124 let gate = TaskAdmissionGate::new(self.hook.is_some());
125 let worker_gate = gate.clone();
126 let hook = self.hook.clone();
127 ThreadSpawnConfig::new(self.stack_size)
128 .spawn(move || {
129 worker_gate.wait();
130 let now = Instant::now();
131 if instant > now {
132 thread::sleep(instant.duration_since(now));
133 }
134 slot.run(task);
135 })
136 .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
137 handle.accept();
138 gate.open();
139 Ok(handle)
140 }
141}