1use std::{
11 sync::{
12 Arc,
13 Weak,
14 atomic::AtomicBool,
15 },
16 thread,
17 time::Instant,
18};
19
20use qubit_function::{
21 Callable,
22 Runnable,
23};
24
25use crate::{
26 TaskHandle,
27 service::{
28 ExecutorService,
29 ExecutorServiceBuilderError,
30 ExecutorServiceLifecycle,
31 StopReport,
32 SubmissionError,
33 },
34 task::spi::TaskEndpointPair,
35};
36
37use super::{
38 completable_scheduled_task::CompletableScheduledTask,
39 scheduled_executor_service::ScheduledExecutorService,
40 scheduled_task::ScheduledTask,
41 scheduled_task_entry::ScheduledTaskEntry,
42 scheduled_task_handle::ScheduledTaskHandle,
43 scheduled_worker::ScheduledWorker,
44 single_thread_scheduled_executor_service_inner::SingleThreadScheduledExecutorServiceInner,
45};
46
47pub struct SingleThreadScheduledExecutorService {
54 inner: Arc<SingleThreadScheduledExecutorServiceInner>,
56}
57
58impl SingleThreadScheduledExecutorService {
59 #[inline]
74 pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
75 Self::with_stack_size(thread_name, None)
76 }
77
78 pub fn with_stack_size(
94 thread_name: &str,
95 stack_size: Option<usize>,
96 ) -> Result<Self, ExecutorServiceBuilderError> {
97 let inner = Arc::new(SingleThreadScheduledExecutorServiceInner::new());
98 let worker_inner = Arc::clone(&inner);
99 let mut builder = thread::Builder::new().name(thread_name.to_string());
100 if let Some(stack_size) = stack_size {
101 builder = builder.stack_size(stack_size);
102 }
103 if let Err(source) = builder.spawn(move || ScheduledWorker::run(worker_inner)) {
104 return Err(ExecutorServiceBuilderError::SpawnWorker {
105 index: Some(0),
106 source,
107 });
108 }
109 Ok(Self { inner })
110 }
111
112 #[inline]
118 pub fn queued_count(&self) -> usize {
119 self.inner.queued_count()
120 }
121
122 #[inline]
128 pub fn running_count(&self) -> usize {
129 self.inner.running_count()
130 }
131
132 fn cancellation_callback(&self) -> Arc<dyn Fn() + Send + Sync + 'static> {
139 let inner = Arc::downgrade(&self.inner);
140 Arc::new(move || finish_queued_cancellation(&inner))
141 }
142
143 fn schedule_entry(
154 &self,
155 deadline: Instant,
156 entry: Box<dyn ScheduledTaskEntry>,
157 ) -> Result<(), SubmissionError> {
158 let mut state = self.inner.state.lock();
159 if state.lifecycle != ExecutorServiceLifecycle::Running {
160 return Err(SubmissionError::Shutdown);
161 }
162 entry.accept();
163 let sequence = state.next_sequence;
164 state.next_sequence = state.next_sequence.wrapping_add(1);
165 state
166 .tasks
167 .push(ScheduledTask::new(deadline, sequence, entry));
168 self.inner.add_queued_task();
169 self.inner.state.notify_all();
170 Ok(())
171 }
172
173 fn schedule_result_handle<C, R, E>(
188 &self,
189 deadline: Instant,
190 task: C,
191 ) -> Result<TaskHandle<R, E>, SubmissionError>
192 where
193 C: Callable<R, E> + Send + 'static,
194 R: Send + 'static,
195 E: Send + 'static,
196 {
197 let (handle, slot) = TaskEndpointPair::new().into_parts();
198 let cancelled = Arc::new(AtomicBool::new(false));
199 let entry = CompletableScheduledTask::new(task, slot, cancelled);
200 self.schedule_entry(deadline, Box::new(entry))?;
201 Ok(handle)
202 }
203}
204
205impl Drop for SingleThreadScheduledExecutorService {
206 fn drop(&mut self) {
208 self.inner.shutdown();
209 }
210}
211
212impl ExecutorService for SingleThreadScheduledExecutorService {
213 type ResultHandle<R, E>
214 = TaskHandle<R, E>
215 where
216 R: Send + 'static,
217 E: Send + 'static;
218
219 type TrackedHandle<R, E>
220 = ScheduledTaskHandle<R, E>
221 where
222 R: Send + 'static,
223 E: Send + 'static;
224
225 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
227 where
228 T: Runnable<E> + Send + 'static,
229 E: Send + 'static,
230 {
231 let mut task = task;
232 let handle = self.submit_callable(move || task.run())?;
233 drop(handle);
234 Ok(())
235 }
236
237 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
239 where
240 C: Callable<R, E> + Send + 'static,
241 R: Send + 'static,
242 E: Send + 'static,
243 {
244 self.schedule_result_handle(Instant::now(), task)
245 }
246
247 fn submit_tracked_callable<C, R, E>(
249 &self,
250 task: C,
251 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
252 where
253 C: Callable<R, E> + Send + 'static,
254 R: Send + 'static,
255 E: Send + 'static,
256 {
257 self.schedule_callable_at(Instant::now(), task)
258 }
259
260 #[inline]
262 fn shutdown(&self) {
263 self.inner.shutdown();
264 }
265
266 #[inline]
268 fn stop(&self) -> StopReport {
269 self.inner.stop()
270 }
271
272 #[inline]
274 fn lifecycle(&self) -> ExecutorServiceLifecycle {
275 self.inner.lifecycle()
276 }
277
278 #[inline]
280 fn is_not_running(&self) -> bool {
281 self.inner.is_not_running()
282 }
283
284 #[inline]
286 fn is_terminated(&self) -> bool {
287 self.inner.is_terminated()
288 }
289
290 #[inline]
292 fn wait_termination(&self) {
293 self.inner.wait_for_termination();
294 }
295}
296
297impl ScheduledExecutorService for SingleThreadScheduledExecutorService {
298 fn schedule_callable_at<C, R, E>(
300 &self,
301 instant: Instant,
302 task: C,
303 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
304 where
305 C: Callable<R, E> + Send + 'static,
306 R: Send + 'static,
307 E: Send + 'static,
308 {
309 let (tracked, slot) = TaskEndpointPair::new().into_tracked_parts();
310 let cancellation_marker = Arc::new(AtomicBool::new(false));
311 let entry = CompletableScheduledTask::new(task, slot, Arc::clone(&cancellation_marker));
312 self.schedule_entry(instant, Box::new(entry))?;
313 Ok(ScheduledTaskHandle::new(
314 tracked,
315 cancellation_marker,
316 self.cancellation_callback(),
317 ))
318 }
319}
320
321fn finish_queued_cancellation(inner: &Weak<SingleThreadScheduledExecutorServiceInner>) {
327 if let Some(inner) = inner.upgrade() {
328 inner.finish_queued_cancellation();
329 }
330}